Skip to content

Commit 8d46b94

Browse files
authored
[shell-operator] feat/add ctx to handle task (#745)
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com> Co-authored-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent 59e8151 commit 8d46b94

File tree

6 files changed

+30
-23
lines changed

6 files changed

+30
-23
lines changed

pkg/hook/types/bindings.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package types
22

33
import (
4+
"fmt"
45
"time"
56

67
kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager"
@@ -9,8 +10,14 @@ import (
910
"github.com/flant/shell-operator/pkg/webhook/conversion"
1011
)
1112

13+
var _ fmt.Stringer = (*BindingType)(nil)
14+
1215
type BindingType string
1316

17+
func (bt BindingType) String() string {
18+
return string(bt)
19+
}
20+
1421
const (
1522
Schedule BindingType = "schedule"
1623
OnStartup BindingType = "onStartup"

pkg/shell-operator/combine_binding_context_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) {
3333

3434
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
3535
TaskQueues.WithContext(context.Background())
36-
TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ task.Task) queue.TaskResult {
36+
TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ context.Context, _ task.Task) queue.TaskResult {
3737
return queue.TaskResult{
3838
Status: "Success",
3939
}
@@ -138,7 +138,7 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) {
138138

139139
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
140140
TaskQueues.WithContext(context.Background())
141-
TaskQueues.NewNamedQueue("test_no_combine", func(_ task.Task) queue.TaskResult {
141+
TaskQueues.NewNamedQueue("test_no_combine", func(_ context.Context, _ task.Task) queue.TaskResult {
142142
return queue.TaskResult{
143143
Status: "Success",
144144
}
@@ -208,7 +208,7 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) {
208208

209209
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
210210
TaskQueues.WithContext(context.Background())
211-
TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ task.Task) queue.TaskResult {
211+
TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ context.Context, _ task.Task) queue.TaskResult {
212212
return queue.TaskResult{
213213
Status: "Success",
214214
}
@@ -321,7 +321,7 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) {
321321

322322
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
323323
TaskQueues.WithContext(context.Background())
324-
TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ task.Task) queue.TaskResult {
324+
TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ context.Context, _ task.Task) queue.TaskResult {
325325
return queue.TaskResult{
326326
Status: "Success",
327327
}

pkg/shell-operator/operator.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (op *ShellOperator) Start() {
100100
// Create 'main' queue and add onStartup tasks and enable bindings tasks.
101101
op.bootstrapMainQueue(op.TaskQueues)
102102
// Start main task queue handler
103-
op.TaskQueues.StartMain()
103+
op.TaskQueues.StartMain(op.ctx)
104104
op.initAndStartHookQueues()
105105

106106
// Start emit "live" metrics
@@ -255,7 +255,7 @@ func (op *ShellOperator) initValidatingWebhookManager() error {
255255
return nil, fmt.Errorf("no hook found for '%s' '%s'", event.ConfigurationId, event.WebhookId)
256256
}
257257

258-
res := op.taskHandler(admissionTask)
258+
res := op.taskHandler(op.ctx, admissionTask)
259259

260260
if res.Status == "Fail" {
261261
return &admission.Response{
@@ -361,7 +361,7 @@ func (op *ShellOperator) conversionEventHandler(crdName string, request *v1.Conv
361361
return nil, fmt.Errorf("no hook found for '%s' event for crd/%s", string(types.KubernetesConversion), crdName)
362362
}
363363

364-
res := op.taskHandler(convTask)
364+
res := op.taskHandler(op.ctx, convTask)
365365

366366
if res.Status == "Fail" {
367367
return &conversion.Response{
@@ -409,7 +409,7 @@ func (op *ShellOperator) conversionEventHandler(crdName string, request *v1.Conv
409409
}
410410

411411
// taskHandler
412-
func (op *ShellOperator) taskHandler(t task.Task) queue.TaskResult {
412+
func (op *ShellOperator) taskHandler(_ context.Context, t task.Task) queue.TaskResult {
413413
logEntry := op.logger.With("operator.component", "taskRunner")
414414
hookMeta := task_metadata.HookMetadataAccessor(t)
415415
var res queue.TaskResult
@@ -882,7 +882,7 @@ func (op *ShellOperator) initAndStartHookQueues() {
882882
for _, hookBinding := range h.Config.Schedules {
883883
if op.TaskQueues.GetByName(hookBinding.Queue) == nil {
884884
op.TaskQueues.NewNamedQueue(hookBinding.Queue, op.taskHandler)
885-
op.TaskQueues.GetByName(hookBinding.Queue).Start()
885+
op.TaskQueues.GetByName(hookBinding.Queue).Start(op.ctx)
886886
}
887887
}
888888
}
@@ -893,7 +893,7 @@ func (op *ShellOperator) initAndStartHookQueues() {
893893
for _, hookBinding := range h.Config.OnKubernetesEvents {
894894
if op.TaskQueues.GetByName(hookBinding.Queue) == nil {
895895
op.TaskQueues.NewNamedQueue(hookBinding.Queue, op.taskHandler)
896-
op.TaskQueues.GetByName(hookBinding.Queue).Start()
896+
op.TaskQueues.GetByName(hookBinding.Queue).Start(op.ctx)
897897
}
898898
}
899899
}

pkg/task/queue/queue_set.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ func (tqs *TaskQueueSet) Stop() {
5454
tqs.m.RUnlock()
5555
}
5656

57-
func (tqs *TaskQueueSet) StartMain() {
58-
tqs.GetByName(tqs.MainName).Start()
57+
func (tqs *TaskQueueSet) StartMain(ctx context.Context) {
58+
tqs.GetByName(tqs.MainName).Start(ctx)
5959
}
6060

61-
func (tqs *TaskQueueSet) Start() {
61+
func (tqs *TaskQueueSet) Start(ctx context.Context) {
6262
tqs.m.RLock()
6363
for _, q := range tqs.Queues {
64-
q.Start()
64+
q.Start(ctx)
6565
}
6666

6767
tqs.m.RUnlock()
@@ -73,7 +73,7 @@ func (tqs *TaskQueueSet) Add(queue *TaskQueue) {
7373
tqs.m.Unlock()
7474
}
7575

76-
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(task.Task) TaskResult) {
76+
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult) {
7777
q := NewTasksQueue()
7878
q.WithName(name)
7979
q.WithHandler(handler)

pkg/task/queue/task_queue.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type TaskQueue struct {
7272
debug bool
7373

7474
Name string
75-
Handler func(task.Task) TaskResult
75+
Handler func(ctx context.Context, t task.Task) TaskResult
7676
Status string
7777

7878
measureActionFn func()
@@ -113,7 +113,7 @@ func (q *TaskQueue) WithName(name string) *TaskQueue {
113113
return q
114114
}
115115

116-
func (q *TaskQueue) WithHandler(fn func(task.Task) TaskResult) *TaskQueue {
116+
func (q *TaskQueue) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueue {
117117
q.Handler = fn
118118
return q
119119
}
@@ -408,7 +408,7 @@ func (q *TaskQueue) Stop() {
408408
}
409409
}
410410

411-
func (q *TaskQueue) Start() {
411+
func (q *TaskQueue) Start(ctx context.Context) {
412412
if q.started {
413413
return
414414
}
@@ -438,7 +438,7 @@ func (q *TaskQueue) Start() {
438438
// Now the task can be handled!
439439
var nextSleepDelay time.Duration
440440
q.SetStatus("run first task")
441-
taskRes := q.Handler(t)
441+
taskRes := q.Handler(ctx, t)
442442

443443
// Check Done channel after long-running operation.
444444
select {

pkg/task/queue/task_queue_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func Test_ExponentialBackoff(t *testing.T) {
122122
const fails = 10
123123
failsCount := fails
124124
queueStopCh := make(chan struct{}, 1)
125-
q.WithHandler(func(t task.Task) TaskResult {
125+
q.WithHandler(func(_ context.Context, t task.Task) TaskResult {
126126
var res TaskResult
127127
runsAt = append(runsAt, time.Now())
128128
failureCounts = append(failureCounts, t.GetFailureCount())
@@ -148,7 +148,7 @@ func Test_ExponentialBackoff(t *testing.T) {
148148
return mockExponentialDelay
149149
}
150150

151-
q.Start()
151+
q.Start(context.TODO())
152152

153153
// Expect taskHandler returns Success result.
154154
g.Eventually(queueStopCh, "5s", "20ms").Should(BeClosed(), "Should handle first task in queue successfully")
@@ -221,7 +221,7 @@ func Test_CancelDelay(t *testing.T) {
221221
endedAt := startedAt
222222
delayStartsCh := make(chan struct{}, 1)
223223
healingDoneCh := make(chan struct{}, 1)
224-
q.WithHandler(func(t task.Task) TaskResult {
224+
q.WithHandler(func(_ context.Context, t task.Task) TaskResult {
225225
var res TaskResult
226226
if t.GetId() == ErrTask.GetId() {
227227
res.Status = Fail
@@ -255,7 +255,7 @@ func Test_CancelDelay(t *testing.T) {
255255
}
256256

257257
// Start handling 'erroneous' task.
258-
q.Start()
258+
q.Start(context.TODO())
259259

260260
// Expect taskHandler returns Success result.
261261
g.Eventually(delayStartsCh, "5s", "20ms").Should(BeClosed(), "Should handle failed task and starts a delay")

0 commit comments

Comments
 (0)