Skip to content

Commit 8f4daea

Browse files
authored
Merge pull request #292 from cschleiden/redis-fix-active-key
Use correct key for checking whether an instance exists
2 parents 20ddb2f + c37cb99 commit 8f4daea

File tree

2 files changed

+74
-75
lines changed

2 files changed

+74
-75
lines changed

backend/redis/scripts/complete_workflow_task.lua

Lines changed: 48 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,38 @@ for i = 1, executedEvents do
5252
lastSequenceId = tonumber(sequenceId)
5353
end
5454

55+
-- Remove executed pending events
56+
local lastPendingEventMessageId = getArgv()
57+
redis.call("XTRIM", pendingEventsKey, "MINID", lastPendingEventMessageId)
58+
redis.call("XDEL", pendingEventsKey, lastPendingEventMessageId)
59+
60+
-- Update instance state
61+
local now = getArgv()
62+
local state = tonumber(getArgv())
63+
64+
-- State constants
65+
local ContinuedAsNew = tonumber(getArgv())
66+
local Finished = tonumber(getArgv())
67+
68+
instance["state"] = state
69+
70+
-- If workflow instance finished, remove active execution
71+
local activeInstanceExecutionKey = getKey()
72+
if state == ContinuedAsNew or state == Finished then
73+
-- Remove active execution
74+
redis.call("DEL", activeInstanceExecutionKey)
75+
76+
instance["completed_at"] = now
77+
78+
redis.call("SREM", activeInstancesKey, instanceSegment)
79+
end
80+
81+
if lastSequenceId > 0 then
82+
instance["last_sequence_id"] = lastSequenceId
83+
end
84+
85+
redis.call("SET", instanceKey, cjson.encode(instance))
86+
5587
-- Remove canceled timers
5688
local timersToCancel = tonumber(getArgv())
5789
for i = 1, timersToCancel do
@@ -80,6 +112,20 @@ for i = 1, timersToSchedule do
80112
storePayload(eventId, payloadData)
81113
end
82114

115+
-- Schedule activities
116+
local activities = tonumber(getArgv())
117+
local activitySetKey = getKey()
118+
local activityStreamKey = getKey()
119+
for i = 1, activities do
120+
local activityId = getArgv()
121+
local activityData = getArgv()
122+
123+
local added = redis.call("SADD", activitySetKey, activityId)
124+
if added == 1 then
125+
redis.call("XADD", activityStreamKey, "*", "id", activityId, "data", activityData)
126+
end
127+
end
128+
83129
-- Send events to other workflow instances
84130
local otherWorkflowInstances = tonumber(getArgv())
85131
for i = 1, otherWorkflowInstances do
@@ -102,7 +148,7 @@ for i = 1, otherWorkflowInstances do
102148
local conflictEventPayloadData = getArgv()
103149

104150
-- Does the instance exist already?
105-
local instanceExists = redis.call("EXISTS", targetActiveInstanceExecutionState)
151+
local instanceExists = redis.call("EXISTS", targetActiveInstanceExecutionKey)
106152
if instanceExists == 1 then
107153
redis.call("XADD", pendingEventsKey, "*", "event", conflictEventData)
108154
storePayload(conflictEventId, conflictEventPayloadData)
@@ -147,61 +193,14 @@ for i = 1, otherWorkflowInstances do
147193
end
148194
end
149195

150-
-- Update instance state
151-
local now = getArgv()
152-
local state = tonumber(getArgv())
153-
154-
-- State constants
155-
local ContinuedAsNew = tonumber(getArgv())
156-
local Finished = tonumber(getArgv())
157-
158-
instance["state"] = state
159-
160-
-- If workflow instance finished, remove active execution
161-
local activeInstanceExecutionKey = getKey()
162-
if state == ContinuedAsNew or state == Finished then
163-
-- Remove active execution
164-
redis.call("DEL", activeInstanceExecutionKey)
165-
166-
instance["completed_at"] = now
167-
168-
redis.call("SREM", activeInstancesKey, instanceSegment)
169-
end
170-
171-
if lastSequenceId > 0 then
172-
instance["last_sequence_id"] = lastSequenceId
173-
end
174-
175-
redis.call("SET", instanceKey, cjson.encode(instance))
176-
177-
-- Schedule activities
178-
local activities = tonumber(getArgv())
179-
local activitySetKey = getKey()
180-
local activityStreamKey = getKey()
181-
for i = 1, activities do
182-
local activityId = getArgv()
183-
local activityData = getArgv()
184-
185-
local added = redis.call("SADD", activitySetKey, activityId)
186-
if added == 1 then
187-
redis.call("XADD", activityStreamKey, "*", "id", activityId, "data", activityData)
188-
end
189-
end
190-
191-
-- Remove executed pending events
192-
local lastPendingEventMessageId = getArgv()
193-
redis.call("XTRIM", pendingEventsKey, "MINID", lastPendingEventMessageId)
194-
redis.call("XDEL", pendingEventsKey, lastPendingEventMessageId)
195-
196-
-- Complete workflow task and unlock instance
196+
-- Complete workflow task and mark instance task as completed
197197
local taskId = getArgv()
198198
local groupName = getArgv()
199199
local task = redis.call("XRANGE", workflowStreamKey, taskId, taskId)
200200
if #task ~= 0 then
201201
local id = task[1][2][2]
202202
redis.call("SREM", workflowSetKey, id)
203203
redis.call("XACK", workflowStreamKey, groupName, taskId)
204-
205204
redis.call("XDEL", workflowStreamKey, taskId)
206205
end
207206

