Skip to content

Commit 59e8151

Browse files
authored
[shell-operator] feat/new operator event handling mechanism (#735)
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com> Co-authored-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent 7d2cb5b commit 59e8151

File tree

3 files changed

+72
-21
lines changed

3 files changed

+72
-21
lines changed

pkg/hook/controller/hook_controller.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager"
1010
kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types"
1111
schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager"
12+
"github.com/flant/shell-operator/pkg/task"
1213
"github.com/flant/shell-operator/pkg/webhook/admission"
1314
"github.com/flant/shell-operator/pkg/webhook/conversion"
1415
)
@@ -161,13 +162,24 @@ func (hc *HookController) HandleEnableKubernetesBindings(createTasksFn func(Bind
161162
return nil
162163
}
163164

164-
func (hc *HookController) HandleKubeEvent(event kemtypes.KubeEvent, createTasksFn func(BindingExecutionInfo)) {
165+
func (hc *HookController) HandleKubeEvent(event kemtypes.KubeEvent, handlerFunc func(BindingExecutionInfo)) {
166+
if hc.KubernetesController != nil {
167+
execInfo := hc.KubernetesController.HandleEvent(event)
168+
if handlerFunc != nil {
169+
handlerFunc(execInfo)
170+
}
171+
}
172+
}
173+
174+
func (hc *HookController) HandleKubeEventWithFormTask(event kemtypes.KubeEvent, createTasksFn func(BindingExecutionInfo) task.Task) task.Task {
165175
if hc.KubernetesController != nil {
166176
execInfo := hc.KubernetesController.HandleEvent(event)
167177
if createTasksFn != nil {
168-
createTasksFn(execInfo)
178+
return createTasksFn(execInfo)
169179
}
170180
}
181+
182+
return nil
171183
}
172184

173185
func (hc *HookController) HandleAdmissionEvent(event admission.Event, createTasksFn func(BindingExecutionInfo)) {
@@ -190,17 +202,40 @@ func (hc *HookController) HandleConversionEvent(crdName string, request *v1.Conv
190202
}
191203
}
192204

193-
func (hc *HookController) HandleScheduleEvent(crontab string, createTasksFn func(BindingExecutionInfo)) {
205+
func (hc *HookController) HandleScheduleEvent(crontab string, handlerFunc func(BindingExecutionInfo)) {
194206
if hc.ScheduleController == nil {
195207
return
196208
}
209+
197210
infos := hc.ScheduleController.HandleEvent(crontab)
198-
if createTasksFn == nil {
211+
if handlerFunc == nil {
199212
return
200213
}
214+
215+
for _, info := range infos {
216+
handlerFunc(info)
217+
}
218+
}
219+
220+
func (hc *HookController) HandleScheduleEventWithFormTasks(crontab string, createTasksFn func(BindingExecutionInfo) task.Task) []task.Task {
221+
if hc.ScheduleController == nil {
222+
return nil
223+
}
224+
225+
infos := hc.ScheduleController.HandleEvent(crontab)
226+
if createTasksFn == nil {
227+
return nil
228+
}
229+
230+
tasks := make([]task.Task, 0)
201231
for _, info := range infos {
202-
createTasksFn(info)
232+
task := createTasksFn(info)
233+
if task != nil {
234+
tasks = append(tasks, task)
235+
}
203236
}
237+
238+
return tasks
204239
}
205240

206241
func (hc *HookController) UnlockKubernetesEvents() {

pkg/hook/hook_manager.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager"
2121
kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types"
2222
schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager"
23+
"github.com/flant/shell-operator/pkg/task"
2324
utils_file "github.com/flant/shell-operator/pkg/utils/file"
2425
"github.com/flant/shell-operator/pkg/webhook/admission"
2526
"github.com/flant/shell-operator/pkg/webhook/conversion"
@@ -284,34 +285,53 @@ func (hm *Manager) GetHooksInOrder(bindingType htypes.BindingType) ([]string, er
284285
return hooksNames, nil
285286
}
286287

287-
func (hm *Manager) HandleKubeEvent(kubeEvent kemtypes.KubeEvent, createTaskFn func(*Hook, controller.BindingExecutionInfo)) {
288+
func (hm *Manager) CreateTasksFromKubeEvent(kubeEvent kemtypes.KubeEvent, createTaskFn func(*Hook, controller.BindingExecutionInfo) task.Task) []task.Task {
288289
kubeHooks, _ := hm.GetHooksInOrder(htypes.OnKubernetesEvent)
290+
tasks := make([]task.Task, 0)
289291

290292
for _, hookName := range kubeHooks {
291293
h := hm.GetHook(hookName)
292294

293295
if h.HookController.CanHandleKubeEvent(kubeEvent) {
294-
h.HookController.HandleKubeEvent(kubeEvent, func(info controller.BindingExecutionInfo) {
296+
task := h.HookController.HandleKubeEventWithFormTask(kubeEvent, func(info controller.BindingExecutionInfo) task.Task {
295297
if createTaskFn != nil {
296-
createTaskFn(h, info)
298+
return createTaskFn(h, info)
297299
}
300+
301+
return nil
298302
})
303+
304+
if task != nil {
305+
tasks = append(tasks, task)
306+
}
299307
}
300308
}
309+
310+
return tasks
301311
}
302312

303-
func (hm *Manager) HandleScheduleEvent(crontab string, createTaskFn func(*Hook, controller.BindingExecutionInfo)) {
313+
func (hm *Manager) HandleCreateTasksFromScheduleEvent(crontab string, createTaskFn func(*Hook, controller.BindingExecutionInfo) task.Task) []task.Task {
304314
schHooks, _ := hm.GetHooksInOrder(htypes.Schedule)
315+
tasks := make([]task.Task, 0)
316+
305317
for _, hookName := range schHooks {
306318
h := hm.GetHook(hookName)
307319
if h.HookController.CanHandleScheduleEvent(crontab) {
308-
h.HookController.HandleScheduleEvent(crontab, func(info controller.BindingExecutionInfo) {
320+
newTasks := h.HookController.HandleScheduleEventWithFormTasks(crontab, func(info controller.BindingExecutionInfo) task.Task {
309321
if createTaskFn != nil {
310-
createTaskFn(h, info)
322+
return createTaskFn(h, info)
311323
}
324+
325+
return nil
312326
})
327+
328+
if len(newTasks) > 0 {
329+
tasks = append(tasks, newTasks...)
330+
}
313331
}
314332
}
333+
334+
return tasks
315335
}
316336

317337
func (hm *Manager) HandleAdmissionEvent(event admission.Event, createTaskFn func(*Hook, controller.BindingExecutionInfo)) {

pkg/shell-operator/operator.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ func (op *ShellOperator) initHookManager() error {
139139
logEntry := utils.EnrichLoggerWithLabels(op.logger, logLabels)
140140
logEntry.Debug("Create tasks for 'kubernetes' event", slog.String("name", kubeEvent.String()))
141141

142-
var tasks []task.Task
143-
op.HookManager.HandleKubeEvent(kubeEvent, func(hook *hook.Hook, info controller.BindingExecutionInfo) {
142+
return op.HookManager.CreateTasksFromKubeEvent(kubeEvent, func(hook *hook.Hook, info controller.BindingExecutionInfo) task.Task {
144143
newTask := task.NewTask(task_metadata.HookRun).
145144
WithMetadata(task_metadata.HookMetadata{
146145
HookName: hook.Name,
@@ -152,13 +151,12 @@ func (op *ShellOperator) initHookManager() error {
152151
}).
153152
WithLogLabels(logLabels).
154153
WithQueueName(info.QueueName)
155-
tasks = append(tasks, newTask.WithQueuedAt(time.Now()))
156154

157155
logEntry.With("queue", info.QueueName).
158156
Info("queue task", slog.String("name", newTask.GetDescription()))
159-
})
160157

161-
return tasks
158+
return newTask.WithQueuedAt(time.Now())
159+
})
162160
})
163161
op.ManagerEventsHandler.WithScheduleEventHandler(func(crontab string) []task.Task {
164162
logLabels := map[string]string{
@@ -168,8 +166,7 @@ func (op *ShellOperator) initHookManager() error {
168166
logEntry := utils.EnrichLoggerWithLabels(op.logger, logLabels)
169167
logEntry.Debug("Create tasks for 'schedule' event", slog.String("name", crontab))
170168

171-
var tasks []task.Task
172-
op.HookManager.HandleScheduleEvent(crontab, func(hook *hook.Hook, info controller.BindingExecutionInfo) {
169+
return op.HookManager.HandleCreateTasksFromScheduleEvent(crontab, func(hook *hook.Hook, info controller.BindingExecutionInfo) task.Task {
173170
newTask := task.NewTask(task_metadata.HookRun).
174171
WithMetadata(task_metadata.HookMetadata{
175172
HookName: hook.Name,
@@ -181,13 +178,12 @@ func (op *ShellOperator) initHookManager() error {
181178
}).
182179
WithLogLabels(logLabels).
183180
WithQueueName(info.QueueName)
184-
tasks = append(tasks, newTask.WithQueuedAt(time.Now()))
185181

186182
logEntry.With("queue", info.QueueName).
187183
Info("queue task", slog.String("name", newTask.GetDescription()))
188-
})
189184

190-
return tasks
185+
return newTask.WithQueuedAt(time.Now())
186+
})
191187
})
192188

193189
return nil

0 commit comments

Comments
 (0)