Skip to content

Commit 8fc010b

Browse files
committed
[feature] separate function and critical modules
Signed-off-by: Stepan Paksashvili <stepan.paksashvili@flant.com>
1 parent 426ec8b commit 8fc010b

File tree

4 files changed

+49
-64
lines changed

4 files changed

+49
-64
lines changed

pkg/module_manager/module_manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,15 @@ func (mm *ModuleManager) GetModule(name string) *modules.BasicModule {
604604
return mm.modules.Get(name)
605605
}
606606

607+
func (mm *ModuleManager) GetCritical(moduleName string) bool {
608+
module := mm.GetModule(moduleName)
609+
if module == nil {
610+
return false
611+
}
612+
613+
return module.GetCritical()
614+
}
615+
607616
func (mm *ModuleManager) GetFunctionalDependencies() map[string][]string {
608617
return mm.moduleScheduler.GetFunctionalDependencies()
609618
}

pkg/task/functional/scheduler.go

Lines changed: 36 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,8 @@ type Scheduler struct {
2424
queueService *queue.Service
2525
logger *log.Logger
2626

27-
// batch control
28-
cancel context.CancelFunc
29-
30-
// for safe shutdown on replacement
31-
wg *sync.WaitGroup
32-
3327
mtx sync.Mutex
34-
requests []*Request
28+
requests map[string]*Request
3529
done map[string]struct{}
3630
scheduled map[string]struct{}
3731

@@ -48,48 +42,27 @@ type Request struct {
4842
Labels map[string]string
4943
}
5044

51-
func NewScheduler(qService *queue.Service, logger *log.Logger) *Scheduler {
52-
return &Scheduler{
45+
// NewScheduler creates a scheduler instance and starts it
46+
func NewScheduler(ctx context.Context, qService *queue.Service, logger *log.Logger) *Scheduler {
47+
s := &Scheduler{
5348
queueService: qService,
5449
logger: logger,
55-
wg: new(sync.WaitGroup),
56-
}
57-
}
58-
59-
// Start schedules a new batch, canceling the previous one if active.
60-
func (s *Scheduler) Start(ctx context.Context, modules []*Request) {
61-
// cancel the previous batch.
62-
if s.cancel != nil {
63-
s.cancel()
64-
// wait for batch goroutines to finish
65-
s.wg.Wait()
50+
scheduled: make(map[string]struct{}),
51+
requests: make(map[string]*Request),
52+
done: make(map[string]struct{}),
53+
doneCh: make(chan string, channelsBuffer),
54+
processCh: make(chan *Request, channelsBuffer),
6655
}
6756

68-
s.logger.Debug("following functional modules will be scheduled", slog.Any("modules", modules))
69-
70-
// initialize new batch state.
71-
batchCtx, cancel := context.WithCancel(ctx)
72-
s.cancel = cancel
73-
74-
s.mtx.Lock()
75-
s.done = make(map[string]struct{}, len(modules))
76-
s.scheduled = make(map[string]struct{}, len(modules))
77-
s.requests = modules
78-
s.mtx.Unlock()
79-
80-
s.doneCh = make(chan string, channelsBuffer)
81-
s.processCh = make(chan *Request, channelsBuffer)
82-
83-
s.wg.Add(2)
8457
go func() {
85-
defer s.wg.Done()
86-
s.runScheduleLoop(batchCtx)
58+
s.runScheduleLoop(ctx)
8759
}()
8860

8961
go func() {
90-
defer s.wg.Done()
91-
s.runProcessLoop(batchCtx)
62+
s.runProcessLoop(ctx)
9263
}()
64+
65+
return s
9366
}
9467

9568
// runScheduleLoop launches the scheduling loop for a batch.
@@ -121,6 +94,28 @@ func (s *Scheduler) runProcessLoop(ctx context.Context) {
12194
}
12295
}
12396

97+
// Add adds request to process
98+
func (s *Scheduler) Add(reqs ...*Request) {
99+
s.mtx.Lock()
100+
defer s.mtx.Unlock()
101+
102+
for _, req := range reqs {
103+
s.logger.Debug("add request", slog.Any("request", req))
104+
s.requests[req.Name] = req
105+
delete(s.done, req.Name)
106+
delete(s.scheduled, req.Name)
107+
}
108+
}
109+
110+
// Remove removes module from done
111+
// TODO(ipaqsa): add stop module run task
112+
func (s *Scheduler) Remove(name string) {
113+
s.mtx.Lock()
114+
defer s.mtx.Unlock()
115+
116+
delete(s.done, name)
117+
}
118+
124119
// reschedule marks module done and schedule new modules to be processed
125120
func (s *Scheduler) reschedule(name string) {
126121
if name != Root {
@@ -155,7 +150,7 @@ func (s *Scheduler) reschedule(name string) {
155150

156151
// schedule module if ready
157152
if ready {
158-
s.logger.Debug("trigger module scheduling", slog.String("module", req.Name), slog.Any("trigger", name))
153+
s.logger.Debug("trigger scheduling", slog.String("scheduled", req.Name), slog.Any("done", name))
159154
s.scheduled[req.Name] = struct{}{}
160155
s.processCh <- req
161156
}
@@ -193,17 +188,3 @@ func (s *Scheduler) Finished() bool {
193188

194189
return len(s.done) == len(s.requests)
195190
}
196-
197-
// Stop is the graceful shutdown
198-
func (s *Scheduler) Stop() {
199-
s.mtx.Lock()
200-
defer s.mtx.Unlock()
201-
202-
if s.cancel != nil {
203-
s.cancel()
204-
s.wg.Wait()
205-
206-
close(s.doneCh)
207-
close(s.processCh)
208-
}
209-
}

pkg/task/service/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func NewTaskHandlerService(ctx context.Context, config *TaskHandlerServiceConfig
109109
Handle: svc.Handle,
110110
}, logger.Named("task-queue-service"))
111111

112-
svc.functionalScheduler = functional.NewScheduler(svc.queueService, logger.Named("functional-scheduler"))
112+
svc.functionalScheduler = functional.NewScheduler(ctx, svc.queueService, logger.Named("functional-scheduler"))
113113

114114
svc.initFactory()
115115

pkg/task/tasks/converge-modules/task.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -381,14 +381,9 @@ func (s *Task) CreateConvergeModulesTasks(ctx context.Context,
381381
continue
382382
}
383383

384-
basicModule := s.moduleManager.GetModule(modules[0])
385-
if basicModule == nil {
386-
continue
387-
}
388-
389-
if !basicModule.GetCritical() {
384+
// skip functional modules
385+
if !s.moduleManager.GetCritical(modules[0]) {
390386
functionalModules = append(functionalModules, modules...)
391-
continue
392387
}
393388

394389
newLogLabels := utils.MergeLabels(logLabels)
@@ -521,7 +516,7 @@ func (s *Task) CreateConvergeModulesTasks(ctx context.Context,
521516

522517
// schedule functional modules in parallel queues
523518
if len(schedulerRequests) > 0 {
524-
s.functionalScheduler.Start(ctx, schedulerRequests)
519+
s.functionalScheduler.Add(schedulerRequests...)
525520
}
526521

527522
// as resultingTasks contains new ensureCRDsTasks we invalidate

0 commit comments

Comments
 (0)