backend/redis/workflow.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,15 @@ func (rb *redisBackend) CompleteWorkflowTask(
132132
args = append(args, event.ID, historyID(event.SequenceID), eventData, payloadData, event.SequenceID)
133133
}
134134

135+
// Remove executed pending events
136+
lastPendingEventMessageID := task.CustomData.(string)
137+
args = append(args, lastPendingEventMessageID)
138+
139+
// Update instance state and update active execution
140+
nowStr := time.Now().Format(time.RFC3339)
141+
args = append(args, string(nowStr), int(state), int(core.WorkflowInstanceStateContinuedAsNew), int(core.WorkflowInstanceStateFinished))
142+
keys = append(keys, activeInstanceExecutionKey(instance.InstanceID))
143+
135144
// Remove canceled timers
136145
timersToCancel := make([]*history.Event, 0)
137146
for _, event := range executedEvents {
@@ -163,14 +172,30 @@ func (rb *redisBackend) CompleteWorkflowTask(
163172
keys = append(keys, futureEventKey(instance, timerEvent.ScheduleEventID))
164173
}
165174

175+
// Schedule activities
176+
args = append(args, len(activityEvents))
177+
activityQueueKeys := rb.activityQueue.Keys()
178+
keys = append(keys, activityQueueKeys.SetKey, activityQueueKeys.StreamKey)
179+
for _, activityEvent := range activityEvents {
180+
activityData, err := json.Marshal(&activityData{
181+
Instance: instance,
182+
ID: activityEvent.ID,
183+
Event: activityEvent,
184+
})
185+
if err != nil {
186+
return fmt.Errorf("marshaling activity data: %w", err)
187+
}
188+
args = append(args, activityEvent.ID, activityData)
189+
}
190+
166191
// Send new workflow events to the respective streams
167192
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
168193
args = append(args, len(groupedEvents))
169194
for targetInstance, events := range groupedEvents {
170195
keys = append(keys, instanceKey(&targetInstance), activeInstanceExecutionKey(targetInstance.InstanceID))
171196
args = append(args, instanceSegment(&targetInstance), targetInstance.InstanceID)
172197

173-
// Are we creating a new sub-workflow instance?
198+
// Are we creating a new workflow instance?
174199
m := events[0]
175200
createNewInstance := m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted
176201
args = append(args, createNewInstance)
@@ -218,31 +243,6 @@ func (rb *redisBackend) CompleteWorkflowTask(
218243
}
219244
}
220245

221-
// Update instance state and update active execution
222-
nowStr := time.Now().Format(time.RFC3339)
223-
args = append(args, string(nowStr), int(state), int(core.WorkflowInstanceStateContinuedAsNew), int(core.WorkflowInstanceStateFinished))
224-
keys = append(keys, activeInstanceExecutionKey(instance.InstanceID))
225-
226-
// Store activity data
227-
args = append(args, len(activityEvents))
228-
activityQueueKeys := rb.activityQueue.Keys()
229-
keys = append(keys, activityQueueKeys.SetKey, activityQueueKeys.StreamKey)
230-
for _, activityEvent := range activityEvents {
231-
activityData, err := json.Marshal(&activityData{
232-
Instance: instance,
233-
ID: activityEvent.ID,
234-
Event: activityEvent,
235-
})
236-
if err != nil {
237-
return fmt.Errorf("marshaling activity data: %w", err)
238-
}
239-
args = append(args, activityEvent.ID, activityData)
240-
}
241-
242-
// Remove executed pending events
243-
lastPendingEventMessageID := task.CustomData.(string)
244-
args = append(args, lastPendingEventMessageID)
245-
246246
// Complete workflow task and unlock instance.
247247
args = append(args, task.ID, rb.workflowQueue.groupName)
248248

0 commit comments

Comments
 (0)