Skip to content

Commit 426ec8b

Browse files
committed
[feature] separate functional and critical modules
Signed-off-by: Stepan Paksashvili <stepan.paksashvili@flant.com>
1 parent 9145cb2 commit 426ec8b

File tree

10 files changed

+404
-14
lines changed

10 files changed

+404
-14
lines changed

pkg/app/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ var (
5353
CRDsFilters = "doc-,_"
5454

5555
// NumberOfParallelQueues defines the number of precreated parallel queues for parallel execution
56-
NumberOfParallelQueues = 15
56+
NumberOfParallelQueues = 20
5757
ParallelQueuePrefix = "parallel_queue"
5858
ParallelQueueNamePattern = ParallelQueuePrefix + "_%d"
5959
)

pkg/module_manager/module_manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,10 @@ func (mm *ModuleManager) GetModule(name string) *modules.BasicModule {
604604
return mm.modules.Get(name)
605605
}
606606

607+
func (mm *ModuleManager) GetFunctionalDependencies() map[string][]string {
608+
return mm.moduleScheduler.GetFunctionalDependencies()
609+
}
610+
607611
// GetModuleNames returns the module names exactly as reported by NamesInOrder
// of the underlying module set.
func (mm *ModuleManager) GetModuleNames() []string {
	names := mm.modules.NamesInOrder()

	return names
}

pkg/module_manager/scheduler/scheduler.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,42 @@ func (s *Scheduler) GetUpdatedByExtender(moduleName string) (string, error) {
550550
return vertex.GetUpdatedBy(), err
551551
}
552552

553+
func (s *Scheduler) GetFunctionalDependencies() map[string][]string {
554+
result := make(map[string][]string, len(s.modules))
555+
556+
for _, module := range s.modules {
557+
for _, ext := range s.extenders {
558+
if tope, ok := ext.ext.(extenders.TopologicalExtender); ok {
559+
if parents := tope.GetTopologicalHints(module.GetName()); len(parents) > 0 {
560+
var deps []string
561+
for _, parent := range parents {
562+
if !s.IsModuleCritical(parent) {
563+
deps = append(deps, parent)
564+
}
565+
}
566+
567+
result[module.GetName()] = deps
568+
}
569+
break
570+
}
571+
}
572+
}
573+
574+
return result
575+
}
576+
577+
func (s *Scheduler) IsModuleCritical(moduleName string) bool {
578+
s.l.Lock()
579+
defer s.l.Unlock()
580+
581+
vertex, err := s.dag.Vertex(moduleName)
582+
if err != nil {
583+
return false
584+
}
585+
586+
return vertex.GetModule().GetCritical()
587+
}
588+
553589
func (s *Scheduler) IsModuleEnabled(moduleName string) bool {
554590
s.l.Lock()
555591
defer s.l.Unlock()

pkg/task/functional/scheduler.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package functional
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"sync"
8+
9+
"github.com/deckhouse/deckhouse/pkg/log"
10+
11+
"github.com/flant/addon-operator/pkg/app"
12+
"github.com/flant/addon-operator/pkg/task"
13+
"github.com/flant/addon-operator/pkg/task/queue"
14+
sh_task "github.com/flant/shell-operator/pkg/task"
15+
)
16+
17+
// channelsBuffer is the buffer capacity used for the scheduler's channels.
const channelsBuffer = 24

