Skip to content

Commit c37d23b

Browse files
committed
Added Schedule
1 parent bf96176 commit c37d23b

File tree

7 files changed

+417
-41
lines changed

7 files changed

+417
-41
lines changed

agent/config/config.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,15 @@ import (
66
)
77

88
type AgentConfigurations struct {
9-
SANamespace string `envconfig:"SA_NAMESPACE" default:"default"`
10-
SAName string `envconfig:"SA_NAME" default:"default"`
9+
SANamespace string `envconfig:"SA_NAMESPACE" default:"default"`
10+
SAName string `envconfig:"SA_NAME" default:"default"`
11+
OutdatedInterval string `envconfig:"OUTDATED_INTERVAL" default:"*/20 * * * *"`
12+
GetAllInterval string `envconfig:"GETALL_INTERVAL" default:"*/30 * * * *"`
13+
KubeScoreInterval string `envconfig:"KUBESCORE_INTERVAL" default:"*/40 * * * *"`
14+
RakkessInterval string `envconfig:"RAKKESS_INTERVAL" default:"*/50 * * * *"`
15+
KubePreUpgradeInterval string `envconfig:"KUBEPREUPGRADE_INTERVAL" default:"*/60 * * * *"`
16+
TrivyInterval string `envconfig:"TRIVY_INTERVAL" default:"*/10 * * * *"`
17+
SchedulerEnable bool `envconfig:"SCHEDULER_ENABLE" default:"false"`
1118
}
1219

