Skip to content

Commit e8946b6

Browse files
authored
Merge pull request #118 from cschleiden/fix-error-during-replay
Ensure errors encountered during replay can be saved & Fix SideEffect determinism check
2 parents bd41a4a + 6b87cbd commit e8946b6

File tree

5 files changed

+94
-23
lines changed

5 files changed

+94
-23
lines changed

backend/redis/events.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string,
3131
// ARGV[1] - event data as serialized strings
3232
var addEventsToStreamCmd = redis.NewScript(`
3333
local msgID = ""
34-
for i = 1, #ARGV,2 do
34+
for i = 1, #ARGV, 2 do
3535
msgID = redis.call("XADD", KEYS[1], ARGV[i], "event", ARGV[i + 1])
3636
end
3737
return msgID
@@ -45,6 +45,8 @@ func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey
4545
return err
4646
}
4747

48+
// log.Println("addEventsToHistoryStreamP:", event.SequenceID, string(eventData))
49+
4850
eventsData = append(eventsData, historyID(event.SequenceID))
4951
eventsData = append(eventsData, string(eventData))
5052
}

backend/test/e2e.go

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020

2121
func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBackend)) {
2222
tests := []struct {
23-
name string
24-
f func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend)
23+
name string
24+
withoutCache bool // If set, test will only be run when the cache is disabled
25+
f func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend)
2526
}{
2627
{
2728
name: "SimpleWorkflow",
@@ -125,6 +126,35 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
125126
require.ErrorContains(t, err, "converting activity inputs: mismatched argument count: expected 2, got 1")
126127
},
127128
},
129+
{
130+
name: "SideEffect_Simple",
131+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
132+
i := 2
133+
wf := func(ctx workflow.Context) (int, error) {
134+
r1, _ := workflow.SideEffect(ctx, func(ctx workflow.Context) int {
135+
i++
136+
return i
137+
}).Get(ctx)
138+
139+
// Do something to force the task to end
140+
workflow.Sleep(ctx, time.Millisecond*1)
141+
142+
r2, _ := workflow.SideEffect(ctx, func(ctx workflow.Context) int {
143+
i++
144+
return i
145+
}).Get(ctx)
146+
147+
return r1 + r2, nil
148+
}
149+
register(t, ctx, w, []interface{}{wf}, nil)
150+
151+
instance := runWorkflow(t, ctx, c, wf)
152+
153+
r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*5)
154+
require.NoError(t, err)
155+
require.Equal(t, 7, r)
156+
},
157+
},
128158
{
129159
name: "SubWorkflow_Simple",
130160
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
@@ -363,10 +393,46 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
363393
require.Len(t, futureEvents, 0, "no future events should be scheduled")
364394
},
365395
},
396+
{
397+
name: "NonDeterminism",
398+
withoutCache: true,
399+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
400+
i := 0
401+
wf := func(ctx workflow.Context) (int, error) {
402+
var r int
403+
404+
i++
405+
if i%2 == 0 {
406+
r, _ = workflow.SideEffect(ctx, func(ctx workflow.Context) int {
407+
return 1
408+
}).Get(ctx)
409+
} else {
410+
workflow.Sleep(ctx, time.Millisecond*1)
411+
}
412+
413+
// Do something to force the task to end
414+
workflow.Sleep(ctx, time.Millisecond*1)
415+
416+
return r, nil
417+
}
418+
register(t, ctx, w, []interface{}{wf}, nil)
419+
420+
instance := runWorkflow(t, ctx, c, wf)
421+
422+
r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*5)
423+
require.NoError(t, err)
424+
require.Equal(t, 0, r)
425+
},
426+
},
366427
}
367428

368429
run := func(suffix string, workerOptions *worker.Options) {
369430
for _, tt := range tests {
431+
if tt.withoutCache && workerOptions.WorkflowExecutorCache != nil {
432+
// Skip test
433+
continue
434+
}
435+
370436
t.Run(tt.name+suffix, func(t *testing.T) {
371437
b := setup()
372438
ctx := context.Background()

internal/command/sideeffect.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@ type SideEffectCommand struct {
1414

1515
var _ Command = (*SideEffectCommand)(nil)
1616

17-
func NewSideEffectCommand(id int64, result payload.Payload) *SideEffectCommand {
17+
func NewSideEffectCommand(id int64) *SideEffectCommand {
1818
return &SideEffectCommand{
1919
command: command{
2020
state: CommandState_Pending,
2121
id: id,
2222
},
23-
result: result,
2423
}
2524
}
2625

26+
func (c *SideEffectCommand) SetResult(result payload.Payload) {
27+
c.result = result
28+
}
29+
2730
func (c *SideEffectCommand) Type() string {
2831
return "SideEffect"
2932
}

internal/workflow/executor.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
115115
// Fail workflow with an error. Skip executing new events, but still go through the commands
116116
e.workflowCompleted(nil, err)
117117
skipNewEvents = true
118+
119+
// With an error occurred during replay, we need to ensure new events don't get duplicate sequence ids
120+
e.lastSequenceID = t.LastSequenceID
118121
} else if t.LastSequenceID != e.lastSequenceID {
119122
logger.Error("After replaying history, task still has newer history than current state", "task_sequence_id", t.LastSequenceID, "local_sequence_id", e.lastSequenceID)
120123

@@ -494,12 +497,12 @@ func (e *executor) handleSignalReceived(event history.Event, a *history.SignalRe
494497
func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEffectResultAttributes) error {
495498
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
496499
if c == nil {
497-
return fmt.Errorf("previous workflow execution scheduled a sub workflow")
500+
return fmt.Errorf("previous workflow execution scheduled a side effect")
498501
}
499502

500503
sec, ok := c.(*command.SideEffectCommand)
501504
if !ok {
502-
return fmt.Errorf("previous workflow execution scheduled a sub workflow, not: %v", c.Type())
505+
return fmt.Errorf("previous workflow execution scheduled a side effect, not: %v", c.Type())
503506
}
504507

505508
sec.Done()

workflow/sideeffect.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,23 @@ func SideEffect[TResult any](ctx Context, f func(ctx Context) TResult) Future[TR
2222
wfState := workflowstate.WorkflowState(ctx)
2323
scheduleEventID := wfState.GetNextScheduleEventID()
2424

25-
if Replaying(ctx) {
26-
// There has to be a message in the history with the result, create a new future
27-
// and block on it
28-
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(future))
29-
return future
30-
}
25+
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(future))
3126

32-
// Execute side effect
33-
r := f(ctx)
27+
cmd := command.NewSideEffectCommand(scheduleEventID)
28+
wfState.AddCommand(cmd)
3429

35-
// Create command to add it to the history
36-
payload, err := converter.DefaultConverter.To(r)
37-
if err != nil {
38-
future.Set(*new(TResult), err)
39-
}
30+
if !Replaying(ctx) {
31+
// Execute side effect
32+
r := f(ctx)
4033

41-
cmd := command.NewSideEffectCommand(scheduleEventID, payload)
42-
wfState.AddCommand(cmd)
34+
payload, err := converter.DefaultConverter.To(r)
35+
if err != nil {
36+
future.Set(*new(TResult), err)
37+
}
4338

44-
future.Set(r, nil)
39+
cmd.SetResult(payload)
40+
future.Set(r, nil)
41+
}
4542

4643
return future
4744
}

0 commit comments

Comments
 (0)