// Root is the empty module name. It stands for "no particular module" and can
// be used as a trigger that is not tied to any module of the batch.
const Root = ""
22+
23+
type Scheduler struct {
24+
queueService *queue.Service
25+
logger *log.Logger
26+
27+
// batch control
28+
cancel context.CancelFunc
29+
30+
// for safe shutdown on replacement
31+
wg *sync.WaitGroup
32+
33+
mtx sync.Mutex
34+
requests []*Request
35+
done map[string]struct{}
36+
scheduled map[string]struct{}
37+
38+
doneCh chan string
39+
processCh chan *Request
40+
}
41+
42+
// Request describes one module to be run as a part of a batch.
type Request struct {
	// Name is the module name; dependencies of other requests refer to it
	Name string
	// Description is a free-form description of the event behind the request
	Description string
	// Dependencies lists the names of the modules that have to be done first
	Dependencies []string
	// IsReloadAll is a flag carried over to the module task
	IsReloadAll bool
	// DoStartup is a flag carried over to the module task
	DoStartup bool
	// Labels are the log labels for the module task
	Labels map[string]string
}
50+
51+
func NewScheduler(qService *queue.Service, logger *log.Logger) *Scheduler {
52+
return &Scheduler{
53+
queueService: qService,
54+
logger: logger,
55+
wg: new(sync.WaitGroup),
56+
}
57+
}
58+
59+
// Start schedules a new batch, canceling the previous one if active.
60+
func (s *Scheduler) Start(ctx context.Context, modules []*Request) {
61+
// cancel the previous batch.
62+
if s.cancel != nil {
63+
s.cancel()
64+
// wait for batch goroutines to finish
65+
s.wg.Wait()
66+
}
67+
68+
s.logger.Debug("following functional modules will be scheduled", slog.Any("modules", modules))
69+
70+
// initialize new batch state.
71+
batchCtx, cancel := context.WithCancel(ctx)
72+
s.cancel = cancel
73+
74+
s.mtx.Lock()
75+
s.done = make(map[string]struct{}, len(modules))
76+
s.scheduled = make(map[string]struct{}, len(modules))
77+
s.requests = modules
78+
s.mtx.Unlock()
79+
80+
s.doneCh = make(chan string, channelsBuffer)
81+
s.processCh = make(chan *Request, channelsBuffer)
82+
83+
s.wg.Add(2)
84+
go func() {
85+
defer s.wg.Done()
86+
s.runScheduleLoop(batchCtx)
87+
}()
88+
89+
go func() {
90+
defer s.wg.Done()
91+
s.runProcessLoop(batchCtx)
92+
}()
93+
}
94+
95+
// runScheduleLoop launches the scheduling loop for a batch.
96+
func (s *Scheduler) runScheduleLoop(ctx context.Context) {
97+
for {
98+
select {
99+
case <-ctx.Done():
100+
return
101+
case name, ok := <-s.doneCh:
102+
if !ok {
103+
return
104+
}
105+
s.reschedule(name)
106+
}
107+
}
108+
}
109+
110+
// runProcessLoop waits for requests to be processed
111+
func (s *Scheduler) runProcessLoop(ctx context.Context) {
112+
var idx int
113+
for {
114+
select {
115+
case <-ctx.Done():
116+
return
117+
case req := <-s.processCh:
118+
s.handleRequest(idx, req)
119+
idx++
120+
}
121+
}
122+
}
123+
124+
// reschedule marks module done and schedule new modules to be processed
125+
func (s *Scheduler) reschedule(name string) {
126+
if name != Root {
127+
// skip modules that not present in the batch
128+
if _, ok := s.scheduled[name]; !ok {
129+
return
130+
}
131+
132+
// mark module done
133+
s.done[name] = struct{}{}
134+
}
135+
136+
for _, req := range s.requests {
137+
// skip already processed
138+
if _, ok := s.done[req.Name]; ok {
139+
continue
140+
}
141+
142+
// skip already scheduled
143+
if _, ok := s.scheduled[req.Name]; ok {
144+
continue
145+
}
146+
147+
// check if all dependencies done
148+
ready := true
149+
for _, dep := range req.Dependencies {
150+
if _, ok := s.done[dep]; !ok {
151+
ready = false
152+
break
153+
}
154+
}
155+
156+
// schedule module if ready
157+
if ready {
158+
s.logger.Debug("trigger module scheduling", slog.String("module", req.Name), slog.Any("trigger", name))
159+
s.scheduled[req.Name] = struct{}{}
160+
s.processCh <- req
161+
}
162+
}
163+
}
164+
165+
// handleRequest creates a ModuleRun task for request in a parallel queue
166+
func (s *Scheduler) handleRequest(idx int, req *Request) {
167+
queueName := fmt.Sprintf(app.ParallelQueueNamePattern, idx%(app.NumberOfParallelQueues-1))
168+
169+
moduleTask := sh_task.NewTask(task.ModuleRun).
170+
WithLogLabels(req.Labels).
171+
WithQueueName(queueName).
172+
WithMetadata(task.HookMetadata{
173+
EventDescription: req.Description,
174+
ModuleName: req.Name,
175+
DoModuleStartup: req.DoStartup,
176+
IsReloadAll: req.IsReloadAll,
177+
})
178+
179+
_ = s.queueService.AddLastTaskToQueue(queueName, moduleTask)
180+
}
181+
182+
// Done sends signal that module processing done
183+
func (s *Scheduler) Done(name string) {
184+
if s.doneCh != nil {
185+
s.doneCh <- name
186+
}
187+
}
188+
189+
// Finished defines if processing done
190+
func (s *Scheduler) Finished() bool {
191+
s.mtx.Lock()
192+
defer s.mtx.Unlock()
193+
194+
return len(s.done) == len(s.requests)
195+
}
196+
197+
// Stop is the graceful shutdown
198+
func (s *Scheduler) Stop() {
199+
s.mtx.Lock()
200+
defer s.mtx.Unlock()
201+
202+
if s.cancel != nil {
203+
s.cancel()
204+
s.wg.Wait()
205+
206+
close(s.doneCh)
207+
close(s.processCh)
208+
}
209+
}

