Skip to content

Commit 4312a4b

Browse files
committed
Panic if executor tries to execute event twice
1 parent db4dea4 commit 4312a4b

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

internal/workflow/executor.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
9797

9898
logger := e.logger.With("task_id", t.ID, "instance_id", t.WorkflowInstance.InstanceID)
9999

100-
logger.Debug("Executing workflow task")
100+
logger.Debug("Executing workflow task", "task_last_sequence_id", t.LastSequenceID)
101101

102102
skipNewEvents := false
103103

104104
if t.LastSequenceID > e.lastSequenceID {
105-
logger.Debug("Task has newer history than current state, fetching and replaying history", "task_sequence_id", t.LastSequenceID, "sequence_id", e.lastSequenceID)
105+
logger.Debug("Task has newer history than current state, fetching and replaying history", "task_sequence_id", t.LastSequenceID, "local_sequence_id", e.lastSequenceID)
106106

107107
h, err := e.historyProvider.GetWorkflowInstanceHistory(ctx, t.WorkflowInstance, &e.lastSequenceID)
108108
if err != nil {
@@ -188,6 +188,10 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
188188
func (e *executor) replayHistory(history []history.Event) error {
189189
e.workflowState.SetReplaying(true)
190190
for _, event := range history {
191+
if event.SequenceID < e.lastSequenceID {
192+
panic("history has older events than current state")
193+
}
194+
191195
if err := e.executeEvent(event); err != nil {
192196
return err
193197
}
@@ -216,6 +220,8 @@ func (e *executor) executeNewEvents(newEvents []history.Event) ([]history.Event,
216220

217221
func (e *executor) Close() {
218222
if e.workflow != nil {
223+
e.logger.Debug("Stopping workflow executor", "instance_id", e.workflowState.Instance().InstanceID)
224+
219225
// End workflow if running to prevent leaking goroutines
220226
e.workflow.Close()
221227
}

0 commit comments

Comments
 (0)