Skip to content

Commit dddec49

Browse files
authored
Enqueue wf instance again
1 parent 2ed849c commit dddec49

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

backend/redis/workflow.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
137137
}
138138

139139
// Store activity data
140-
// TODO: Use pipeline?
141140
for _, activityEvent := range activityEvents {
142141
if _, err := rb.activityQueue.Enqueue(ctx, activityEvent.ID, &activityData{
143142
InstanceID: instance.GetInstanceID(),
@@ -153,7 +152,17 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
153152
return errors.Wrap(err, "could not complete workflow task")
154153
}
155154

156-
log.Println("Unlocked", instance.GetInstanceID())
155+
// If there are pending events, enqueue the instance again
156+
// TODO: Check for pending events
157+
if state != backend.WorkflowStateFinished {
158+
if _, err := rb.workflowQueue.Enqueue(ctx, instance.GetInstanceID(), nil); err != nil {
159+
if err != taskqueue.ErrTaskAlreadyInQueue {
160+
return errors.Wrap(err, "could not queue workflow")
161+
}
162+
}
163+
}
164+
165+
log.Println("Unlocked workflow task", instance.GetInstanceID())
157166

158167
return nil
159168
}

0 commit comments

Comments
 (0)