diff --git a/pkg/app/app.go b/pkg/app/app.go index 7fb49255e..11c85fdfb 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -53,7 +53,7 @@ var ( CRDsFilters = "doc-,_" // NumberOfParallelQueues defines the number of precreated parallel queues for parallel execution - NumberOfParallelQueues = 15 + NumberOfParallelQueues = 20 ParallelQueuePrefix = "parallel_queue" ParallelQueueNamePattern = ParallelQueuePrefix + "_%d" ) diff --git a/pkg/module_manager/module_manager.go b/pkg/module_manager/module_manager.go index 70e787343..56fd49180 100644 --- a/pkg/module_manager/module_manager.go +++ b/pkg/module_manager/module_manager.go @@ -604,6 +604,19 @@ func (mm *ModuleManager) GetModule(name string) *modules.BasicModule { return mm.modules.Get(name) } +func (mm *ModuleManager) GetCritical(moduleName string) bool { + module := mm.GetModule(moduleName) + if module == nil { + return false + } + + return module.GetCritical() +} + +func (mm *ModuleManager) GetFunctionalDependencies() map[string][]string { + return mm.moduleScheduler.GetFunctionalDependencies() +} + func (mm *ModuleManager) GetModuleNames() []string { return mm.modules.NamesInOrder() } diff --git a/pkg/module_manager/scheduler/scheduler.go b/pkg/module_manager/scheduler/scheduler.go index da2cd440e..e839f2088 100644 --- a/pkg/module_manager/scheduler/scheduler.go +++ b/pkg/module_manager/scheduler/scheduler.go @@ -550,6 +550,42 @@ func (s *Scheduler) GetUpdatedByExtender(moduleName string) (string, error) { return vertex.GetUpdatedBy(), err } +func (s *Scheduler) GetFunctionalDependencies() map[string][]string { + result := make(map[string][]string, len(s.modules)) + + for _, module := range s.modules { + for _, ext := range s.extenders { + if tope, ok := ext.ext.(extenders.TopologicalExtender); ok { + if parents := tope.GetTopologicalHints(module.GetName()); len(parents) > 0 { + var deps []string + for _, parent := range parents { + if !s.IsModuleCritical(parent) { + deps = append(deps, parent) + } + } + + result[module.GetName()] = deps + } + break + } + } + } + + return result +} + +func (s *Scheduler) IsModuleCritical(moduleName string) bool { + s.l.Lock() + defer s.l.Unlock() + + vertex, err := s.dag.Vertex(moduleName) + if err != nil { + return false + } + + return vertex.GetModule().GetCritical() +} + func (s *Scheduler) IsModuleEnabled(moduleName string) bool { s.l.Lock() defer s.l.Unlock() diff --git a/pkg/task/functional/scheduler.go b/pkg/task/functional/scheduler.go new file mode 100644 index 000000000..9609068ed --- /dev/null +++ b/pkg/task/functional/scheduler.go @@ -0,0 +1,212 @@ +package functional + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/deckhouse/deckhouse/pkg/log" + + "github.com/flant/addon-operator/pkg/app" + "github.com/flant/addon-operator/pkg/task" + sh_task "github.com/flant/shell-operator/pkg/task" +) + +const ( + // size for done and process channels + channelsBuffer = 32 + + // Root triggers modules without dependencies + Root = "" +) + +// Scheduler is used to process functional modules, +// it waits until modules` dependencies are processed and then +// runs ModuleRun tasks for them in parallel queues +type Scheduler struct { + queueService queueService + logger *log.Logger + + mtx sync.Mutex + requests map[string]*Request + done map[string]struct{} + scheduled map[string]struct{} + + doneCh chan string + processCh chan *Request +} + +type queueService interface { + AddLastTaskToQueue(queueName string, task sh_task.Task) error +} + +// Request describes a module and run task options +type Request struct { + Name string + Description string + Dependencies []string + IsReloadAll bool + DoStartup bool + Labels map[string]string +} + +// NewScheduler creates a scheduler instance and starts it +func NewScheduler(ctx context.Context, qService queueService, logger *log.Logger) *Scheduler { + s := &Scheduler{ + queueService: qService, + logger: logger, + scheduled: make(map[string]struct{}), + requests: make(map[string]*Request), + done: make(map[string]struct{}), + doneCh: make(chan string, channelsBuffer), + processCh: make(chan *Request, channelsBuffer), + } + + go func() { + s.runScheduleLoop(ctx) + }() + + go func() { + s.runProcessLoop(ctx) + }() + + return s +} + +// runScheduleLoop launches the scheduling loop for a batch. +func (s *Scheduler) runScheduleLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case name := <-s.doneCh: + s.reschedule(name) + } + } +} + +// runProcessLoop waits for requests to be processed +func (s *Scheduler) runProcessLoop(ctx context.Context) { + var idx int + for { + select { + case <-ctx.Done(): + return + case req := <-s.processCh: + s.handleRequest(idx, req) + idx++ + } + } +} + +// Add adds requests to process +func (s *Scheduler) Add(reqs ...*Request) { + s.mtx.Lock() + defer s.mtx.Unlock() + + for _, req := range reqs { + s.logger.Debug("add request", slog.Any("request", req)) + // update module + s.requests[req.Name] = req + // undone module + delete(s.done, req.Name) + // unschedule module + delete(s.scheduled, req.Name) + } +} + +// Remove removes module from done +// TODO(ipaqsa): stop module run task +func (s *Scheduler) Remove(name string) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // undone module + delete(s.done, name) + // unschedule module + delete(s.scheduled, name) + // remove module + delete(s.requests, name) +} + +// reschedule marks module done and schedule new modules to be processed +func (s *Scheduler) reschedule(done string) { + s.mtx.Lock() + defer s.mtx.Unlock() + + if done != Root { + // skip unscheduled + if _, ok := s.scheduled[done]; !ok { + return + } + + // mark done + s.done[done] = struct{}{} + + // delete from scheduled + delete(s.scheduled, done) + } + + for _, req := range s.requests { + // skip processed + if _, ok := s.done[req.Name]; ok { + continue + } + + // skip scheduled + if _, ok := s.scheduled[req.Name]; ok { + continue + } + + // check if all dependencies done + ready := true + for _, dep := range req.Dependencies { + if _, ok := s.done[dep]; !ok { + ready = false + break + } + } + + // schedule module if ready + if ready { + s.logger.Debug("trigger scheduling", slog.String("scheduled", req.Name), slog.Any("done", done)) + s.scheduled[req.Name] = struct{}{} + s.processCh <- req + } + } +} + +// handleRequest creates a ModuleRun task for request in a parallel queue +func (s *Scheduler) handleRequest(idx int, req *Request) { + queueName := fmt.Sprintf(app.ParallelQueueNamePattern, idx%(app.NumberOfParallelQueues-1)) + + moduleTask := sh_task.NewTask(task.ModuleRun). + WithLogLabels(req.Labels). + WithQueueName(queueName). + WithMetadata(task.HookMetadata{ + EventDescription: req.Description, + ModuleName: req.Name, + DoModuleStartup: req.DoStartup, + IsReloadAll: req.IsReloadAll, + }) + + if err := s.queueService.AddLastTaskToQueue(queueName, moduleTask); err != nil { + s.logger.Error("add last task to queue", slog.String("queue", queueName), slog.Any("error", err)) + } +} + +// Done sends signal that module processing done +func (s *Scheduler) Done(name string) { + if s.doneCh != nil { + s.doneCh <- name + } +} + +// Finished defines if processing done +func (s *Scheduler) Finished() bool { + s.mtx.Lock() + defer s.mtx.Unlock() + + return len(s.done) == len(s.requests) +} diff --git a/pkg/task/functional/scheduler_test.go b/pkg/task/functional/scheduler_test.go new file mode 100644 index 000000000..e8dbb904f --- /dev/null +++ b/pkg/task/functional/scheduler_test.go @@ -0,0 +1,226 @@ +package functional + +import ( + "context" + "log/slog" + "sync" + "testing" + "time" + + "github.com/deckhouse/deckhouse/pkg/log" + "github.com/stretchr/testify/require" + + sh_task "github.com/flant/shell-operator/pkg/task" +) + +// mockQueueService records AddLastTaskToQueue calls. +type mockQueueService struct { + mu sync.Mutex + calls []string +} + +// TestScheduler_MissingDependencyLaterAdded ensures that a request blocked by a +// missing dependency becomes schedulable once the dependency is later added and completed. +func TestScheduler_MissingDependencyLaterAdded(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mqs := &mockQueueService{} + s := NewScheduler(ctx, mqs, log.NewNop()) + + // D waits for X which is unknown at the moment. + d := &Request{Name: "D", Dependencies: []string{"X"}} + s.Add(d) + s.Done(Root) + + require.Eventually(t, func() bool { return mqs.len() == 0 }, 200*time.Millisecond, 20*time.Millisecond) + + // Now introduce X and kick scheduler again. + x := &Request{Name: "X"} + s.Add(x) + s.Done(Root) + + // X should be scheduled first. + require.Eventually(t, func() bool { return mqs.len() == 1 }, time.Second, 10*time.Millisecond) + + // Complete X; D should follow. + s.Done("X") + require.Eventually(t, func() bool { return mqs.len() == 2 }, time.Second, 10*time.Millisecond) + + s.Done("D") + require.Eventually(t, s.Finished, time.Second, 10*time.Millisecond) +} + +// TestScheduler_DuplicateAddIgnored confirms that adding the same request twice +// results in a single queue entry. +func TestScheduler_DuplicateAddIgnored(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mqs := &mockQueueService{} + s := NewScheduler(ctx, mqs, log.NewNop()) + + r := &Request{Name: "uniq"} + s.Add(r) + s.Add(r) // duplicate + + s.Done(Root) + + require.Eventually(t, func() bool { return mqs.len() == 1 }, time.Second, 10*time.Millisecond) + s.Done("uniq") + require.Eventually(t, s.Finished, time.Second, 10*time.Millisecond) +} + +func (m *mockQueueService) AddLastTaskToQueue(_ string, task sh_task.Task) error { + m.mu.Lock() + defer m.mu.Unlock() + + m.calls = append(m.calls, task.GetDescription()) + return nil +} + +func (m *mockQueueService) len() int { + m.mu.Lock() + defer m.mu.Unlock() + + return len(m.calls) +} + +func TestScheduler_LinearDependencies(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mqs := &mockQueueService{} + s := NewScheduler(ctx, mqs, log.NewLogger(log.WithLevel(slog.LevelDebug))) + + // A → B + a := &Request{Name: "A"} + b := &Request{Name: "B", Dependencies: []string{"A"}} + + s.Add(a, b) + + // trigger first scheduling. + s.Done(Root) + + // wait for A to be sent to queue. + require.Eventually(t, func() bool { return mqs.len() == 1 }, time.Second, 10*time.Millisecond) + + // mark A done – expect B scheduled. + s.Done("A") + require.Eventually(t, func() bool { return mqs.len() == 2 }, time.Second, 10*time.Millisecond) + + // mark B done - expect nothing scheduled + s.Done("B") + require.Eventually(t, func() bool { return mqs.len() == 2 }, time.Second, 10*time.Millisecond) + + time.Sleep(time.Second) + + require.True(t, s.Finished()) +} + +func TestScheduler_MultipleDependencies(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mqs := &mockQueueService{} + s := NewScheduler(ctx, mqs, log.NewLogger(log.WithLevel(slog.LevelDebug))) + + // A, B → C + a := &Request{Name: "A"} + b := &Request{Name: "B"} + c := &Request{Name: "C", Dependencies: []string{"A", "B"}} + + s.Add(a, b, c) + s.Done(Root) + + // A and B expected to be scheduled + require.Eventually(t, func() bool { return mqs.len() == 2 }, time.Second, 10*time.Millisecond) + + // mark A done - expect nothing scheduled + s.Done("A") + require.Never(t, func() bool { return mqs.len() == 3 }, 200*time.Millisecond, 20*time.Millisecond, "C must wait for B") + + // mark B done, both deps done so expect C scheduled + s.Done("B") + require.Eventually(t, func() bool { return mqs.len() == 3 }, time.Second, 10*time.Millisecond) + + // mark C done - expect nothing scheduled + s.Done("C") + require.Eventually(t, func() bool { return mqs.len() == 3 }, time.Second, 10*time.Millisecond) + + time.Sleep(time.Second) + + require.True(t, s.Finished()) +} + +func TestScheduler_MissingDependencyBlocks(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + + mqs := &mockQueueService{} + s := NewScheduler(ctx, mqs, log.NewNop()) + + // D depends on never-done X + d := &Request{Name: "D", Dependencies: []string{"X"}} + s.Add(d) + s.Done(Root) + + // scheduler should not call queue. + require.Eventually(t, func() bool { return mqs.len() == 0 }, 300*time.Millisecond, 20*time.Millisecond) + + require.False(t, s.Finished()) +} + +func TestScheduler_RemoveClearsDone(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mqs := &mockQueueService{} + s := NewScheduler(ctx, mqs, log.NewLogger(log.WithLevel(slog.LevelDebug))) + + // build a chain: operator-prometheus → prometheus → monitoring-application + a := &Request{Name: "operator-prometheus"} + b := &Request{Name: "prometheus", Dependencies: []string{"operator-prometheus"}} + c := &Request{Name: "monitoring-application", Dependencies: []string{"prometheus"}} + + s.Add(a, b, c) + s.Done(Root) + + // A should be the only task queued. + require.Eventually(t, func() bool { return mqs.len() == 1 }, time.Second, 10*time.Millisecond) + + // mark operator-prometheus done → expect prometheus scheduled. + s.Done("operator-prometheus") + require.Eventually(t, func() bool { return mqs.len() == 2 }, time.Second, 10*time.Millisecond) + + // mark prometheus done → expect monitoring-application scheduled. + s.Done("prometheus") + require.Eventually(t, func() bool { return mqs.len() == 3 }, time.Second, 10*time.Millisecond) + + // mark monitoring-application done - expect nothing scheduled + s.Done("monitoring-application") + require.Eventually(t, func() bool { return mqs.len() == 3 }, time.Second, 10*time.Millisecond) + + // scheduler reports finished. + require.True(t, s.Finished()) + + s.Remove("operator-prometheus") + + // trigger prometheus done + s.Done("prometheus") + + // expect NO additional tasks to be scheduled because operator-prometheus removed. + require.Never(t, func() bool { return mqs.len() > 3 }, 300*time.Millisecond, 20*time.Millisecond, "No new tasks must be scheduled when the dependency chain is broken") +} diff --git a/pkg/task/hook_metadata.go b/pkg/task/hook_metadata.go index 121946209..f449b327b 100644 --- a/pkg/task/hook_metadata.go +++ b/pkg/task/hook_metadata.go @@ -27,6 +27,7 @@ type HookMetadata struct { BindingType types.BindingType BindingContext []bindingcontext.BindingContext AllowFailure bool // Task considered as 'ok' if hook failed. False by default. Can be true for some schedule hooks. + Critical bool DoModuleStartup bool // Execute onStartup and kubernetes@Synchronization hooks for module IsReloadAll bool // ModuleRun task is a part of 'Reload all modules' process. diff --git a/pkg/task/queue/queue.go b/pkg/task/queue/queue.go index 4d38453ee..ccf08cf50 100644 --- a/pkg/task/queue/queue.go +++ b/pkg/task/queue/queue.go @@ -68,6 +68,15 @@ func (s *Service) AddLastTaskToMain(t sh_task.Task) error { return nil } +func (s *Service) GetQueueLength(queueName string) int { + q := s.engine.TaskQueues.GetByName(queueName) + if q == nil { + return 0 + } + + return q.Length() +} + func (s *Service) AddLastTaskToQueue(queueName string, t sh_task.Task) error { q := s.engine.TaskQueues.GetByName(queueName) if q == nil { diff --git a/pkg/task/service/converge.go b/pkg/task/service/converge.go index 797ab7d61..7eef202e0 100644 --- a/pkg/task/service/converge.go +++ b/pkg/task/service/converge.go @@ -106,7 +106,7 @@ func (s *TaskHandlerService) UpdateFirstConvergeStatus(convergeTasks int) { case converge.FirstStarted: // Mark as done when all convergence tasks are completed if convergeTasks == 0 { - log.Info("First converge is finished. Operator is ready now.") + s.logger.Info("first converge is finished. Operator is ready now.") s.convergeState.SetFirstRunPhase(converge.FirstDone) } } diff --git a/pkg/task/service/service.go b/pkg/task/service/service.go index b7c806f75..858a7f05e 100644 --- a/pkg/task/service/service.go +++ b/pkg/task/service/service.go @@ -14,6 +14,7 @@ import ( "github.com/flant/addon-operator/pkg/module_manager" "github.com/flant/addon-operator/pkg/task" discovercrds "github.com/flant/addon-operator/pkg/task/discover-crds" + "github.com/flant/addon-operator/pkg/task/functional" paralleltask "github.com/flant/addon-operator/pkg/task/parallel" taskqueue "github.com/flant/addon-operator/pkg/task/queue" applykubeconfigvalues "github.com/flant/addon-operator/pkg/task/tasks/apply-kube-config-values" @@ -57,6 +58,8 @@ type TaskHandlerService struct { // a map of channels to communicate with parallel queues and its lock parallelTaskChannels *paralleltask.TaskChannels + functionalScheduler *functional.Scheduler + helm *helm.ClientFactory // helmResourcesManager monitors absent resources created for modules. @@ -106,6 +109,8 @@ func NewTaskHandlerService(ctx context.Context, config *TaskHandlerServiceConfig Handle: svc.Handle, }, logger.Named("task-queue-service")) + svc.functionalScheduler = functional.NewScheduler(ctx, svc.queueService, logger.Named("functional-scheduler")) + svc.initFactory() return svc @@ -175,8 +180,35 @@ func (s *TaskHandlerService) ParallelHandle(ctx context.Context, t sh_task.Task) s.logTaskEnd(t, res, logger) hm := task.HookMetadataAccessor(t) + + s.logger.Debug("parallel task done", + slog.String("task_type", string(t.GetType())), + slog.String("module", hm.ModuleName), + slog.Bool("critical", hm.Critical), + slog.String("result", string(res.Status))) + + if !hm.Critical { + if t.GetType() == task.ModuleRun { + if res.Status == queue.Success && len(res.AfterTasks) == 0 { + s.functionalScheduler.Done(hm.ModuleName) + } + + if res.Status == queue.Fail { + if s.queueService.GetQueueLength(t.GetQueueName()) > 1 { + res.Status = queue.Success + if err := s.queueService.AddLastTaskToQueue(t.GetQueueName(), t); err != nil { + s.logger.Error("add last task to queue", slog.String("queue", t.GetQueueName()), slog.Any("error", err)) + } + } + } + } + + return res + } + if hm.ParallelRunMetadata == nil || len(hm.ParallelRunMetadata.ChannelId) == 0 { s.logger.Warn("Parallel task had no communication channel set") + return res } if parallelChannel, ok := s.parallelTaskChannels.Get(hm.ParallelRunMetadata.ChannelId); ok { @@ -242,6 +274,10 @@ func (s *TaskHandlerService) GetQueueService() *taskqueue.Service { return s.queueService } +func (s *TaskHandlerService) GetFunctionalScheduler() *functional.Scheduler { + return s.functionalScheduler +} + func (s *TaskHandlerService) GetConvergeState() *converge.ConvergeState { return s.convergeState } diff --git a/pkg/task/tasks/converge-modules/task.go b/pkg/task/tasks/converge-modules/task.go index 643842691..c3f1136a2 100644 --- a/pkg/task/tasks/converge-modules/task.go +++ b/pkg/task/tasks/converge-modules/task.go @@ -16,6 +16,7 @@ import ( "github.com/flant/addon-operator/pkg/module_manager" "github.com/flant/addon-operator/pkg/module_manager/models/modules/events" "github.com/flant/addon-operator/pkg/task" + "github.com/flant/addon-operator/pkg/task/functional" "github.com/flant/addon-operator/pkg/task/helpers" taskqueue "github.com/flant/addon-operator/pkg/task/queue" "github.com/flant/addon-operator/pkg/utils" @@ -27,6 +28,10 @@ import ( const ( taskName = "converge-modules" + + // repeatInterval is the interval at which the convergence status is checked + // while waiting until the functional scheduler done + repeatInterval = 3 * time.Second ) // TaskDependencies defines the external dependencies required by the ConvergeModules task. @@ -35,6 +40,7 @@ type TaskDependencies interface { GetMetricStorage() metric.Storage GetConvergeState() *converge.ConvergeState GetQueueService() *taskqueue.Service + GetFunctionalScheduler() *functional.Scheduler } // RegisterTaskHandler creates a factory function that instantiates ConvergeModules tasks. @@ -45,6 +51,7 @@ func RegisterTaskHandler(deps TaskDependencies) func(t sh_task.Task, logger *log deps.GetModuleManager(), deps.GetMetricStorage(), deps.GetConvergeState(), + deps.GetFunctionalScheduler(), deps.GetQueueService(), logger.Named("converge-modules"), ) @@ -55,10 +62,11 @@ func RegisterTaskHandler(deps TaskDependencies) func(t sh_task.Task, logger *log type Task struct { shellTask sh_task.Task - moduleManager *module_manager.ModuleManager - metricStorage metric.Storage - convergeState *converge.ConvergeState - queueService *taskqueue.Service + moduleManager *module_manager.ModuleManager + metricStorage metric.Storage + convergeState *converge.ConvergeState + queueService *taskqueue.Service + functionalScheduler *functional.Scheduler logger *log.Logger } @@ -69,16 +77,18 @@ func NewTask( moduleManager *module_manager.ModuleManager, metricStorage metric.Storage, convergeState *converge.ConvergeState, + functionalScheduler *functional.Scheduler, queueService *taskqueue.Service, logger *log.Logger, ) *Task { return &Task{ - shellTask: shellTask, - moduleManager: moduleManager, - metricStorage: metricStorage, - convergeState: convergeState, - queueService: queueService, - logger: logger, + shellTask: shellTask, + moduleManager: moduleManager, + metricStorage: metricStorage, + convergeState: convergeState, + queueService: queueService, + functionalScheduler: functionalScheduler, + logger: logger, } } @@ -174,6 +184,27 @@ func (s *Task) Handle(ctx context.Context) queue.TaskResult { } if s.convergeState.GetPhase() == converge.WaitDeleteAndRunModules { + // trigger functional converge + s.functionalScheduler.Done(functional.Root) + + // wait until functional converge done + if !s.functionalScheduler.Finished() { + s.logger.Warn("ConvergeModules: functional scheduler not finished") + res.Status = queue.Keep + res.DelayBeforeNextTask = repeatInterval + + if s.queueService.GetQueueLength(queue.MainQueueName) > 1 { + s.logger.Debug("ConvergeModules: main queue has pending tasks, pass them") + res.Status = queue.Success + res.DelayBeforeNextTask = 0 + if err := s.queueService.AddLastTaskToMain(s.shellTask); err != nil { + s.logger.Error("add last task to queue", slog.String("queue", "main"), slog.Any("error", err)) + } + } + + return res + } + s.logger.Info("ConvergeModules: ModuleRun tasks done, execute AfterAll global hooks") // Put AfterAll tasks before current task. tasks, handleErr := s.CreateAfterAllTasks(s.shellTask.GetLogLabels(), hm.EventDescription) @@ -311,7 +342,8 @@ func (s *Task) CreateAfterAllTasks(logLabels map[string]string, eventDescription // - ModuleDelete tasks for modules that need to be disabled // - ModuleRun tasks for individual modules that need to be enabled or rerun // - ParallelModuleRun tasks for groups of modules that can be processed in parallel -func (s *Task) CreateConvergeModulesTasks(state *module_manager.ModulesState, logLabels map[string]string, eventDescription string) []sh_task.Task { +func (s *Task) CreateConvergeModulesTasks(state *module_manager.ModulesState, logLabels map[string]string, eventDescription string, +) []sh_task.Task { modulesTasks := make([]sh_task.Task, 0, len(state.ModulesToDisable)+len(state.AllEnabledModules)) resultingTasks := make([]sh_task.Task, 0, len(state.ModulesToDisable)+len(state.AllEnabledModules)) queuedAt := time.Now() @@ -337,6 +369,9 @@ func (s *Task) CreateConvergeModulesTasks(state *module_manager.ModulesState, lo ModuleName: moduleName, }) modulesTasks = append(modulesTasks, newTask.WithQueuedAt(queuedAt)) + + // undone, unschedule and remove the disabled module from the functional scheduler + s.functionalScheduler.Remove(moduleName) } // Add ModuleRun tasks to install or reload enabled modules. @@ -344,7 +379,18 @@ func (s *Task) CreateConvergeModulesTasks(state *module_manager.ModulesState, lo log.Debug("The following modules are going to be enabled/rerun", slog.String("modules", fmt.Sprintf("%v", state.AllEnabledModulesByOrder))) + var functionalModules []string for _, modules := range state.AllEnabledModulesByOrder { + if len(modules) == 0 { + continue + } + + // skip functional modules + if !s.moduleManager.GetCritical(modules[0]) { + functionalModules = append(functionalModules, modules...) + continue + } + newLogLabels := utils.MergeLabels(logLabels) delete(newLogLabels, "task.id") switch { @@ -385,6 +431,7 @@ func (s *Task) CreateConvergeModulesTasks(state *module_manager.ModulesState, lo EventDescription: eventDescription, ModuleName: fmt.Sprintf("Parallel run for %s", strings.Join(modules, ", ")), IsReloadAll: true, + Critical: true, ParallelRunMetadata: ¶llelRunMetadata, }) modulesTasks = append(modulesTasks, newTask.WithQueuedAt(queuedAt)) @@ -420,6 +467,7 @@ func (s *Task) CreateConvergeModulesTasks(state *module_manager.ModulesState, lo ModuleName: modules[0], DoModuleStartup: doModuleStartup, IsReloadAll: true, + Critical: true, }) modulesTasks = append(modulesTasks, newTask.WithQueuedAt(queuedAt)) @@ -428,6 +476,54 @@ func (s *Task) CreateConvergeModulesTasks(state *module_manager.ModulesState, lo slog.String("state", fmt.Sprintf("%v", state))) } } + + deps := s.moduleManager.GetFunctionalDependencies() + + // handle functional modules + schedulerRequests := make([]*functional.Request, len(functionalModules)) + for idx, module := range functionalModules { + // notify about module enabling + ev := events.ModuleEvent{ + ModuleName: module, + EventType: events.ModuleEnabled, + } + s.moduleManager.SendModuleEvent(ev) + + newLogLabels := utils.MergeLabels(logLabels) + delete(newLogLabels, "task.id") + newLogLabels["module"] = module + + doModuleStartup := false + // add EnsureCRDs task if module is about to be enabled + if _, has := newlyEnabled[module]; has { + if s.moduleManager.ModuleHasCRDs(module) { + resultingTasks = append(resultingTasks, sh_task.NewTask(task.ModuleEnsureCRDs). + WithLogLabels(newLogLabels). + WithQueueName("main"). + WithMetadata(task.HookMetadata{ + EventDescription: "EnsureCRDs", + ModuleName: module, + IsReloadAll: true, + }).WithQueuedAt(queuedAt)) + } + doModuleStartup = true + } + + schedulerRequests[idx] = &functional.Request{ + Name: module, + Dependencies: deps[module], + Description: eventDescription, + IsReloadAll: true, + DoStartup: doModuleStartup, + Labels: newLogLabels, + } + } + + // schedule functional modules in parallel queues + if len(schedulerRequests) > 0 { + s.functionalScheduler.Add(schedulerRequests...) + } + // as resultingTasks contains new ensureCRDsTasks we invalidate // ConvregeState.CRDsEnsured if there are new ensureCRDsTasks to execute if s.convergeState.CRDsEnsured && len(resultingTasks) > 0 { diff --git a/pkg/task/tasks/parallel-module-run/task.go b/pkg/task/tasks/parallel-module-run/task.go index f1a7551ec..91486d65e 100644 --- a/pkg/task/tasks/parallel-module-run/task.go +++ b/pkg/task/tasks/parallel-module-run/task.go @@ -113,6 +113,7 @@ func (s *Task) Handle(ctx context.Context) queue.TaskResult { ModuleName: moduleName, DoModuleStartup: moduleMetadata.DoModuleStartup, IsReloadAll: hm.IsReloadAll, + Critical: hm.Critical, ParallelRunMetadata: &task.ParallelRunMetadata{ ChannelId: s.shellTask.GetId(), },