Skip to content

Commit 9af048d

Browse files
authored
Merge pull request #60 from cschleiden/improve-dx
Improve error messages when workflow replays yield different commands
2 parents 5815866 + d8bee34 commit 9af048d

File tree

5 files changed

+72
-41
lines changed

5 files changed

+72
-41
lines changed

internal/command/command.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type CommandType int
1313
const (
1414
_ CommandType = iota
1515

16-
CommandType_ScheduleActivityTask
16+
CommandType_ScheduleActivity
1717

1818
CommandType_ScheduleSubWorkflow
1919
CommandType_CancelSubWorkflow
@@ -26,6 +26,27 @@ const (
2626
CommandType_CompleteWorkflow
2727
)
2828

29+
func (ct CommandType) String() string {
30+
switch ct {
31+
case CommandType_ScheduleActivity:
32+
return "ScheduleActivityTask"
33+
case CommandType_ScheduleSubWorkflow:
34+
return "ScheduleSubWorkflow"
35+
case CommandType_CancelSubWorkflow:
36+
return "CancelSubWorkflow"
37+
case CommandType_ScheduleTimer:
38+
return "ScheduleTimer"
39+
case CommandType_CancelTimer:
40+
return "CancelTimer"
41+
case CommandType_SideEffect:
42+
return "SideEffect"
43+
case CommandType_CompleteWorkflow:
44+
return "CompleteWorkflow"
45+
}
46+
47+
return ""
48+
}
49+
2950
type CommandState int
3051

3152
const (
@@ -52,7 +73,7 @@ type ScheduleActivityTaskCommandAttr struct {
5273
func NewScheduleActivityTaskCommand(id int64, name string, inputs []payload.Payload) Command {
5374
return Command{
5475
ID: id,
55-
Type: CommandType_ScheduleActivityTask,
76+
Type: CommandType_ScheduleActivity,
5677
Attr: &ScheduleActivityTaskCommandAttr{
5778
Name: name,
5879
Inputs: inputs,

internal/sync/coroutine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func newState() *coState {
8585
return &coState{
8686
blocking: make(chan bool, 1),
8787
unblock: make(chan bool),
88-
// Mostly used while debugging issues, default to discarding log messages
88+
// Only used while debugging issues, default to discarding log messages
8989
logger: log.New(io.Discard, "[co]", log.LstdFlags),
9090
// logger: log.New(os.Stderr, fmt.Sprintf("[co %v]", i), log.Lmsgprefix|log.Ltime),
9191
deadlockDetection: DeadlockDetection,

internal/sync/future.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type Future[T any] interface {
1212
type SettableFuture[T any] interface {
1313
Future[T]
1414

15-
// Set stores the value and unblocks any waiting consumers
15+
// Set stores the value
1616
Set(v T, err error) error
1717
}
1818

internal/workflow/executor.go

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,20 @@ func (e *executor) handleWorkflowTaskStarted(event history.Event, a *history.Wor
262262

263263
func (e *executor) handleActivityScheduled(event history.Event, a *history.ActivityScheduledAttributes) error {
264264
c := e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
265-
if c != nil {
266-
// Ensure the same activity was scheduled again
267-
ca := c.Attr.(*command.ScheduleActivityTaskCommandAttr)
268-
if a.Name != ca.Name {
269-
return fmt.Errorf("previous workflow execution scheduled different type of activity: %s, %s", a.Name, ca.Name)
270-
}
265+
266+
// Ensure activity
267+
if c == nil {
268+
return fmt.Errorf("previous workflow execution scheduled an activity which could not be found")
269+
}
270+
271+
if c.Type != command.CommandType_ScheduleActivity {
272+
return fmt.Errorf("previous workflow execution scheduled an activity, not: %v", c.Type)
273+
}
274+
275+
// Ensure the same activity was scheduled again
276+
ca := c.Attr.(*command.ScheduleActivityTaskCommandAttr)
277+
if a.Name != ca.Name {
278+
return fmt.Errorf("previous workflow execution scheduled different type of activity: %s, %s", a.Name, ca.Name)
271279
}
272280

273281
return nil
@@ -276,10 +284,9 @@ func (e *executor) handleActivityScheduled(event history.Event, a *history.Activ
276284
func (e *executor) handleActivityCompleted(event history.Event, a *history.ActivityCompletedAttributes) error {
277285
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
278286
if !ok {
279-
return nil
287+
return fmt.Errorf("could not find pending future for activity completion")
280288
}
281289

282-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
283290
err := f(a.Result, nil)
284291
if err != nil {
285292
return fmt.Errorf("setting result: %w", err)
@@ -291,11 +298,9 @@ func (e *executor) handleActivityCompleted(event history.Event, a *history.Activ
291298
func (e *executor) handleActivityFailed(event history.Event, a *history.ActivityFailedAttributes) error {
292299
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
293300
if !ok {
294-
return errors.New("no pending future found for activity failed event")
301+
return errors.New("no pending future for activity failed event")
295302
}
296303

297-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
298-
299304
if err := f(nil, errors.New(a.Reason)); err != nil {
300305
return fmt.Errorf("setting result: %w", err)
301306
}
@@ -304,7 +309,14 @@ func (e *executor) handleActivityFailed(event history.Event, a *history.Activity
304309
}
305310

306311
func (e *executor) handleTimerScheduled(event history.Event, a *history.TimerScheduledAttributes) error {
307-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
312+
c := e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
313+
if c == nil {
314+
return fmt.Errorf("previous workflow execution scheduled a timer")
315+
}
316+
317+
if c.Type != command.CommandType_ScheduleTimer {
318+
return fmt.Errorf("previous workflow execution scheduled a timer, not: %v", c.Type)
319+
}
308320

309321
return nil
310322
}
@@ -316,8 +328,6 @@ func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAt
316328
return nil
317329
}
318330

319-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
320-
321331
if err := f(nil, nil); err != nil {
322332
return fmt.Errorf("setting result: %w", err)
323333
}
@@ -332,8 +342,6 @@ func (e *executor) handleTimerCanceled(event history.Event, a *history.TimerCanc
332342
return nil
333343
}
334344

335-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
336-
337345
if err := f(nil, nil); err != nil {
338346
return fmt.Errorf("setting result: %w", err)
339347
}
@@ -343,27 +351,35 @@ func (e *executor) handleTimerCanceled(event history.Event, a *history.TimerCanc
343351

344352
func (e *executor) handleSubWorkflowScheduled(event history.Event, a *history.SubWorkflowScheduledAttributes) error {
345353
c := e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
346-
if c != nil {
347-
ca := c.Attr.(*command.ScheduleSubWorkflowCommandAttr)
348-
if a.Name != ca.Name {
349-
return errors.New("previous workflow execution scheduled a different sub workflow")
350-
}
354+
if c == nil {
355+
return fmt.Errorf("previous workflow execution scheduled a sub workflow")
356+
}
357+
358+
if c.Type != command.CommandType_ScheduleSubWorkflow {
359+
return fmt.Errorf("previous workflow execution scheduled a sub workflow, not: %v", c.Type)
360+
}
351361

352-
// Set correct InstanceID here.
353-
// TODO: see if we can provide better support for commands here and find a better place to store
354-
// this message.
355-
ca.Instance = a.SubWorkflowInstance
362+
ca := c.Attr.(*command.ScheduleSubWorkflowCommandAttr)
363+
if a.Name != ca.Name {
364+
return fmt.Errorf("previous workflow execution scheduled different type of sub workflow: %s, %s", a.Name, ca.Name)
356365
}
357366

358-
// TOOD: If the command cannot be found, raise error?
367+
// Set correct InstanceID here.
368+
// TODO: see if we can provide better support for commands here and find a better place to store
369+
// this message.
370+
ca.Instance = a.SubWorkflowInstance
359371

360372
return nil
361373
}
362374

363375
func (e *executor) handleSubWorkflowCancellationRequest(event history.Event, a *history.SubWorkflowCancellationRequestedAttributes) error {
364376
c := e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
365-
if c != nil {
366-
return fmt.Errorf("expected to find a sub workflow cancellation event, instead got: %v", event.Type)
377+
if c == nil {
378+
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution")
379+
}
380+
381+
if c.Type != command.CommandType_CancelSubWorkflow {
382+
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution, not: %v", c.Type)
367383
}
368384

369385
return e.workflow.Continue(e.workflowCtx)
@@ -375,8 +391,6 @@ func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWo
375391
return errors.New("no pending future found for sub workflow failed event")
376392
}
377393

378-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
379-
380394
if err := f(nil, errors.New(a.Error)); err != nil {
381395
return fmt.Errorf("setting result: %w", err)
382396
}
@@ -390,8 +404,6 @@ func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.Su
390404
return errors.New("no pending future found for sub workflow completed event")
391405
}
392406

393-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
394-
395407
if err := f(a.Result, nil); err != nil {
396408
return fmt.Errorf("setting result: %w", err)
397409
}
@@ -403,8 +415,6 @@ func (e *executor) handleSignalReceived(event history.Event, a *history.SignalRe
403415
// Send signal to workflow channel
404416
workflowstate.ReceiveSignal(e.workflowCtx, e.workflowState, a.Name, a.Arg)
405417

406-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
407-
408418
return e.workflow.Continue(e.workflowCtx)
409419
}
410420

@@ -441,7 +451,7 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) (bool,
441451
c.State = command.CommandState_Committed
442452

443453
switch c.Type {
444-
case command.CommandType_ScheduleActivityTask:
454+
case command.CommandType_ScheduleActivity:
445455
a := c.Attr.(*command.ScheduleActivityTaskCommandAttr)
446456

447457
scheduleActivityEvent := e.createNewEvent(

internal/workflow/executor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func Test_ExecuteWorkflowWithActivityCommand(t *testing.T) {
200200
require.Equal(t, command.Command{
201201
ID: 1,
202202
State: command.CommandState_Committed,
203-
Type: command.CommandType_ScheduleActivityTask,
203+
Type: command.CommandType_ScheduleActivity,
204204
Attr: &command.ScheduleActivityTaskCommandAttr{
205205
Name: "activity1",
206206
Inputs: []payload.Payload{inputs},
@@ -309,7 +309,7 @@ func Test_ExecuteWorkflowWithSelector(t *testing.T) {
309309
require.Len(t, e.workflowState.Commands(), 2)
310310

311311
require.Equal(t, command.CommandType_ScheduleTimer, e.workflowState.Commands()[0].Type)
312-
require.Equal(t, command.CommandType_ScheduleActivityTask, e.workflowState.Commands()[1].Type)
312+
require.Equal(t, command.CommandType_ScheduleActivity, e.workflowState.Commands()[1].Type)
313313
}
314314

315315
func Test_ExecuteNewEvents(t *testing.T) {

0 commit comments

Comments
 (0)