1320
func GetAgentConfigurations() (serviceConf *AgentConfigurations, err error) {

agent/kubviz/k8smetrics_agent.go

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@ import (
44
"encoding/json"
55
"log"
66
"os"
7+
"os/signal"
78
"strconv"
89
"strings"
10+
"syscall"
911
"time"
1012

13+
"github.com/intelops/go-common/logging"
14+
1115
"github.com/go-co-op/gocron"
1216
"github.com/nats-io/nats.go"
1317

@@ -22,6 +26,7 @@ import (
2226

2327
"fmt"
2428

29+
"github.com/intelops/kubviz/agent/config"
2530
v1 "k8s.io/api/core/v1"
2631
"k8s.io/apimachinery/pkg/fields"
2732
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
@@ -77,6 +82,10 @@ func main() {
7782
log.SetFlags(log.LstdFlags | log.Lshortfile)
7883
env := Production
7984
clusterMetricsChan := make(chan error, 1)
85+
cfg, err := config.GetAgentConfigurations()
86+
if err != nil {
87+
log.Fatal("Failed to retrieve agent configurations", err)
88+
}
8089
var (
8190
config *rest.Config
8291
clientset *kubernetes.Clientset
@@ -126,15 +135,30 @@ func main() {
126135
if schedulingIntervalStr == "" {
127136
schedulingIntervalStr = "20m"
128137
}
129-
schedulingInterval, err := time.ParseDuration(schedulingIntervalStr)
130-
if err != nil {
131-
log.Fatalf("Failed to parse SCHEDULING_INTERVAL: %v", err)
138+
if cfg.SchedulerEnable { // Assuming "cfg.Schedule" is a boolean indicating whether to schedule or not.
139+
scheduler := initScheduler(config, js, *cfg, clientset)
140+
141+
// Start the scheduler
142+
scheduler.Start()
143+
signals := make(chan os.Signal, 1)
144+
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
145+
<-signals
146+
147+
scheduler.Stop()
148+
} else {
149+
if schedulingIntervalStr == "" {
150+
schedulingIntervalStr = "20m"
151+
}
152+
schedulingInterval, err := time.ParseDuration(schedulingIntervalStr)
153+
if err != nil {
154+
log.Fatalf("Failed to parse SCHEDULING_INTERVAL: %v", err)
155+
}
156+
s := gocron.NewScheduler(time.UTC)
157+
s.Every(schedulingInterval).Do(func() {
158+
collectAndPublishMetrics()
159+
})
160+
s.StartBlocking()
132161
}
133-
s := gocron.NewScheduler(time.UTC)
134-
s.Every(schedulingInterval).Do(func() {
135-
collectAndPublishMetrics()
136-
})
137-
s.StartBlocking()
138162
}
139163

140164
// publishMetrics publishes stream of events
@@ -272,3 +296,68 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
272296
time.Sleep(time.Second)
273297
}
274298
}
299+
func initScheduler(config *rest.Config, js nats.JetStreamContext, cfg config.AgentConfigurations, clientset *kubernetes.Clientset) (s *Scheduler) {
300+
log := logging.NewLogger()
301+
s = NewScheduler(log)
302+
if cfg.OutdatedInterval != "" {
303+
sj, err := NewOutDatedImagesJob(config, js, cfg.OutdatedInterval)
304+
if err != nil {
305+
log.Fatal("no time interval", err)
306+
}
307+
err = s.AddJob("Outdated", sj)
308+
if err != nil {
309+
log.Fatal("failed to do job", err)
310+
}
311+
}
312+
if cfg.GetAllInterval != "" {
313+
sj, err := NewKetallJob(config, js, cfg.GetAllInterval)
314+
if err != nil {
315+
log.Fatal("no time interval", err)
316+
}
317+
err = s.AddJob("GetALL", sj)
318+
if err != nil {
319+
log.Fatal("failed to do job", err)
320+
}
321+
}
322+
if cfg.KubeScoreInterval != "" {
323+
sj, err := NewKubescoreJob(clientset, js, cfg.KubeScoreInterval)
324+
if err != nil {
325+
log.Fatal("no time interval", err)
326+
}
327+
err = s.AddJob("KubeScore", sj)
328+
if err != nil {
329+
log.Fatal("failed to do job", err)
330+
}
331+
}
332+
if cfg.RakkessInterval != "" {
333+
sj, err := NewRakkessJob(config, js, cfg.RakkessInterval)
334+
if err != nil {
335+
log.Fatal("no time interval", err)
336+
}
337+
err = s.AddJob("Rakkess", sj)
338+
if err != nil {
339+
log.Fatal("failed to do job", err)
340+
}
341+
}
342+
if cfg.KubePreUpgradeInterval != "" {
343+
sj, err := NewKubePreUpgradeJob(config, js, cfg.KubePreUpgradeInterval)
344+
if err != nil {
345+
log.Fatal("no time interval", err)
346+
}
347+
err = s.AddJob("KubePreUpgrade", sj)
348+
if err != nil {
349+
log.Fatal("failed to do job", err)
350+
}
351+
}
352+
if cfg.TrivyInterval != "" {
353+
sj, err := NewTrivyJob(config, js, cfg.TrivyInterval)
354+
if err != nil {
355+
log.Fatal("no time interval", err)
356+
}
357+
err = s.AddJob("Trivy", sj)
358+
if err != nil {
359+
log.Fatal("failed to do job", err)
360+
}
361+
}
362+
return
363+
}

agent/kubviz/scheduler.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package main
2+
3+
import (
4+
"sync"
5+
6+
"github.com/pkg/errors"
7+
"github.com/robfig/cron/v3"
8+
9+
"github.com/intelops/go-common/logging"
10+
)
11+
12+
type jobHandler interface {
13+
CronSpec() string
14+
Run()
15+
}
16+
17+
type Scheduler struct {
18+
log logging.Logger
19+
jobs map[string]jobHandler
20+
cronIDs map[string]cron.EntryID
21+
c *cron.Cron
22+
cronMutex *sync.Mutex
23+
}
24+
25+
func NewScheduler(log logging.Logger) *Scheduler {
26+
clog := cron.VerbosePrintfLogger(log.(logging.StdLogger))
27+
return &Scheduler{
28+
log: log,
29+
c: cron.New(cron.WithChain(cron.SkipIfStillRunning(clog), cron.Recover(clog))),
30+
jobs: map[string]jobHandler{},
31+
cronIDs: map[string]cron.EntryID{},
32+
cronMutex: &sync.Mutex{},
33+
}
34+
}
35+
36+
func (t *Scheduler) AddJob(jobName string, job jobHandler) error {
37+
t.cronMutex.Lock()
38+
defer t.cronMutex.Unlock()
39+
_, ok := t.cronIDs[jobName]
40+
if ok {
41+
return errors.Errorf("%s job already exists", jobName)
42+
}
43+
spec := job.CronSpec()
44+
if spec == "" {
45+
return errors.Errorf("%s job has no cron spec", jobName)
46+
}
47+
entryID, err := t.c.AddJob(spec, job)
48+
if err != nil {
49+
return errors.WithMessagef(err, "%s job cron spec not valid", jobName)
50+
}
51+
52+
t.jobs[jobName] = job
53+
t.cronIDs[jobName] = entryID
54+
t.log.Infof("%s job added with cron '%s'", jobName, spec)
55+
return nil
56+
}
57+
58+
// RemoveJob ...
59+
func (t *Scheduler) RemoveJob(jobName string) error {
60+
t.cronMutex.Lock()
61+
defer t.cronMutex.Unlock()
62+
entryID, ok := t.cronIDs[jobName]
63+
if !ok {
64+
return errors.Errorf("%s job not exist", jobName)
65+
}
66+
67+
t.c.Remove(entryID)
68+
delete(t.jobs, jobName)
69+
delete(t.cronIDs, jobName)
70+
t.log.Infof("%s job removed", jobName)
71+
return nil
72+
}
73+
74+
func (t *Scheduler) Start() {
75+
t.c.Start()
76+
t.log.Infof("Job scheduler started")
77+
}
78+
79+
func (t *Scheduler) Stop() {
80+
t.c.Stop()
81+
t.log.Infof("Job scheduler stopped")
82+
}
83+
84+
func (t *Scheduler) GetJobs() map[string]jobHandler {
85+
t.cronMutex.Lock()
86+
defer t.cronMutex.Unlock()
87+
return t.jobs
88+
}

agent/kubviz/scheduler_watch.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package main
2+
3+
import (
4+
"github.com/nats-io/nats.go"
5+
"k8s.io/client-go/kubernetes"
6+
"k8s.io/client-go/rest"
7+
)
8+
9+
type OutDatedImagesJob struct {
10+
config *rest.Config
11+
js nats.JetStreamContext
12+
frequency string
13+
}
14+
15+
type KetallJob struct {
16+
config *rest.Config
17+
js nats.JetStreamContext
18+
frequency string
19+
}
20+
type TrivyJob struct {
21+
config *rest.Config
22+
js nats.JetStreamContext
23+
frequency string
24+
}
25+
type RakkessJob struct {
26+
config *rest.Config
27+
js nats.JetStreamContext
28+
frequency string
29+
}
30+
type KubePreUpgradeJob struct {
31+
config *rest.Config
32+
js nats.JetStreamContext
33+
frequency string
34+
}
35+
type KubescoreJob struct {
36+
clientset *kubernetes.Clientset
37+
js nats.JetStreamContext
38+
frequency string
39+
}
40+
41+
func NewOutDatedImagesJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*OutDatedImagesJob, error) {
42+
return &OutDatedImagesJob{
43+
config: config,
44+
js: js,
45+
frequency: frequency,
46+
}, nil
47+
}
48+
func (v *OutDatedImagesJob) CronSpec() string {
49+
return v.frequency
50+
}
51+
52+
func (j *OutDatedImagesJob) Run() {
53+
// Call the outDatedImages function with the provided config and js
54+
err := outDatedImages(j.config, j.js)
55+
LogErr(err)
56+
}
57+
func NewKetallJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*KetallJob, error) {
58+
return &KetallJob{
59+
config: config,
60+
js: js,
61+
frequency: frequency,
62+
}, nil
63+
}
64+
func (v *KetallJob) CronSpec() string {
65+
return v.frequency
66+
}
67+
68+
func (j *KetallJob) Run() {
69+
// Call the Ketall function with the provided config and js
70+
err := GetAllResources(j.config, j.js)
71+
LogErr(err)
72+
}
73+
74+
func NewKubePreUpgradeJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*KubePreUpgradeJob, error) {
75+
return &KubePreUpgradeJob{
76+
config: config,
77+
js: js,
78+
frequency: frequency,
79+
}, nil
80+
}
81+
func (v *KubePreUpgradeJob) CronSpec() string {
82+
return v.frequency
83+
}
84+
85+
func (j *KubePreUpgradeJob) Run() {
86+
// Call the Kubepreupgrade function with the provided config and js
87+
err := GetAllResources(j.config, j.js)
88+
LogErr(err)
89+
}
90+
91+
func NewKubescoreJob(clientset *kubernetes.Clientset, js nats.JetStreamContext, frequency string) (*KubescoreJob, error) {
92+
return &KubescoreJob{
93+
clientset: clientset,
94+
js: js,
95+
frequency: frequency,
96+
}, nil
97+
}
98+
func (v *KubescoreJob) CronSpec() string {
99+
return v.frequency
100+
}
101+
102+
func (j *KubescoreJob) Run() {
103+
// Call the Kubescore function with the provided config and js
104+
err := RunKubeScore(j.clientset, j.js)
105+
LogErr(err)
106+
}
107+
func NewRakkessJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*RakkessJob, error) {
108+
return &RakkessJob{
109+
config: config,
110+
js: js,
111+
frequency: frequency,
112+
}, nil
113+
}
114+
func (v *RakkessJob) CronSpec() string {
115+
return v.frequency
116+
}
117+
118+
func (j *RakkessJob) Run() {
119+
// Call the Rakkes function with the provided config and js
120+
err := RakeesOutput(j.config, j.js)
121+
LogErr(err)
122+
}
123+
func NewTrivyJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*TrivyJob, error) {
124+
return &TrivyJob{
125+
config: config,
126+
js: js,
127+
frequency: frequency,
128+
}, nil
129+
}
130+
func (v *TrivyJob) CronSpec() string {
131+
return v.frequency
132+
}
133+
134+
func (j *TrivyJob) Run() {
135+
// Call the Trivy function with the provided config and js
136+
err := runTrivyScans(j.config, j.js)
137+
LogErr(err)
138+
}

0 commit comments

Comments
 (0)