Skip to content

Commit fd20cdb

Browse files
committed
fix: cron scheduler exec context to avoid canceled watcher ctx (#6977)
* Fix cron scheduler exec context to avoid canceled watcher ctx * add context cancel tests
1 parent e0b3af1 commit fd20cdb

File tree

3 files changed

+108
-12
lines changed

3 files changed

+108
-12
lines changed

cmd/api-server/main.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -784,11 +784,8 @@ func main() {
784784
leaderTasks = append(leaderTasks, leader.Task{
785785
Name: "cron-scheduler",
786786
Start: func(taskCtx context.Context) error {
787+
scheduleManager.Start()
787788
go func() {
788-
// Start the schedule manager.
789-
scheduleManager.Start()
790-
// If we're no longer the leader then stop the manager.
791-
// This probably won't happen as losing leadership likely means we died.
792789
<-taskCtx.Done()
793790
scheduleManager.Stop()
794791
}()

internal/cronjob/robfig/manager.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,28 +28,36 @@ type Manager struct {
2828
cron *cron.Cron
2929
cronEntries map[string]map[string]cron.EntryID
3030
executor Executor
31+
execCtx context.Context
32+
execCancel context.CancelFunc
3133
}
3234

33-
func New(logger *zap.SugaredLogger, executor Executor, proModeEnabled bool) Manager {
34-
return Manager{
35+
func New(logger *zap.SugaredLogger, executor Executor, proModeEnabled bool) *Manager {
36+
ctx, cancel := context.WithCancel(context.Background())
37+
return &Manager{
3538
proModeEnabled: proModeEnabled,
3639
logger: logger,
3740
cron: cron.New(),
3841
cronEntries: make(map[string]map[string]cron.EntryID),
3942
executor: executor,
43+
execCtx: ctx,
44+
execCancel: cancel,
4045
}
4146
}
4247

4348
// Start the cron manager in its own goroutine, or no-op if already started.
44-
func (m Manager) Start() {
49+
func (m *Manager) Start() {
4550
m.logger.Infow("cron manager starting")
51+
m.execCancel()
52+
m.execCtx, m.execCancel = context.WithCancel(context.Background())
4653
m.cron.Start()
4754
m.logger.Infow("cron manager started")
4855
}
4956

5057
// Stop stops the cron manager if it is running; otherwise it does nothing.
51-
func (m Manager) Stop() {
58+
func (m *Manager) Stop() {
5259
m.logger.Infow("cron manager stopping")
60+
m.execCancel()
5361
m.cron.Stop()
5462
m.logger.Infow("cron manager stopped")
5563
}
@@ -62,7 +70,7 @@ func cronSpec(config testkube.TestWorkflowCronJobConfig) string {
6270
return spec
6371
}
6472

65-
func (m Manager) ReplaceWorkflowSchedules(ctx context.Context, workflow cronjob.Workflow, configs []testkube.TestWorkflowCronJobConfig) error {
73+
func (m *Manager) ReplaceWorkflowSchedules(ctx context.Context, workflow cronjob.Workflow, configs []testkube.TestWorkflowCronJobConfig) error {
6674
log := m.logger.With("workflow", workflow.Name)
6775
// Delete all existing schedules for this workflow.
6876
// This is because we may not know when a schedule is removed from
@@ -93,7 +101,7 @@ func (m Manager) ReplaceWorkflowSchedules(ctx context.Context, workflow cronjob.
93101
"cron", config.Cron,
94102
)
95103
}
96-
entryId, err := m.cron.AddJob(spec, m.testWorkflowExecuteJob(ctx, workflow.Name, spec, config))
104+
entryId, err := m.cron.AddJob(spec, m.testWorkflowExecuteJob(workflow.Name, spec, config))
97105
if err != nil {
98106
m.logger.Errorw("Error adding cron for workflow, continuing processing",
99107
"cron", spec,
@@ -115,8 +123,9 @@ func (m Manager) ReplaceWorkflowSchedules(ctx context.Context, workflow cronjob.
115123
return nil
116124
}
117125

118-
func (m Manager) testWorkflowExecuteJob(ctx context.Context, workflow, cronSpec string, config testkube.TestWorkflowCronJobConfig) cron.FuncJob {
126+
func (m *Manager) testWorkflowExecuteJob(workflow, cronSpec string, config testkube.TestWorkflowCronJobConfig) cron.FuncJob {
119127
return cron.FuncJob(func() {
128+
execCtx := m.execCtx
120129
var targets []*cloud.ExecutionTarget
121130
if config.Target != nil {
122131
targets = commonmapper.MapAllTargetsApiToGrpc([]testkube.ExecutionTarget{*config.Target})
@@ -141,7 +150,7 @@ func (m Manager) testWorkflowExecuteJob(ctx context.Context, workflow, cronSpec
141150
)
142151
log.Info("executing scheduled workflow")
143152

144-
results, err := m.executor.Execute(ctx, request)
153+
results, err := m.executor.Execute(execCtx, request)
145154
if err != nil {
146155
log.Errorw("unable to execute scheduled workflow",
147156
"error", err)
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package robfig
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"go.uber.org/zap"
8+
9+
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
10+
"github.com/kubeshop/testkube/pkg/cloud"
11+
"github.com/kubeshop/testkube/pkg/cronjob"
12+
)
13+
14+
type recordingExecutor struct {
15+
calls int
16+
ctx context.Context
17+
req *cloud.ScheduleRequest
18+
}
19+
20+
func (e *recordingExecutor) Execute(ctx context.Context, req *cloud.ScheduleRequest) ([]testkube.TestWorkflowExecution, error) {
21+
e.calls++
22+
e.ctx = ctx
23+
e.req = req
24+
return nil, nil
25+
}
26+
27+
func TestManagerStartStopResetsExecContext(t *testing.T) {
28+
logger := zap.NewNop().Sugar()
29+
executor := &recordingExecutor{}
30+
manager := New(logger, executor, false)
31+
32+
oldCtx := manager.execCtx
33+
manager.Start()
34+
35+
select {
36+
case <-oldCtx.Done():
37+
default:
38+
t.Fatal("expected previous exec context to be canceled on start")
39+
}
40+
41+
if err := manager.execCtx.Err(); err != nil {
42+
t.Fatalf("expected new exec context to be active, got %v", err)
43+
}
44+
45+
currentCtx := manager.execCtx
46+
manager.Stop()
47+
48+
select {
49+
case <-currentCtx.Done():
50+
default:
51+
t.Fatal("expected exec context to be canceled on stop")
52+
}
53+
}
54+
55+
func TestJobUsesManagerContext(t *testing.T) {
56+
logger := zap.NewNop().Sugar()
57+
executor := &recordingExecutor{}
58+
manager := New(logger, executor, false)
59+
60+
canceledCtx, cancel := context.WithCancel(context.Background())
61+
cancel()
62+
63+
workflow := cronjob.Workflow{Name: "workflow-a", EnvId: "env-1"}
64+
config := testkube.TestWorkflowCronJobConfig{Cron: "* * * * *"}
65+
66+
if err := manager.ReplaceWorkflowSchedules(canceledCtx, workflow, []testkube.TestWorkflowCronJobConfig{config}); err != nil {
67+
t.Fatalf("unexpected error setting schedules: %v", err)
68+
}
69+
70+
entryID := manager.cronEntries[workflow.Name][cronSpec(config)]
71+
entry := manager.cron.Entry(entryID)
72+
if entry.Job == nil {
73+
t.Fatal("expected scheduled job to be registered")
74+
}
75+
76+
entry.Job.Run()
77+
78+
if executor.calls != 1 {
79+
t.Fatalf("expected executor to be called once, got %d", executor.calls)
80+
}
81+
if executor.ctx == nil {
82+
t.Fatal("expected executor context to be set")
83+
}
84+
if err := executor.ctx.Err(); err != nil {
85+
t.Fatalf("expected executor context to be active, got %v", err)
86+
}
87+
if executor.ctx == canceledCtx {
88+
t.Fatal("expected executor to use manager context, got canceled caller context")
89+
}
90+
}

0 commit comments

Comments
 (0)