Skip to content

Commit 118460a

Browse files
committed
Ensure cancel timer and sideeffect commands are created again on replay
1 parent 7eb7727 commit 118460a

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

internal/workflow/executor.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,19 @@ func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAt
393393
}
394394

395395
func (e *executor) handleTimerCanceled(event history.Event, a *history.TimerCanceledAttributes) error {
396+
// Mark command as done and ensure we executed the same command
397+
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
398+
if c == nil {
399+
return fmt.Errorf("previous workflow execution canceled a timer")
400+
}
401+
402+
if _, ok := c.(*command.CancelTimerCommand); !ok {
403+
return fmt.Errorf("previous workflow execution canceled a timer, not: %v", c.Type())
404+
}
405+
406+
c.Done()
407+
408+
// Cancel a pending future
396409
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
397410
if !ok {
398411
// Timer already canceled ignore
@@ -475,6 +488,18 @@ func (e *executor) handleSignalReceived(event history.Event, a *history.SignalRe
475488
}
476489

477490
func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEffectResultAttributes) error {
491+
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
492+
if c == nil {
493+
return fmt.Errorf("previous workflow execution scheduled a sub workflow")
494+
}
495+
496+
sec, ok := c.(*command.SideEffectCommand)
497+
if !ok {
498+
return fmt.Errorf("previous workflow execution scheduled a sub workflow, not: %v", c.Type())
499+
}
500+
501+
sec.Done()
502+
478503
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
479504
if !ok {
480505
return errors.New("no pending future found for side effect result event")

0 commit comments

Comments
 (0)