Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ WORKDIR /
RUN mkdir /global-hooks /modules
ENV MODULES_DIR /modules
ENV GLOBAL_HOOKS_DIR /global-hooks

ENTRYPOINT ["/sbin/tini", "--", "/addon-operator"]
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/dominikbraun/graph v0.23.0
github.com/ettle/strcase v0.2.0
github.com/flant/kube-client v1.3.1
github.com/flant/shell-operator v1.9.1
github.com/flant/shell-operator v1.10.0
github.com/go-chi/chi/v5 v5.2.1
github.com/go-openapi/loads v0.19.5
github.com/go-openapi/spec v0.19.8
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ github.com/flant/go-openapi-validate v0.19.12-flant.1 h1:GuB9XEfiLHq3M7fafRLq1AW
github.com/flant/go-openapi-validate v0.19.12-flant.1/go.mod h1:Rzou8hA/CBw8donlS6WNEUQupNvUZ0waH08tGe6kAQ4=
github.com/flant/kube-client v1.3.1 h1:1SdD799sujXNg2F6Z27le/+qkcKQaKf9Z492YGEhVhc=
github.com/flant/kube-client v1.3.1/go.mod h1:mql6hsZMgBLAhdj3Emb8TrP5MVdXduFQ2NLjzn6IF0Y=
github.com/flant/shell-operator v1.9.1 h1:P4f8kyxSK+TRUS7EYLcY9+IrCvvF0e8MBRPA7hPdCnQ=
github.com/flant/shell-operator v1.9.1/go.mod h1:9gZmjxCuyLDz3hFsmRXYTlw7+hpw199CMYzSodWKk50=
github.com/flant/shell-operator v1.10.0 h1:GaO5lWKgSyLjG5cAVCmhdr+apFzrZqc/1AV0+xDeF4M=
github.com/flant/shell-operator v1.10.0/go.mod h1:9gZmjxCuyLDz3hFsmRXYTlw7+hpw199CMYzSodWKk50=
github.com/flopp/go-findfont v0.1.0 h1:lPn0BymDUtJo+ZkV01VS3661HL6F4qFlkhcJN55u6mU=
github.com/flopp/go-findfont v0.1.0/go.mod h1:wKKxRDjD024Rh7VMwoU90i6ikQRCr+JTHB5n4Ejkqvw=
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
Expand Down
15 changes: 13 additions & 2 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/flant/addon-operator/pkg/module_manager/models/modules/events"
"github.com/flant/addon-operator/pkg/task"
paralleltask "github.com/flant/addon-operator/pkg/task/parallel"
queueutils "github.com/flant/addon-operator/pkg/task/queue"
taskservice "github.com/flant/addon-operator/pkg/task/service"
"github.com/flant/addon-operator/pkg/utils"
"github.com/flant/kube-client/client"
Expand Down Expand Up @@ -418,7 +419,12 @@ func (op *AddonOperator) BootstrapMainQueue(tqs *queue.TaskQueueSet) {
// Prepopulate main queue with 'onStartup' and 'enable kubernetes bindings' tasks for
// global hooks and add a task to discover modules state.
tqs.WithMainName("main")
tqs.NewNamedQueue("main", op.TaskService.Handle)
tqs.NewNamedQueue("main", queue.QueueOpts{
Handler: op.TaskService.Handle,
CompactionCallback: queueutils.CompactionCallback(op.ModuleManager, op.Logger),
CompactableTypes: queueutils.MergeTasks,
Logger: op.Logger.With("operator.component", "mainQueue"),
})

tasks := op.CreateBootstrapTasks(logLabels)
op.logTaskAdd(logEntry, "append", tasks...)
Expand Down Expand Up @@ -562,7 +568,12 @@ func (op *AddonOperator) CreateAndStartQueue(queueName string) {
}

func (op *AddonOperator) startQueue(queueName string, handler func(ctx context.Context, t sh_task.Task) queue.TaskResult) {
op.engine.TaskQueues.NewNamedQueue(queueName, handler)
op.engine.TaskQueues.NewNamedQueue(queueName, queue.QueueOpts{
Handler: handler,
CompactionCallback: queueutils.CompactionCallback(op.ModuleManager, op.Logger),
CompactableTypes: queueutils.MergeTasks,
Logger: op.Logger.With("operator.component", "queue", "queue", queueName),
})
op.engine.TaskQueues.GetByName(queueName).Start(op.ctx)
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/task/hook_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ var (
_ task_metadata.HookNameAccessor = HookMetadata{}
_ task_metadata.BindingContextAccessor = HookMetadata{}
_ task_metadata.MonitorIDAccessor = HookMetadata{}
_ task_metadata.BindingContextSetter = HookMetadata{}
_ task_metadata.MonitorIDSetter = HookMetadata{}
_ task.MetadataDescriptionGetter = HookMetadata{}
_ modules.TaskMetadata = HookMetadata{}
)
Expand Down Expand Up @@ -169,10 +171,20 @@ func (hm HookMetadata) GetBindingContext() []bindingcontext.BindingContext {
return hm.BindingContext
}

func (hm HookMetadata) SetBindingContext(context []bindingcontext.BindingContext) interface{} {
hm.BindingContext = context
return hm
}

func (hm HookMetadata) GetMonitorIDs() []string {
return hm.MonitorIDs
}

func (hm HookMetadata) SetMonitorIDs(monitorIDs []string) interface{} {
hm.MonitorIDs = monitorIDs
return hm
}

func (hm HookMetadata) IsSynchronization() bool {
return len(hm.BindingContext) > 0 && hm.BindingContext[0].IsSynchronization()
}
13 changes: 9 additions & 4 deletions pkg/task/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,17 @@ func NewService(ctx context.Context, cfg *ServiceConfig, logger *log.Logger) *Se

// CreateAndStartQueue creates a named queue with default handler and starts it.
// It returns false is queue is already created
func (s *Service) CreateAndStartQueue(queueName string) {
s.startQueue(queueName, s.Handle)
func (s *Service) CreateAndStartQueue(queueName string, callback Callback) {
s.startQueue(queueName, s.Handle, callback)
}

func (s *Service) startQueue(queueName string, handler func(ctx context.Context, t sh_task.Task) queue.TaskResult) {
s.engine.TaskQueues.NewNamedQueue(queueName, handler)
func (s *Service) startQueue(queueName string, handler func(ctx context.Context, t sh_task.Task) queue.TaskResult, callback Callback) {
s.engine.TaskQueues.NewNamedQueue(queueName, queue.QueueOpts{
Handler: handler,
CompactionCallback: callback,
CompactableTypes: MergeTasks,
Logger: s.logger.With("operator.component", "queue", "queue", queueName),
})
s.engine.TaskQueues.GetByName(queueName).Start(s.ctx)
}

Expand Down
47 changes: 47 additions & 0 deletions pkg/task/queue/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package queue

import (
"log/slog"

"github.com/deckhouse/deckhouse/pkg/log"

"github.com/flant/addon-operator/pkg/module_manager/models/modules"
"github.com/flant/addon-operator/pkg/task"
sh_task "github.com/flant/shell-operator/pkg/task"
)

var MergeTasks = []sh_task.TaskType{task.GlobalHookRun, task.ModuleHookRun}

type ModuleManager interface {
GlobalSynchronizationState() *modules.SynchronizationState
GetModule(moduleName string) *modules.BasicModule
}

type Callback func(compactedTasks []sh_task.Task, targetTask sh_task.Task)

func CompactionCallback(moduleManager ModuleManager, logger *log.Logger) Callback {
return func(compactedTasks []sh_task.Task, _ sh_task.Task) {
for _, compactedTask := range compactedTasks {
thm := task.HookMetadataAccessor(compactedTask)
if thm.IsSynchronization() {
logger.Debug("Compacted synchronization task, marking as Done",
slog.String("hook", thm.HookName),
slog.String("binding", thm.Binding),
slog.String("id", thm.KubernetesBindingId))

if thm.ModuleName == "" {
if moduleManager != nil && moduleManager.GlobalSynchronizationState() != nil {
moduleManager.GlobalSynchronizationState().DoneForBinding(thm.KubernetesBindingId)
}
} else {
if moduleManager != nil {
baseModule := moduleManager.GetModule(thm.ModuleName)
if baseModule != nil && baseModule.Synchronization() != nil {
baseModule.Synchronization().DoneForBinding(thm.KubernetesBindingId)
}
}
}
}
}
}
}
4 changes: 1 addition & 3 deletions pkg/task/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ func (s *TaskHandlerService) ParallelHandle(ctx context.Context, t sh_task.Task)
if res.Status == queue.Fail {
if s.queueService.GetQueueLength(t.GetQueueName()) > 1 {
res.Status = queue.Success
if err := s.queueService.AddLastTaskToQueue(t.GetQueueName(), t); err != nil {
s.logger.Error("add last task to queue", slog.String("queue", t.GetQueueName()), slog.Any("error", err))
}
res.TailTasks = append(res.TailTasks, t)
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/task/tasks/converge-modules/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,7 @@ func (s *Task) Handle(ctx context.Context) queue.TaskResult {
s.logger.Debug("ConvergeModules: main queue has pending tasks, pass them")
res.Status = queue.Success
res.DelayBeforeNextTask = 0
if err := s.queueService.AddLastTaskToMain(s.shellTask); err != nil {
s.logger.Error("add last task to queue", slog.String("queue", "main"), slog.Any("error", err))
}
res.TailTasks = append(res.TailTasks, s.shellTask)
}

return res
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/tasks/module-run/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (s *Task) CreateAndStartQueuesForModuleHooks(moduleName string) {
for _, hook := range scheduleHooks {
for _, hookBinding := range hook.GetHookConfig().Schedules {
if !s.queueService.IsQueueExists(hookBinding.Queue) {
s.queueService.CreateAndStartQueue(hookBinding.Queue)
s.queueService.CreateAndStartQueue(hookBinding.Queue, taskqueue.CompactionCallback(s.moduleManager, s.logger))

log.Debug("Queue started for module 'schedule'",
slog.String("queue", hookBinding.Queue),
Expand All @@ -437,7 +437,7 @@ func (s *Task) CreateAndStartQueuesForModuleHooks(moduleName string) {
for _, hook := range kubeEventsHooks {
for _, hookBinding := range hook.GetHookConfig().OnKubernetesEvents {
if !s.queueService.IsQueueExists(hookBinding.Queue) {
s.queueService.CreateAndStartQueue(hookBinding.Queue)
s.queueService.CreateAndStartQueue(hookBinding.Queue, taskqueue.CompactionCallback(s.moduleManager, s.logger))

log.Debug("Queue started for module 'kubernetes'",
slog.String("queue", hookBinding.Queue),
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/tasks/parallel-module-run/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (s *Task) CreateAndStartQueuesForModuleHooks(moduleName string) {
for _, hook := range scheduleHooks {
for _, hookBinding := range hook.GetHookConfig().Schedules {
if !s.queueService.IsQueueExists(hookBinding.Queue) {
s.queueService.CreateAndStartQueue(hookBinding.Queue)
s.queueService.CreateAndStartQueue(hookBinding.Queue, taskqueue.CompactionCallback(s.moduleManager, s.logger))

log.Debug("Queue started for module 'schedule'",
slog.String("queue", hookBinding.Queue),
Expand All @@ -221,7 +221,7 @@ func (s *Task) CreateAndStartQueuesForModuleHooks(moduleName string) {
for _, hook := range kubeEventsHooks {
for _, hookBinding := range hook.GetHookConfig().OnKubernetesEvents {
if !s.queueService.IsQueueExists(hookBinding.Queue) {
s.queueService.CreateAndStartQueue(hookBinding.Queue)
s.queueService.CreateAndStartQueue(hookBinding.Queue, taskqueue.CompactionCallback(s.moduleManager, s.logger))

log.Debug("Queue started for module 'kubernetes'",
slog.String("queue", hookBinding.Queue),
Expand Down
Loading