Skip to content

Commit fa5edac

Browse files
authored
Merge pull request #128 from cschleiden/cancellation-replay
Improve command state machine
2 parents 107d8ea + 2830a14 commit fa5edac

27 files changed

+1446
-921
lines changed

backend/test/e2e.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,49 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
392392
require.Len(t, futureEvents, 0, "no future events should be scheduled")
393393
},
394394
},
395+
{
396+
name: "Timer_CancelTwice",
397+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
398+
a := func(ctx context.Context) error {
399+
return nil
400+
}
401+
wf := func(ctx workflow.Context) error {
402+
tctx, cancel := workflow.WithCancel(ctx)
403+
f := workflow.ScheduleTimer(tctx, time.Second*10)
404+
405+
// Force the checkpoint before continuing the execution
406+
workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, a).Get(ctx)
407+
408+
// Cancel timer
409+
cancel()
410+
411+
// Force another checkpoint
412+
workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, a).Get(ctx)
413+
414+
cancel()
415+
416+
// Force another checkpoint
417+
workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, a).Get(ctx)
418+
419+
if _, err := f.Get(ctx); err != nil && err != workflow.Canceled {
420+
return err
421+
}
422+
423+
return nil
424+
}
425+
register(t, ctx, w, []interface{}{wf}, []interface{}{a})
426+
427+
instance := runWorkflow(t, ctx, c, wf)
428+
_, err := client.GetWorkflowResult[any](ctx, c, instance, time.Second*5)
429+
require.NoError(t, err)
430+
431+
historyContains(ctx, t, b, instance, history.EventType_TimerScheduled, history.EventType_TimerCanceled)
432+
433+
futureEvents, err := b.GetFutureEvents(ctx)
434+
require.NoError(t, err)
435+
require.Len(t, futureEvents, 0, "no future events should be scheduled")
436+
},
437+
},
395438
{
396439
name: "Timer_CancelBeforeFiringRemovesFutureEvent",
397440
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
@@ -408,6 +451,9 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
408451
// Cancel timer
409452
cancel()
410453

454+
// Force another checkpoint
455+
workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, a).Get(ctx)
456+
411457
if _, err := f.Get(ctx); err != nil && err != workflow.Canceled {
412458
return err
413459
}

docs/commands.drawio.png

38.6 KB
Loading

docs/commands.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Commands
2+
3+
Commands encode decisions during the workflow execution. When a workflow like
4+
5+
```go
6+
func Workflow(ctx workflow.Context) error {
7+
_, err := workflow.ExecuteActivity(ctx, "Activity1").Get(ctx, nil)
8+
9+
return err
10+
}
11+
```
12+
13+
is executed for the first time, a `ScheduleActivity` command is tracked and when the workflow reaches a checkpoint (it yields), this command is `Execute`d, and resulting `EventType_ActivityScheduled` event is processed by the backend. When the workflow is replayed, the command is also generated, but once the workflow reaches a checkpoint and history events are replayed, it is marked as `Committed` since we already have the `EventType_ActivityScheduled` message in the history.
14+
15+
This will cause the command to **not** produce another event when the workflow yields again. Other operations are implemented in a similar way.
16+
17+
18+
## States & Transitions
19+
20+
Commands are a simpel state machine and their states and transitions are the following:
21+
22+
![](./commands.drawio.png)
23+
24+
- Not all commands are cancelable. The ones which are are:
25+
- ScheduleSubWorkflow
26+
- ScheduleTimer
27+
28+
- No all commands are committable, the ones which aren't are:
29+
- Sideeffect

internal/command/cancel_subworkflow.go

Lines changed: 0 additions & 55 deletions
This file was deleted.

internal/command/cancel_timer.go

Lines changed: 0 additions & 43 deletions
This file was deleted.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package command
2+
3+
type CancelableCommand interface {
4+
Command
5+
6+
// Cancel cancels the command.
7+
Cancel()
8+
9+
// HandleCancel handles a cancel event during replay
10+
HandleCancel()
11+
}
12+
13+
type cancelableCommand struct {
14+
command
15+
}
16+
17+
func (c *cancelableCommand) Cancel() {
18+
switch c.state {
19+
case CommandState_Pending, CommandState_Canceled:
20+
c.state = CommandState_Canceled
21+
case CommandState_Committed:
22+
c.state = CommandState_CancelPending
23+
default:
24+
c.invalidStateTransition(CommandState_Canceled)
25+
}
26+
}
27+
28+
func (c *cancelableCommand) HandleCancel() {
29+
switch c.state {
30+
case CommandState_CancelPending:
31+
c.state = CommandState_Canceled
32+
default:
33+
c.invalidStateTransition(CommandState_Canceled)
34+
}
35+
}
36+
37+
func (c *cancelableCommand) Done() {
38+
switch c.state {
39+
case CommandState_Committed, CommandState_Canceled:
40+
c.state = CommandState_Done
41+
default:
42+
c.invalidStateTransition(CommandState_Done)
43+
}
44+
}