pkg/task/hook_metadata.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type HookMetadata struct {
2727
BindingType types.BindingType
2828
BindingContext []bindingcontext.BindingContext
2929
AllowFailure bool // Task considered as 'ok' if hook failed. False by default. Can be true for some schedule hooks.
30+
Critical bool
3031

3132
DoModuleStartup bool // Execute onStartup and kubernetes@Synchronization hooks for module
3233
IsReloadAll bool // ModuleRun task is a part of 'Reload all modules' process.

pkg/task/queue/queue.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ func (s *Service) AddLastTaskToMain(t sh_task.Task) error {
6868
return nil
6969
}
7070

71+
func (s *Service) GetQueueLength(queueName string) int {
72+
q := s.engine.TaskQueues.GetByName(queueName)
73+
if q == nil {
74+
return 0
75+
}
76+
77+
return q.Length()
78+
}
79+
7180
func (s *Service) AddLastTaskToQueue(queueName string, t sh_task.Task) error {
7281
q := s.engine.TaskQueues.GetByName(queueName)
7382
if q == nil {

pkg/task/service/converge.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (s *TaskHandlerService) UpdateFirstConvergeStatus(convergeTasks int) {
106106
case converge.FirstStarted:
107107
// Mark as done when all convergence tasks are completed
108108
if convergeTasks == 0 {
109-
log.Info("First converge is finished. Operator is ready now.")
109+
s.logger.Debug("first converge is finished. Operator is ready now.")
110110
s.convergeState.SetFirstRunPhase(converge.FirstDone)
111111
}
112112
}

pkg/task/service/service.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/flant/addon-operator/pkg/module_manager"
1515
"github.com/flant/addon-operator/pkg/task"
1616
discovercrds "github.com/flant/addon-operator/pkg/task/discover-crds"
17+
"github.com/flant/addon-operator/pkg/task/functional"
1718
paralleltask "github.com/flant/addon-operator/pkg/task/parallel"
1819
taskqueue "github.com/flant/addon-operator/pkg/task/queue"
1920
applykubeconfigvalues "github.com/flant/addon-operator/pkg/task/tasks/apply-kube-config-values"
@@ -57,6 +58,8 @@ type TaskHandlerService struct {
5758
// a map of channels to communicate with parallel queues and its lock
5859
parallelTaskChannels *paralleltask.TaskChannels
5960

61+
functionalScheduler *functional.Scheduler
62+
6063
helm *helm.ClientFactory
6164

6265
// helmResourcesManager monitors absent resources created for modules.
@@ -106,6 +109,8 @@ func NewTaskHandlerService(ctx context.Context, config *TaskHandlerServiceConfig
106109
Handle: svc.Handle,
107110
}, logger.Named("task-queue-service"))
108111

112+
svc.functionalScheduler = functional.NewScheduler(svc.queueService, logger.Named("functional-scheduler"))
113+
109114
svc.initFactory()
110115

111116
return svc
@@ -175,8 +180,33 @@ func (s *TaskHandlerService) ParallelHandle(ctx context.Context, t sh_task.Task)
175180
s.logTaskEnd(t, res, logger)
176181

177182
hm := task.HookMetadataAccessor(t)
183+
184+
s.logger.Debug("parallel task done",
185+
slog.String("task_type", string(t.GetType())),
186+
slog.String("module", hm.ModuleName),
187+
slog.Bool("critical", hm.Critical),
188+
slog.String("result", string(res.Status)))
189+
190+
if !hm.Critical {
191+
if t.GetType() == task.ModuleRun {
192+
if res.Status == queue.Success && len(res.AfterTasks) == 0 {
193+
s.functionalScheduler.Done(hm.ModuleName)
194+
}
195+
196+
if res.Status == queue.Fail {
197+
if s.queueService.GetQueueLength(t.GetQueueName()) > 1 {
198+
res.Status = queue.Success
199+
_ = s.queueService.AddLastTaskToQueue(t.GetQueueName(), t)
200+
}
201+
}
202+
}
203+
204+
return res
205+
}
206+
178207
if hm.ParallelRunMetadata == nil || len(hm.ParallelRunMetadata.ChannelId) == 0 {
179208
s.logger.Warn("Parallel task had no communication channel set")
209+
return res
180210
}
181211

182212
if parallelChannel, ok := s.parallelTaskChannels.Get(hm.ParallelRunMetadata.ChannelId); ok {
@@ -242,6 +272,10 @@ func (s *TaskHandlerService) GetQueueService() *taskqueue.Service {
242272
return s.queueService
243273
}
244274

275+
func (s *TaskHandlerService) GetFunctionalScheduler() *functional.Scheduler {
276+
return s.functionalScheduler
277+
}
278+
245279
// GetConvergeState returns the converge state held by the service.
func (s *TaskHandlerService) GetConvergeState() *converge.ConvergeState {
	state := s.convergeState

	return state
}

0 commit comments

Comments
 (0)