Skip to content

Commit 2830a14

Browse files
committed
React to updated command statemachine
1 parent fb0d530 commit 2830a14

File tree

5 files changed

+575
-539
lines changed

5 files changed

+575
-539
lines changed

internal/workflow/executor.go

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,14 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
152152
workflowEvents := make([]history.WorkflowEvent, 0)
153153

154154
for _, c := range e.workflowState.Commands() {
155-
if c.State() != command.CommandState_Pending {
155+
if c.State() == command.CommandState_Done {
156156
continue
157157
}
158158

159-
r := c.Commit(e.clock)
159+
r := c.Execute(e.clock)
160+
if r == nil {
161+
continue
162+
}
160163

161164
completed = completed || r.Completed
162165
newCommandEvents = append(newCommandEvents, r.Events...)
@@ -337,7 +340,7 @@ func (e *executor) handleActivityScheduled(event history.Event, a *history.Activ
337340
return fmt.Errorf("previous workflow execution scheduled different type of activity: %s, %s", a.Name, sac.Name)
338341
}
339342

340-
c.Done()
343+
c.Commit()
341344

342345
return nil
343346
}
@@ -353,6 +356,18 @@ func (e *executor) handleActivityCompleted(event history.Event, a *history.Activ
353356
return fmt.Errorf("setting activity completed result: %w", err)
354357
}
355358

359+
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
360+
if c == nil {
361+
return fmt.Errorf("previous workflow execution scheduled an activity which could not be found")
362+
}
363+
364+
sac, ok := c.(*command.ScheduleActivityCommand)
365+
if !ok {
366+
return fmt.Errorf("previous workflow execution scheduled an activity, not: %v", c.Type())
367+
}
368+
369+
sac.Done()
370+
356371
return e.workflow.Continue()
357372
}
358373

@@ -366,6 +381,18 @@ func (e *executor) handleActivityFailed(event history.Event, a *history.Activity
366381
return fmt.Errorf("setting activity failed result: %w", err)
367382
}
368383

384+
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
385+
if c == nil {
386+
return fmt.Errorf("previous workflow execution scheduled an activity which could not be found")
387+
}
388+
389+
sac, ok := c.(*command.ScheduleActivityCommand)
390+
if !ok {
391+
return fmt.Errorf("previous workflow execution scheduled an activity, not: %v", c.Type())
392+
}
393+
394+
sac.Done()
395+
369396
return e.workflow.Continue()
370397
}
371398

@@ -379,7 +406,7 @@ func (e *executor) handleTimerScheduled(event history.Event, a *history.TimerSch
379406
return fmt.Errorf("previous workflow execution scheduled a timer, not: %v", c.Type())
380407
}
381408

382-
c.Done()
409+
c.Commit()
383410

384411
return nil
385412
}
@@ -395,23 +422,34 @@ func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAt
395422
return fmt.Errorf("setting timer fired result: %w", err)
396423
}
397424

425+
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
426+
if c == nil {
427+
return fmt.Errorf("no command found for timer fired event")
428+
}
429+
430+
if _, ok := c.(*command.ScheduleTimerCommand); !ok {
431+
return fmt.Errorf("schedule timer command not found, instead: %v", c.Type())
432+
}
433+
434+
c.Done()
435+
398436
return e.workflow.Continue()
399437
}
400438

401439
func (e *executor) handleTimerCanceled(event history.Event, a *history.TimerCanceledAttributes) error {
402-
// Mark command as done and ensure we executed the same command
403440
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
404441
if c == nil {
405442
return fmt.Errorf("previous workflow execution canceled a timer")
406443
}
407444

408-
if _, ok := c.(*command.CancelTimerCommand); !ok {
445+
stc, ok := c.(*command.ScheduleTimerCommand)
446+
if !ok {
409447
return fmt.Errorf("previous workflow execution canceled a timer, not: %v", c.Type())
410448
}
411449

412-
c.Done()
450+
stc.HandleCancel()
413451

414-
// Cancel a pending future
452+
// Cancel the pending future
415453
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
416454
if !ok {
417455
// Timer already canceled ignore
@@ -444,7 +482,7 @@ func (e *executor) handleSubWorkflowScheduled(event history.Event, a *history.Su
444482
// when the command was originally committed.
445483
sswc.Instance = a.SubWorkflowInstance
446484

447-
c.Done()
485+
c.Commit()
448486

449487
return nil
450488
}
@@ -455,11 +493,12 @@ func (e *executor) handleSubWorkflowCancellationRequest(event history.Event, a *
455493
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution")
456494
}
457495

458-
if _, ok := c.(*command.CancelSubWorkflowCommand); !ok {
496+
sswc, ok := c.(*command.ScheduleSubWorkflowCommand)
497+
if !ok {
459498
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution, not: %v", c.Type())
460499
}
461500

462-
c.Done()
501+
sswc.HandleCancel()
463502

464503
return e.workflow.Continue()
465504
}
@@ -474,6 +513,19 @@ func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWo
474513
return fmt.Errorf("setting sub workflow failed result: %w", err)
475514
}
476515

516+
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
517+
if c == nil {
518+
// TODO: Adjust
519+
return fmt.Errorf("previous workflow execution scheduled a sub-workflow execution")
520+
}
521+
522+
if _, ok := c.(*command.ScheduleSubWorkflowCommand); !ok {
523+
// TODO: Adjust
524+
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution, not: %v", c.Type())
525+
}
526+
527+
c.Done()
528+
477529
return e.workflow.Continue()
478530
}
479531

@@ -487,6 +539,19 @@ func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.Su
487539
return fmt.Errorf("setting sub workflow completed result: %w", err)
488540
}
489541

542+
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
543+
if c == nil {
544+
// TODO: Adjust
545+
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution")
546+
}
547+
548+
if _, ok := c.(*command.ScheduleSubWorkflowCommand); !ok {
549+
// TODO: Adjust
550+
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution, not: %v", c.Type())
551+
}
552+
553+
c.Done()
554+
490555
return e.workflow.Continue()
491556
}
492557

0 commit comments

Comments
 (0)