Skip to content

Commit ea3e55d

Browse files
authored
Merge pull request #51 from cschleiden/workflow-errors
Transition workflow to an error state when encountering a panic or error
2 parents e3a31d6 + 0b303df commit ea3e55d

File tree

5 files changed

+101
-55
lines changed

5 files changed

+101
-55
lines changed

internal/sync/coroutine.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ func newState() *coState {
8585
return &coState{
8686
blocking: make(chan bool, 1),
8787
unblock: make(chan bool),
88-
logger: log.New(io.Discard, "[co]", log.LstdFlags),
88+
// Mostly used while debugging issues, default to discarding log messages
89+
logger: log.New(io.Discard, "[co]", log.LstdFlags),
8990
// logger: log.New(os.Stderr, fmt.Sprintf("[co %v]", i), log.Lmsgprefix|log.Ltime),
9091
deadlockDetection: DeadlockDetection,
9192
}

internal/tester/tester_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/cschleiden/go-workflows/internal/sync"
1011
"github.com/cschleiden/go-workflows/workflow"
1112
"github.com/stretchr/testify/mock"
1213
"github.com/stretchr/testify/require"
@@ -37,7 +38,7 @@ func Test_WorkflowBlocked(t *testing.T) {
3738
}
3839

3940
func workflowBlocked(ctx workflow.Context) error {
40-
var f workflow.Future[int]
41+
f := sync.NewFuture[int]()
4142
f.Get(ctx)
4243

4344
return nil

internal/workflow/executor.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
6767

6868
e.workflowState.ClearCommands()
6969

70+
skipNewEvents := false
71+
7072
if t.LastSequenceID > e.lastSequenceID {
7173
e.logger.Debug("Task has newer history than current state, fetching and replaying history", "task_sequence_id", t.LastSequenceID, "sequence_id", e.lastSequenceID)
7274

@@ -77,7 +79,10 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
7779

7880
if err := e.replayHistory(h); err != nil {
7981
e.logger.Error("Error while replaying history", "error", err)
80-
return nil, fmt.Errorf("replaying history: %w", err)
82+
83+
// Fail workflow with an error. Skip executing new events, but still go through the commands
84+
e.workflowCompleted(nil, err)
85+
skipNewEvents = true
8186
}
8287

8388
if t.LastSequenceID != e.lastSequenceID {
@@ -87,18 +92,24 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
8792
return nil, fmt.Errorf("task has older history than current state, cannot execute")
8893
}
8994

90-
// Always pad the received events with WorkflowTaskStarted/Finished evnets to indicate the execution
95+
// Always add a WorkflowTaskStarted event before executing new tasks
9196
toExecute := []history.Event{e.createNewEvent(history.EventType_WorkflowTaskStarted, &history.WorkflowTaskStartedAttributes{})}
97+
executedEvents := toExecute
98+
9299
toExecute = append(toExecute, t.NewEvents...)
93100

94101
// Execute new events received from the backend
95-
if err := e.executeNewEvents(toExecute); err != nil {
96-
e.logger.Error("Error while executing new events", "error", err)
97-
return nil, fmt.Errorf("executing new events: %w", err)
98-
}
102+
if !skipNewEvents {
103+
var err error
104+
executedEvents, err = e.executeNewEvents(toExecute)
105+
if err != nil {
106+
e.logger.Error("Error while executing new events", "error", err)
99107

100-
executedEvents := toExecute
108+
e.workflowCompleted(nil, err)
109+
}
110+
}
101111

112+
// Process any commands added while executing new events
102113
completed, newCommandEvents, activityEvents, workflowEvents, err := e.processCommands(ctx, t)
103114
if err != nil {
104115
return nil, fmt.Errorf("processing commands: %w", err)
@@ -115,6 +126,8 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
115126
"task_id", t.ID,
116127
"instance_id", t.WorkflowInstance.InstanceID,
117128
"executed", len(executedEvents),
129+
"last_sequence_id", e.lastSequenceID,
130+
"completed", e.workflow.Completed(),
118131
)
119132

120133
return &ExecutionResult{
@@ -129,7 +142,7 @@ func (e *executor) replayHistory(history []history.Event) error {
129142
e.workflowState.SetReplaying(true)
130143
for _, event := range history {
131144
if err := e.executeEvent(event); err != nil {
132-
return fmt.Errorf("executeing history event: %w", err)
145+
return err
133146
}
134147

135148
e.lastSequenceID = event.SequenceID
@@ -138,22 +151,20 @@ func (e *executor) replayHistory(history []history.Event) error {
138151
return nil
139152
}
140153

141-
func (e *executor) executeNewEvents(newEvents []history.Event) error {
154+
func (e *executor) executeNewEvents(newEvents []history.Event) ([]history.Event, error) {
142155
e.workflowState.SetReplaying(false)
143156

144-
for _, event := range newEvents {
157+
for i, event := range newEvents {
145158
if err := e.executeEvent(event); err != nil {
146-
return fmt.Errorf("executing event: %w", err)
159+
return newEvents[:i], err
147160
}
148161
}
149162

150163
if e.workflow.Completed() {
151-
if err := e.workflowCompleted(e.workflow.Result(), e.workflow.Error()); err != nil {
152-
return err
153-
}
164+
e.workflowCompleted(e.workflow.Result(), e.workflow.Error())
154165
}
155166

156-
return nil
167+
return newEvents, nil
157168
}
158169

159170
func (e *executor) Close() {
@@ -373,13 +384,11 @@ func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEf
373384
return e.workflow.Continue(e.workflowCtx)
374385
}
375386

376-
func (e *executor) workflowCompleted(result payload.Payload, err error) error {
387+
func (e *executor) workflowCompleted(result payload.Payload, err error) {
377388
eventId := e.workflowState.GetNextScheduleEventID()
378389

379390
cmd := command.NewCompleteWorkflowCommand(eventId, result, err)
380391
e.workflowState.AddCommand(&cmd)
381-
382-
return nil
383392
}
384393

385394
func (e *executor) processCommands(ctx context.Context, t *task.Workflow) (bool, []history.Event, []history.Event, []history.WorkflowEvent, error) {

internal/workflow/executor_test.go

Lines changed: 68 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/cschleiden/go-workflows/internal/command"
1212
"github.com/cschleiden/go-workflows/internal/converter"
1313
"github.com/cschleiden/go-workflows/internal/core"
14+
"github.com/cschleiden/go-workflows/internal/fn"
1415
"github.com/cschleiden/go-workflows/internal/history"
1516
"github.com/cschleiden/go-workflows/internal/logger"
1617
"github.com/cschleiden/go-workflows/internal/payload"
@@ -29,14 +30,14 @@ func (t *testHistoryProvider) GetWorkflowInstanceHistory(ctx context.Context, in
2930
return t.history, nil
3031
}
3132

32-
func newExecutor(r *Registry, i *core.WorkflowInstance, historyProvider WorkflowHistoryProvider) *executor {
33+
func newExecutor(r *Registry, i *core.WorkflowInstance, workflow interface{}, historyProvider WorkflowHistoryProvider) *executor {
3334
logger := logger.NewDefaultLogger()
3435
s := workflowstate.NewWorkflowState(i, logger, clock.New())
3536
wfCtx, cancel := sync.WithCancel(workflowstate.WithWorkflowState(sync.Background(), s))
3637

3738
return &executor{
3839
registry: r,
39-
workflow: NewWorkflow(reflect.ValueOf(workflow1)),
40+
workflow: NewWorkflow(reflect.ValueOf(workflow)),
4041
historyProvider: historyProvider,
4142
workflowState: s,
4243
workflowCtx: wfCtx,
@@ -52,15 +53,13 @@ func activity1(ctx context.Context, r int) (int, error) {
5253
return r, nil
5354
}
5455

55-
var workflowHits int
56-
57-
func workflow1(ctx sync.Context) error {
58-
workflowHits++
59-
60-
return nil
61-
}
62-
6356
func Test_ExecuteWorkflow(t *testing.T) {
57+
var workflowHits int
58+
workflow1 := func(ctx sync.Context) error {
59+
workflowHits++
60+
return nil
61+
}
62+
6463
r := NewRegistry()
6564

6665
r.RegisterWorkflow(workflow1)
@@ -74,14 +73,14 @@ func Test_ExecuteWorkflow(t *testing.T) {
7473
time.Now(),
7574
history.EventType_WorkflowExecutionStarted,
7675
&history.ExecutionStartedAttributes{
77-
Name: "workflow1",
76+
Name: fn.Name(workflow1),
7877
Inputs: []payload.Payload{},
7978
},
8079
),
8180
},
8281
}
8382

84-
e := newExecutor(r, task.WorkflowInstance, &testHistoryProvider{})
83+
e := newExecutor(r, task.WorkflowInstance, workflow1, &testHistoryProvider{})
8584

8685
_, err := e.ExecuteTask(context.Background(), task)
8786
require.NoError(t, err)
@@ -125,13 +124,13 @@ func Test_ReplayWorkflowWithActivityResult(t *testing.T) {
125124
LastSequenceID: 3,
126125
}
127126

128-
e := newExecutor(r, task.WorkflowInstance, &testHistoryProvider{[]history.Event{
127+
e := newExecutor(r, task.WorkflowInstance, workflowWithActivity, &testHistoryProvider{[]history.Event{
129128
history.NewHistoryEvent(
130129
1,
131130
time.Now(),
132131
history.EventType_WorkflowExecutionStarted,
133132
&history.ExecutionStartedAttributes{
134-
Name: "workflowWithActivity",
133+
Name: fn.Name(workflowWithActivity),
135134
Inputs: []payload.Payload{inputs},
136135
},
137136
),
@@ -181,14 +180,14 @@ func Test_ExecuteWorkflowWithActivityCommand(t *testing.T) {
181180
time.Now(),
182181
history.EventType_WorkflowExecutionStarted,
183182
&history.ExecutionStartedAttributes{
184-
Name: "workflowWithActivity",
183+
Name: fn.Name(workflowWithActivity),
185184
Inputs: []payload.Payload{},
186185
},
187186
),
188187
},
189188
}
190189

191-
e := newExecutor(r, task.WorkflowInstance, &testHistoryProvider{})
190+
e := newExecutor(r, task.WorkflowInstance, workflowWithActivity, &testHistoryProvider{})
192191

193192
e.ExecuteTask(context.Background(), task)
194193

@@ -237,14 +236,14 @@ func Test_ExecuteWorkflowWithTimer(t *testing.T) {
237236
time.Now(),
238237
history.EventType_WorkflowExecutionStarted,
239238
&history.ExecutionStartedAttributes{
240-
Name: "workflowWithTimer",
239+
Name: fn.Name(workflowWithTimer),
241240
Inputs: []payload.Payload{},
242241
},
243242
),
244243
},
245244
}
246245

247-
e := newExecutor(r, task.WorkflowInstance, &testHistoryProvider{})
246+
e := newExecutor(r, task.WorkflowInstance, workflowWithTimer, &testHistoryProvider{})
248247

249248
e.ExecuteTask(context.Background(), task)
250249

@@ -293,14 +292,14 @@ func Test_ExecuteWorkflowWithSelector(t *testing.T) {
293292
time.Now(),
294293
history.EventType_WorkflowExecutionStarted,
295294
&history.ExecutionStartedAttributes{
296-
Name: "workflowWithSelector",
295+
Name: fn.Name(workflowWithSelector),
297296
Inputs: []payload.Payload{},
298297
},
299298
),
300299
},
301300
}
302301

303-
e := newExecutor(r, task.WorkflowInstance, &testHistoryProvider{})
302+
e := newExecutor(r, task.WorkflowInstance, workflowWithSelector, &testHistoryProvider{})
304303

305304
e.ExecuteTask(context.Background(), task)
306305

@@ -330,7 +329,7 @@ func Test_ExecuteNewEvents(t *testing.T) {
330329
time.Now(),
331330
history.EventType_WorkflowExecutionStarted,
332331
&history.ExecutionStartedAttributes{
333-
Name: "workflowWithActivity",
332+
Name: fn.Name(workflowWithActivity),
334333
Inputs: []payload.Payload{inputs},
335334
},
336335
),
@@ -346,7 +345,7 @@ func Test_ExecuteNewEvents(t *testing.T) {
346345
},
347346
}
348347

349-
e := newExecutor(r, oldTask.WorkflowInstance, &testHistoryProvider{[]history.Event{}})
348+
e := newExecutor(r, oldTask.WorkflowInstance, workflowWithActivity, &testHistoryProvider{[]history.Event{}})
350349

351350
taskResult, err := e.ExecuteTask(context.Background(), oldTask)
352351

@@ -385,21 +384,21 @@ func Test_ExecuteNewEvents(t *testing.T) {
385384
require.Len(t, e.workflowState.Commands(), 1)
386385
}
387386

388-
var workflowSignalHits int
387+
func Test_ExecuteWorkflowWithSignal(t *testing.T) {
388+
r := NewRegistry()
389389

390-
func workflowWithSignal1(ctx sync.Context) error {
391-
c := wf.NewSignalChannel[string](ctx, "signal1")
392-
c.Receive(ctx)
390+
var workflowSignalHits int
393391

394-
workflowSignalHits++
392+
workflowWithSignal := func(ctx sync.Context) error {
393+
c := wf.NewSignalChannel[string](ctx, "signal1")
394+
c.Receive(ctx)
395395

396-
return nil
397-
}
396+
workflowSignalHits++
398397

399-
func Test_ExecuteWorkflowWithSignal(t *testing.T) {
400-
r := NewRegistry()
398+
return nil
399+
}
401400

402-
r.RegisterWorkflow(workflowWithSignal1)
401+
r.RegisterWorkflow(workflowWithSignal)
403402

404403
s, err := converter.DefaultConverter.To("")
405404
require.NoError(t, err)
@@ -412,7 +411,7 @@ func Test_ExecuteWorkflowWithSignal(t *testing.T) {
412411
time.Now(),
413412
history.EventType_WorkflowExecutionStarted,
414413
&history.ExecutionStartedAttributes{
415-
Name: "workflowWithSignal1",
414+
Name: fn.Name(workflowWithSignal),
416415
Inputs: []payload.Payload{},
417416
},
418417
),
@@ -427,7 +426,7 @@ func Test_ExecuteWorkflowWithSignal(t *testing.T) {
427426
},
428427
}
429428

430-
e := newExecutor(r, task.WorkflowInstance, &testHistoryProvider{})
429+
e := newExecutor(r, task.WorkflowInstance, workflowWithSignal, &testHistoryProvider{})
431430

432431
_, err = e.ExecuteTask(context.Background(), task)
433432
require.NoError(t, err)
@@ -437,6 +436,40 @@ func Test_ExecuteWorkflowWithSignal(t *testing.T) {
437436
require.Len(t, e.workflowState.Commands(), 1)
438437
}
439438

439+
func Test_CompletesWorkflowOnError(t *testing.T) {
440+
r := NewRegistry()
441+
442+
workflowPanic := func(ctx sync.Context) error {
443+
panic("wf error")
444+
}
445+
446+
r.RegisterWorkflow(workflowPanic)
447+
448+
task1 := &task.Workflow{
449+
ID: "taskid",
450+
WorkflowInstance: core.NewWorkflowInstance("instanceID", "executionID"),
451+
NewEvents: []history.Event{
452+
history.NewPendingEvent(
453+
time.Now(),
454+
history.EventType_WorkflowExecutionStarted,
455+
&history.ExecutionStartedAttributes{
456+
Name: fn.Name(workflowPanic),
457+
Inputs: []payload.Payload{},
458+
},
459+
),
460+
},
461+
}
462+
463+
historyProvider := &testHistoryProvider{[]history.Event{}}
464+
e := newExecutor(r, task1.WorkflowInstance, workflowPanic, historyProvider)
465+
466+
r1, err := e.ExecuteTask(context.Background(), task1)
467+
require.NoError(t, err)
468+
require.True(t, e.workflow.Completed())
469+
require.Len(t, e.workflowState.Commands(), 1)
470+
require.True(t, r1.Completed)
471+
}
472+
440473
func Test_ClearCommandsBetweenRuns(t *testing.T) {
441474
r := NewRegistry()
442475

@@ -461,7 +494,7 @@ func Test_ClearCommandsBetweenRuns(t *testing.T) {
461494
}
462495

463496
historyProvider := &testHistoryProvider{[]history.Event{}}
464-
e := newExecutor(r, task1.WorkflowInstance, historyProvider)
497+
e := newExecutor(r, task1.WorkflowInstance, workflowWithActivity, historyProvider)
465498

466499
r1, err := e.ExecuteTask(context.Background(), task1)
467500
require.NoError(t, err)

0 commit comments

Comments
 (0)