Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (
CRDsFilters = "doc-,_"

// NumberOfParallelQueues defines the number of precreated parallel queues for parallel execution
NumberOfParallelQueues = 15
NumberOfParallelQueues = 20
ParallelQueuePrefix = "parallel_queue"
ParallelQueueNamePattern = ParallelQueuePrefix + "_%d"
)
Expand Down
13 changes: 13 additions & 0 deletions pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,19 @@ func (mm *ModuleManager) GetModule(name string) *modules.BasicModule {
return mm.modules.Get(name)
}

func (mm *ModuleManager) GetCritical(moduleName string) bool {
module := mm.GetModule(moduleName)
if module == nil {
return false
}

return module.GetCritical()
}

func (mm *ModuleManager) GetFunctionalDependencies() map[string][]string {
return mm.moduleScheduler.GetFunctionalDependencies()
}

func (mm *ModuleManager) GetModuleNames() []string {
return mm.modules.NamesInOrder()
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/module_manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,42 @@ func (s *Scheduler) GetUpdatedByExtender(moduleName string) (string, error) {
return vertex.GetUpdatedBy(), err
}

func (s *Scheduler) GetFunctionalDependencies() map[string][]string {
result := make(map[string][]string, len(s.modules))

for _, module := range s.modules {
for _, ext := range s.extenders {
if tope, ok := ext.ext.(extenders.TopologicalExtender); ok {
if parents := tope.GetTopologicalHints(module.GetName()); len(parents) > 0 {
var deps []string
for _, parent := range parents {
if !s.IsModuleCritical(parent) {
deps = append(deps, parent)
}
}

result[module.GetName()] = deps
}
break
}
}
}

return result
}

func (s *Scheduler) IsModuleCritical(moduleName string) bool {
s.l.Lock()
defer s.l.Unlock()

vertex, err := s.dag.Vertex(moduleName)
if err != nil {
return false
}

return vertex.GetModule().GetCritical()
}

func (s *Scheduler) IsModuleEnabled(moduleName string) bool {
s.l.Lock()
defer s.l.Unlock()
Expand Down
212 changes: 212 additions & 0 deletions pkg/task/functional/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package functional

import (
"context"
"fmt"
"log/slog"
"sync"

"github.com/deckhouse/deckhouse/pkg/log"

"github.com/flant/addon-operator/pkg/app"
"github.com/flant/addon-operator/pkg/task"
sh_task "github.com/flant/shell-operator/pkg/task"
)

const (
// size for done and process channels
channelsBuffer = 32

// Root triggers modules without dependencies
Root = ""
)

// Scheduler is used to process functional modules,
// it waits until modules` dependencies are processed and then
// runs ModuleRun tasks for them in parallel queues
type Scheduler struct {
queueService queueService
logger *log.Logger

mtx sync.Mutex
requests map[string]*Request
done map[string]struct{}
scheduled map[string]struct{}

doneCh chan string
processCh chan *Request
}

type queueService interface {
AddLastTaskToQueue(queueName string, task sh_task.Task) error
}

// Request describes a module and run task options
type Request struct {
Name string
Description string
Dependencies []string
IsReloadAll bool
DoStartup bool
Labels map[string]string
}

// NewScheduler creates a scheduler instance and starts it
func NewScheduler(ctx context.Context, qService queueService, logger *log.Logger) *Scheduler {
s := &Scheduler{
queueService: qService,
logger: logger,
scheduled: make(map[string]struct{}),
requests: make(map[string]*Request),
done: make(map[string]struct{}),
doneCh: make(chan string, channelsBuffer),
processCh: make(chan *Request, channelsBuffer),
}

go func() {
s.runScheduleLoop(ctx)
}()

go func() {
s.runProcessLoop(ctx)
}()

return s
}

// runScheduleLoop launches the scheduling loop for a batch.
func (s *Scheduler) runScheduleLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case name := <-s.doneCh:
s.reschedule(name)
}
}
}

// runProcessLoop waits for requests to be processed
func (s *Scheduler) runProcessLoop(ctx context.Context) {
var idx int
for {
select {
case <-ctx.Done():
return
case req := <-s.processCh:
s.handleRequest(idx, req)
idx++
}
}
}

// Add adds requests to process
func (s *Scheduler) Add(reqs ...*Request) {
s.mtx.Lock()
defer s.mtx.Unlock()

for _, req := range reqs {
s.logger.Debug("add request", slog.Any("request", req))
// update module
s.requests[req.Name] = req
// undone module
delete(s.done, req.Name)
// unschedule module
delete(s.scheduled, req.Name)
}
}

// Remove removes module from done
// TODO(ipaqsa): stop module run task
func (s *Scheduler) Remove(name string) {
s.mtx.Lock()
defer s.mtx.Unlock()

// undone module
delete(s.done, name)
// unschedule module
delete(s.scheduled, name)
// remove module
delete(s.requests, name)
}

// reschedule marks module done and schedule new modules to be processed
func (s *Scheduler) reschedule(done string) {
s.mtx.Lock()
defer s.mtx.Unlock()

if done != Root {
// skip unscheduled
if _, ok := s.scheduled[done]; !ok {
return
}

// mark done
s.done[done] = struct{}{}

// delete from scheduled
delete(s.scheduled, done)
}

for _, req := range s.requests {
// skip processed
if _, ok := s.done[req.Name]; ok {
continue
}

// skip scheduled
if _, ok := s.scheduled[req.Name]; ok {
continue
}

// check if all dependencies done
ready := true
for _, dep := range req.Dependencies {
if _, ok := s.done[dep]; !ok {
ready = false
break
}
}

// schedule module if ready
if ready {
s.logger.Debug("trigger scheduling", slog.String("scheduled", req.Name), slog.Any("done", done))
s.scheduled[req.Name] = struct{}{}
s.processCh <- req
}
}
}

// handleRequest creates a ModuleRun task for request in a parallel queue
func (s *Scheduler) handleRequest(idx int, req *Request) {
queueName := fmt.Sprintf(app.ParallelQueueNamePattern, idx%(app.NumberOfParallelQueues-1))

moduleTask := sh_task.NewTask(task.ModuleRun).
WithLogLabels(req.Labels).
WithQueueName(queueName).
WithMetadata(task.HookMetadata{
EventDescription: req.Description,
ModuleName: req.Name,
DoModuleStartup: req.DoStartup,
IsReloadAll: req.IsReloadAll,
})

if err := s.queueService.AddLastTaskToQueue(queueName, moduleTask); err != nil {
s.logger.Error("add last task to queue", slog.String("queue", queueName), slog.Any("error", err))
}
}

// Done sends signal that module processing done
func (s *Scheduler) Done(name string) {
if s.doneCh != nil {
s.doneCh <- name
}
}

// Finished defines if processing done
func (s *Scheduler) Finished() bool {
s.mtx.Lock()
defer s.mtx.Unlock()

return len(s.done) == len(s.requests)
}
Loading
Loading