Skip to content

Commit d8bee34

Browse files
committed
Remove unnecessary command removal
1 parent ad7df0d commit d8bee34

File tree

1 file changed

+11
-24
lines changed

1 file changed

+11
-24
lines changed

internal/workflow/executor.go

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func (e *executor) handleActivityScheduled(event history.Event, a *history.Activ
269269
}
270270

271271
if c.Type != command.CommandType_ScheduleActivity {
272-
return fmt.Errorf("previous workflow execution scheduled an activity, this time: %v", c.Type)
272+
return fmt.Errorf("previous workflow execution scheduled an activity, not: %v", c.Type)
273273
}
274274

275275
// Ensure the same activity was scheduled again
@@ -287,7 +287,6 @@ func (e *executor) handleActivityCompleted(event history.Event, a *history.Activ
287287
return fmt.Errorf("could not find pending future for activity completion")
288288
}
289289

290-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
291290
err := f(a.Result, nil)
292291
if err != nil {
293292
return fmt.Errorf("setting result: %w", err)
@@ -302,8 +301,6 @@ func (e *executor) handleActivityFailed(event history.Event, a *history.Activity
302301
return errors.New("no pending future for activity failed event")
303302
}
304303

305-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
306-
307304
if err := f(nil, errors.New(a.Reason)); err != nil {
308305
return fmt.Errorf("setting result: %w", err)
309306
}
@@ -312,7 +309,14 @@ func (e *executor) handleActivityFailed(event history.Event, a *history.Activity
312309
}
313310

314311
func (e *executor) handleTimerScheduled(event history.Event, a *history.TimerScheduledAttributes) error {
315-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
312+
c := e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
313+
if c == nil {
314+
return fmt.Errorf("previous workflow execution scheduled a timer")
315+
}
316+
317+
if c.Type != command.CommandType_ScheduleTimer {
318+
return fmt.Errorf("previous workflow execution scheduled a timer, not: %v", c.Type)
319+
}
316320

317321
return nil
318322
}
@@ -324,15 +328,6 @@ func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAt
324328
return nil
325329
}
326330

327-
c := e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
328-
if c == nil {
329-
return fmt.Errorf("previous workflow execution scheduled a timer")
330-
}
331-
332-
if c.Type != command.CommandType_ScheduleTimer {
333-
return fmt.Errorf("previous workflow execution scheduled a timer, this time: %v", c.Type)
334-
}
335-
336331
if err := f(nil, nil); err != nil {
337332
return fmt.Errorf("setting result: %w", err)
338333
}
@@ -347,8 +342,6 @@ func (e *executor) handleTimerCanceled(event history.Event, a *history.TimerCanc
347342
return nil
348343
}
349344

350-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
351-
352345
if err := f(nil, nil); err != nil {
353346
return fmt.Errorf("setting result: %w", err)
354347
}
@@ -363,7 +356,7 @@ func (e *executor) handleSubWorkflowScheduled(event history.Event, a *history.Su
363356
}
364357

365358
if c.Type != command.CommandType_ScheduleSubWorkflow {
366-
return fmt.Errorf("previous workflow execution scheduled a sub workflow, this time: %v", c.Type)
359+
return fmt.Errorf("previous workflow execution scheduled a sub workflow, not: %v", c.Type)
367360
}
368361

369362
ca := c.Attr.(*command.ScheduleSubWorkflowCommandAttr)
@@ -386,7 +379,7 @@ func (e *executor) handleSubWorkflowCancellationRequest(event history.Event, a *
386379
}
387380

388381
if c.Type != command.CommandType_CancelSubWorkflow {
389-
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution, this time: %v", c.Type)
382+
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution, not: %v", c.Type)
390383
}
391384

392385
return e.workflow.Continue(e.workflowCtx)
@@ -398,8 +391,6 @@ func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWo
398391
return errors.New("no pending future found for sub workflow failed event")
399392
}
400393

401-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
402-
403394
if err := f(nil, errors.New(a.Error)); err != nil {
404395
return fmt.Errorf("setting result: %w", err)
405396
}
@@ -413,8 +404,6 @@ func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.Su
413404
return errors.New("no pending future found for sub workflow completed event")
414405
}
415406

416-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
417-
418407
if err := f(a.Result, nil); err != nil {
419408
return fmt.Errorf("setting result: %w", err)
420409
}
@@ -426,8 +415,6 @@ func (e *executor) handleSignalReceived(event history.Event, a *history.SignalRe
426415
// Send signal to workflow channel
427416
workflowstate.ReceiveSignal(e.workflowCtx, e.workflowState, a.Name, a.Arg)
428417

429-
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
430-
431418
return e.workflow.Continue(e.workflowCtx)
432419
}
433420

0 commit comments

Comments
 (0)