Skip to content

Commit 920d7b0

Browse files
authored
Merge pull request #43 from cschleiden/remove-pending-commands
Ignore future event when instance already ended or not found
2 parents dc7abfb + 43db5ef commit 920d7b0

File tree

1 file changed

+17
-4
lines changed

1 file changed

+17
-4
lines changed

backend/redis/workflow.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,31 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
4444

4545
for _, eventR := range result.([]interface{}) {
4646
eventStr := eventR.(string)
47-
var event futureEvent
48-
if err := json.Unmarshal([]byte(eventStr), &event); err != nil {
47+
var futureEvent futureEvent
48+
if err := json.Unmarshal([]byte(eventStr), &futureEvent); err != nil {
4949
return nil, errors.Wrap(err, "could not unmarshal event")
5050
}
5151

52-
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(event.Instance.InstanceID), event.Event)
52+
instanceState, err := readInstance(ctx, rb.rdb, futureEvent.Instance.InstanceID)
53+
if err != nil {
54+
if err == backend.ErrInstanceNotFound {
55+
rb.options.Logger.Debug("Ignoring future event for non-existing instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
56+
} else {
57+
return nil, errors.Wrap(err, "could not read instance")
58+
}
59+
}
60+
61+
if instanceState.State != backend.WorkflowStateActive {
62+
rb.options.Logger.Debug("Ignoring future event for already completed instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
63+
}
64+
65+
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(futureEvent.Instance.InstanceID), futureEvent.Event)
5366
if err != nil {
5467
return nil, errors.Wrap(err, "could not add future event to stream")
5568
}
5669

5770
// Instance now has at least one pending event, try to queue task
58-
if _, err := rb.workflowQueue.Enqueue(ctx, event.Instance.InstanceID, &workflowTaskData{
71+
if _, err := rb.workflowQueue.Enqueue(ctx, futureEvent.Instance.InstanceID, &workflowTaskData{
5972
LastPendingEventMessageID: *msgID,
6073
}); err != nil {
6174
if err != taskqueue.ErrTaskAlreadyInQueue {

0 commit comments

Comments
 (0)