internal/command/command.go

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,25 @@
11
package command
22

33
import (
4+
"fmt"
5+
46
"github.com/benbjohnson/clock"
57
"github.com/cschleiden/go-workflows/internal/history"
68
)
79

8-
type CommandState int
9-
10-
// ┌───────┐
11-
// ┌──────┤Pending│ - Command is pending, has just been added
12-
// │ └───────┘
13-
// │ ▼
14-
// │ ┌─────────┐
15-
// │ │Committed│ - Command has been committed. Its results (e.g., events) have been checkpointed
16-
// │ └─────────┘
17-
// │ ▼
18-
// │ ┌────┐
19-
// └──────►│Done│ - Command has been marked as done.
20-
// └────┘
21-
const (
22-
CommandState_Pending CommandState = iota
23-
CommandState_Committed
24-
CommandState_Done
25-
)
26-
2710
type Command interface {
2811
ID() int64
2912

3013
Type() string
3114

15+
// State returns the current state of the command.
3216
State() CommandState
3317

34-
Committed() bool
18+
// Execute processes the command in its current state and moves it to the next state.
19+
Execute(clock.Clock) *CommandResult
3520

36-
Commit(clock clock.Clock) *CommandResult
21+
// Commit marks the command as committed without executing it.
22+
Commit()
3723

3824
// Done marks the command as done. This transitions the state to done and indicates that the result
3925
// of this command has been applied.
@@ -49,32 +35,43 @@ type CommandResult struct {
4935
}
5036

5137
type command struct {
38+
name string
39+
5240
state CommandState
5341

5442
id int64
5543
}
5644

57-
func (c *command) commit() {
58-
if c.state != CommandState_Pending {
59-
panic("command already committed")
60-
}
61-
62-
c.state = CommandState_Committed
63-
}
64-
6545
func (c *command) ID() int64 {
6646
return c.id
6747
}
6848

69-
func (c *command) Committed() bool {
70-
return c.state >= CommandState_Committed
71-
}
72-
7349
func (c *command) State() CommandState {
7450
return c.state
7551
}
7652

77-
// Done marks the command as done
53+
func (c *command) Type() string {
54+
return c.name
55+
}
56+
57+
func (c *command) Commit() {
58+
switch c.state {
59+
case CommandState_Pending:
60+
c.state = CommandState_Committed
61+
default:
62+
c.invalidStateTransition(CommandState_Committed)
63+
}
64+
}
65+
7866
func (c *command) Done() {
79-
c.state = CommandState_Done
67+
switch c.state {
68+
case CommandState_Committed:
69+
c.state = CommandState_Done
70+
default:
71+
c.invalidStateTransition(CommandState_Done)
72+
}
73+
}
74+
75+
func (c *command) invalidStateTransition(state CommandState) {
76+
panic(fmt.Errorf("invalid state transition for command %s: %s -> %s", c.name, c.State().String(), state.String()))
8077
}

internal/command/command_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package command
2+
3+
import (
4+
"testing"
5+
6+
"github.com/benbjohnson/clock"
7+
"github.com/cschleiden/go-workflows/internal/history"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func assertExecuteNoEvent(t *testing.T, c Command, expectedState CommandState) {
12+
r := c.Execute(clock.New())
13+
14+
require.Nil(t, r)
15+
}
16+
17+
func assertExecuteWithEvent(t *testing.T, c Command, expectedState CommandState, expectedEventType history.EventType) *CommandResult {
18+
r := c.Execute(clock.New())
19+
20+
require.NotNil(t, r)
21+
require.Equal(t, expectedState, c.State())
22+
require.Len(t, r.Events, 1)
23+
require.Equal(t, expectedEventType, r.Events[0].Type)
24+
25+
return r
26+
}

0 commit comments

Comments
 (0)