Skip to content

Commit 2fb0a2c

Browse files
committed
Add Committed access for commands
1 parent 46998ea commit 2fb0a2c

File tree

4 files changed

+25
-18
lines changed

4 files changed

+25
-18
lines changed

internal/command/command.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@ import (
77

88
type CommandState int
99

10-
// ┌───────┐
11-
// Pending
12-
// └───────┘
13-
// ▼
14-
// ┌─────────┐
15-
// │Committed│
16-
// └─────────┘
17-
// ▼
18-
// ┌────┐
19-
// │Done│
20-
// └────┘
10+
// ┌───────┐
11+
// ┌──────┤Pending│ - Command is pending, has just been added
12+
// └───────┘
13+
//
14+
// ┌─────────┐
15+
// │Committed│ - Command has been committed. Its results (e.g., events) have been checkpointed
16+
// └─────────┘
17+
//
18+
// ┌────┐
19+
// └──────►│Done│ - Command has been marked as done.
20+
// └────┘
2121
const (
2222
CommandState_Pending CommandState = iota
2323
CommandState_Committed
@@ -27,15 +27,17 @@ const (
2727
type Command interface {
2828
ID() int64
2929

30+
Type() string
31+
32+
State() CommandState
33+
34+
Committed() bool
35+
3036
Commit(clock clock.Clock) *CommandResult
3137

3238
// Done marks the command as done. This transitions the state to done and indicates that the result
3339
// of this command has been applied.
3440
Done()
35-
36-
State() CommandState
37-
38-
Type() string
3941
}
4042

4143
type CommandResult struct {
@@ -64,10 +66,15 @@ func (c *command) ID() int64 {
6466
return c.id
6567
}
6668

69+
func (c *command) Committed() bool {
70+
return c.state >= CommandState_Committed
71+
}
72+
6773
func (c *command) State() CommandState {
6874
return c.state
6975
}
7076

77+
// Done marks the command as done
7178
func (c *command) Done() {
7279
c.state = CommandState_Done
7380
}

workflow/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt
6666
if c, ok := d.(sync.ChannelInternal[struct{}]); ok {
6767
if _, ok := c.ReceiveNonBlocking(); ok {
6868
// Workflow has been canceled, check if the activity has already been scheduled, no need to schedule otherwise
69-
if cmd.State() != command.CommandState_Committed {
69+
if !cmd.Committed() {
7070
cmd.Done()
7171
wfState.RemoveFuture(scheduleEventID)
7272
f.Set(*new(TResult), sync.Canceled)

workflow/subworkflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflo
7171
// Check if the channel is cancelable
7272
if c, cancelable := ctx.Done().(sync.CancelChannel); cancelable {
7373
c.AddReceiveCallback(func(v struct{}, ok bool) {
74-
if cmd.State() == command.CommandState_Committed {
74+
if cmd.Committed() {
7575
// The command is committed, that means the sub-workflow is already started. Create and add a cancel command
7676
// to stop the sub-workflow execution.
7777
cancelScheduleEventID := wfState.GetNextScheduleEventID()

workflow/timer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func ScheduleTimer(ctx Context, delay time.Duration) Future[struct{}] {
4242
// Register a callback for when it's canceled. The only operation on the `Done` channel
4343
// is that it's closed when the context is canceled.
4444
c.AddReceiveCallback(func(v struct{}, ok bool) {
45-
if timerCmd.State() == command.CommandState_Committed {
45+
if timerCmd.Committed() {
4646
// If the timer command is already committed, create a cancel command to allow the backend
4747
// to clean up the scheduled timer message.
4848
cancelScheduleEventID := wfState.GetNextScheduleEventID()

0 commit comments

Comments
 (0)