Skip to content

Commit 009d2b8

Browse files
authored
Merge pull request #130 from cschleiden/allow-multiple-timer-cancelations
Support multiple time cancellations
2 parents fa5edac + 533bf33 commit 009d2b8

File tree

2 files changed

+56
-12
lines changed

2 files changed

+56
-12
lines changed

internal/workflow/executor_test.go

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ func activity1(ctx context.Context, r int) (int, error) {
5151
func Test_Executor(t *testing.T) {
5252
tests := []struct {
5353
name string
54-
f func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider)
54+
f func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider)
5555
}{
5656
{
5757
name: "Simple_workflow_to_completion",
58-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
58+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
5959
workflowHits := 0
6060
wf := func(ctx sync.Context) error {
6161
workflowHits++
@@ -77,7 +77,7 @@ func Test_Executor(t *testing.T) {
7777
},
7878
{
7979
name: "Workflow with activity command",
80-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
80+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
8181
workflowActivityHit := 0
8282
workflowWithActivity := func(ctx sync.Context) error {
8383
workflowActivityHit++
@@ -123,7 +123,7 @@ func Test_Executor(t *testing.T) {
123123
},
124124
{
125125
name: "Workflow with activity replay",
126-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
126+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
127127
workflowActivityHit := 0
128128
workflowWithActivity := func(ctx sync.Context) error {
129129
workflowActivityHit++
@@ -188,7 +188,7 @@ func Test_Executor(t *testing.T) {
188188
},
189189
{
190190
name: "Workflow with new events",
191-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
191+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
192192
workflowActivityHit := 0
193193
workflowWithActivity := func(ctx sync.Context) error {
194194
workflowActivityHit++
@@ -270,7 +270,7 @@ func Test_Executor(t *testing.T) {
270270
},
271271
{
272272
name: "Workflow with selector",
273-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
273+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
274274
var workflowWithSelectorHits int
275275

276276
workflowWithSelector := func(ctx sync.Context) error {
@@ -326,7 +326,7 @@ func Test_Executor(t *testing.T) {
326326
},
327327
{
328328
name: "Workflow with timer",
329-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
329+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
330330
workflowTimerHits := 0
331331

332332
workflowWithTimer := func(ctx sync.Context) error {
@@ -370,9 +370,45 @@ func Test_Executor(t *testing.T) {
370370
require.IsType(t, &command.ScheduleTimerCommand{}, e.workflowState.Commands()[0])
371371
},
372372
},
373+
{
374+
name: "Cancel timer multiple times",
375+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
376+
workflowWithTimer := func(ctx sync.Context) error {
377+
tctx, cancel := wf.WithCancel(ctx)
378+
379+
wf.ScheduleTimer(tctx, time.Millisecond*5)
380+
381+
// Cause checkpoint
382+
wf.ExecuteActivity[any](ctx, wf.DefaultActivityOptions, activity1, 42).Get(ctx)
383+
384+
cancel()
385+
cancel()
386+
387+
return nil
388+
}
389+
390+
r.RegisterWorkflow(workflowWithTimer)
391+
r.RegisterActivity(activity1)
392+
393+
task := startWorkflowTask(i.InstanceID, workflowWithTimer)
394+
395+
result, err := e.ExecuteTask(context.Background(), task)
396+
require.NoError(t, err)
397+
require.NoError(t, e.workflow.err)
398+
require.Len(t, e.workflowState.Commands(), 2)
399+
400+
task2 := continueTask(i.InstanceID, []history.Event{
401+
history.NewPendingEvent(time.Now(), history.EventType_ActivityCompleted, &history.ActivityCompletedAttributes{}, history.ScheduleEventID(2)),
402+
}, result.Executed[len(result.Executed)-1].SequenceID)
403+
404+
result, err = e.ExecuteTask(context.Background(), task2)
405+
require.NoError(t, err)
406+
require.NoError(t, e.workflow.err)
407+
},
408+
},
373409
{
374410
name: "Workflow with signal",
375-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
411+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
376412
workflowSignalHits := 0
377413

378414
workflowWithSignal := func(ctx sync.Context) error {
@@ -423,7 +459,7 @@ func Test_Executor(t *testing.T) {
423459
},
424460
{
425461
name: "Completes workflow on unhandled error",
426-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
462+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
427463
workflowPanic := func(ctx sync.Context) error {
428464
panic("wf error")
429465
}
@@ -457,7 +493,7 @@ func Test_Executor(t *testing.T) {
457493
},
458494
{
459495
name: "Schedule subworkflow",
460-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
496+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
461497
subworkflow := func(ctx wf.Context) error {
462498
return nil
463499
}
@@ -485,7 +521,7 @@ func Test_Executor(t *testing.T) {
485521
},
486522
{
487523
name: "Schedule and cancel subworkflow",
488-
f: func(r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
524+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
489525
subworkflow := func(ctx wf.Context) error {
490526
return nil
491527
}
@@ -544,7 +580,7 @@ func Test_Executor(t *testing.T) {
544580
i := core.NewWorkflowInstance(uuid.NewString(), "")
545581
hp := &testHistoryProvider{}
546582
e := newExecutor(r, i, hp)
547-
tt.f(r, e, i, hp)
583+
tt.f(t, r, e, i, hp)
548584
})
549585
}
550586
}

workflow/timer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,15 @@ func ScheduleTimer(ctx Context, delay time.Duration) Future[struct{}] {
4141
if c, cancelable := ctx.Done().(sync.CancelChannel); cancelable {
4242
// Register a callback for when it's canceled. The only operation on the `Done` channel
4343
// is that it's closed when the context is canceled.
44+
canceled := false
45+
4446
c.AddReceiveCallback(func(v struct{}, ok bool) {
47+
// Ignore any future cancelation events for this timer
48+
if canceled {
49+
return
50+
}
51+
canceled = true
52+
4553
timerCmd.Cancel()
4654

4755
// Remove the timer future from the workflow state and mark it as canceled if it hasn't already fired. This is different

0 commit comments

Comments
 (0)