@@ -262,12 +262,20 @@ func (e *executor) handleWorkflowTaskStarted(event history.Event, a *history.Wor
262
262
263
263
func (e * executor ) handleActivityScheduled (event history.Event , a * history.ActivityScheduledAttributes ) error {
264
264
c := e .workflowState .RemoveCommandByEventID (event .ScheduleEventID )
265
- if c != nil {
266
- // Ensure the same activity was scheduled again
267
- ca := c .Attr .(* command.ScheduleActivityTaskCommandAttr )
268
- if a .Name != ca .Name {
269
- return fmt .Errorf ("previous workflow execution scheduled different type of activity: %s, %s" , a .Name , ca .Name )
270
- }
265
+
266
+ // Ensure activity
267
+ if c == nil {
268
+ return fmt .Errorf ("previous workflow execution scheduled an activity which could not be found" )
269
+ }
270
+
271
+ if c .Type != command .CommandType_ScheduleActivity {
272
+ return fmt .Errorf ("previous workflow execution scheduled an activity, this time: %v" , c .Type )
273
+ }
274
+
275
+ // Ensure the same activity was scheduled again
276
+ ca := c .Attr .(* command.ScheduleActivityTaskCommandAttr )
277
+ if a .Name != ca .Name {
278
+ return fmt .Errorf ("previous workflow execution scheduled different type of activity: %s, %s" , a .Name , ca .Name )
271
279
}
272
280
273
281
return nil
@@ -276,7 +284,7 @@ func (e *executor) handleActivityScheduled(event history.Event, a *history.Activ
276
284
func (e * executor ) handleActivityCompleted (event history.Event , a * history.ActivityCompletedAttributes ) error {
277
285
f , ok := e .workflowState .FutureByScheduleEventID (event .ScheduleEventID )
278
286
if ! ok {
279
- return nil
287
+ return fmt . Errorf ( "could not find pending future for activity completion" )
280
288
}
281
289
282
290
e .workflowState .RemoveCommandByEventID (event .ScheduleEventID )
@@ -291,7 +299,7 @@ func (e *executor) handleActivityCompleted(event history.Event, a *history.Activ
291
299
func (e * executor ) handleActivityFailed (event history.Event , a * history.ActivityFailedAttributes ) error {
292
300
f , ok := e .workflowState .FutureByScheduleEventID (event .ScheduleEventID )
293
301
if ! ok {
294
- return errors .New ("no pending future found for activity failed event" )
302
+ return errors .New ("no pending future for activity failed event" )
295
303
}
296
304
297
305
e .workflowState .RemoveCommandByEventID (event .ScheduleEventID )
@@ -316,7 +324,14 @@ func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAt
316
324
return nil
317
325
}
318
326
319
- e .workflowState .RemoveCommandByEventID (event .ScheduleEventID )
327
+ c := e .workflowState .RemoveCommandByEventID (event .ScheduleEventID )
328
+ if c == nil {
329
+ return fmt .Errorf ("previous workflow execution scheduled a timer" )
330
+ }
331
+
332
+ if c .Type != command .CommandType_ScheduleTimer {
333
+ return fmt .Errorf ("previous workflow execution scheduled a timer, this time: %v" , c .Type )
334
+ }
320
335
321
336
if err := f (nil , nil ); err != nil {
322
337
return fmt .Errorf ("setting result: %w" , err )
@@ -343,27 +358,35 @@ func (e *executor) handleTimerCanceled(event history.Event, a *history.TimerCanc
343
358
344
359
func (e * executor ) handleSubWorkflowScheduled (event history.Event , a * history.SubWorkflowScheduledAttributes ) error {
345
360
c := e .workflowState .RemoveCommandByEventID (event .ScheduleEventID )
346
- if c != nil {
347
- ca := c .Attr .(* command.ScheduleSubWorkflowCommandAttr )
348
- if a .Name != ca .Name {
349
- return errors .New ("previous workflow execution scheduled a different sub workflow" )
350
- }
361
+ if c == nil {
362
+ return fmt .Errorf ("previous workflow execution scheduled a sub workflow" )
363
+ }
351
364
352
- // Set correct InstanceID here.
353
- // TODO: see if we can provide better support for commands here and find a better place to store
354
- // this message.
355
- ca .Instance = a .SubWorkflowInstance
365
+ if c .Type != command .CommandType_ScheduleSubWorkflow {
366
+ return fmt .Errorf ("previous workflow execution scheduled a sub workflow, this time: %v" , c .Type )
356
367
}
357
368
358
- // TOOD: If the command cannot be found, raise error?
369
+ ca := c .Attr .(* command.ScheduleSubWorkflowCommandAttr )
370
+ if a .Name != ca .Name {
371
+ return fmt .Errorf ("previous workflow execution scheduled different type of sub workflow: %s, %s" , a .Name , ca .Name )
372
+ }
373
+
374
+ // Set correct InstanceID here.
375
+ // TODO: see if we can provide better support for commands here and find a better place to store
376
+ // this message.
377
+ ca .Instance = a .SubWorkflowInstance
359
378
360
379
return nil
361
380
}
362
381
363
382
func (e * executor ) handleSubWorkflowCancellationRequest (event history.Event , a * history.SubWorkflowCancellationRequestedAttributes ) error {
364
383
c := e .workflowState .RemoveCommandByEventID (event .ScheduleEventID )
365
- if c != nil {
366
- return fmt .Errorf ("expected to find a sub workflow cancellation event, instead got: %v" , event .Type )
384
+ if c == nil {
385
+ return fmt .Errorf ("previous workflow execution cancelled a sub-workflow execution" )
386
+ }
387
+
388
+ if c .Type != command .CommandType_CancelSubWorkflow {
389
+ return fmt .Errorf ("previous workflow execution cancelled a sub-workflow execution, this time: %v" , c .Type )
367
390
}
368
391
369
392
return e .workflow .Continue (e .workflowCtx )
@@ -441,7 +464,7 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) (bool,
441
464
c .State = command .CommandState_Committed
442
465
443
466
switch c .Type {
444
- case command .CommandType_ScheduleActivityTask :
467
+ case command .CommandType_ScheduleActivity :
445
468
a := c .Attr .(* command.ScheduleActivityTaskCommandAttr )
446
469
447
470
scheduleActivityEvent := e .createNewEvent (
0 commit comments