Skip to content

Commit acb41df

Browse files
committed
Fix timer schedule/cancel race
1 parent 63443d1 commit acb41df

File tree

2 files changed

+28
-20
lines changed

2 files changed

+28
-20
lines changed

backend/redis/events_future.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"context"
55
"fmt"
6+
"log"
67
"strconv"
78
"time"
89

@@ -23,24 +24,25 @@ import (
2324
// Note: this does not work with Redis Cluster since not all keys are passed into the script.
2425
var futureEventsCmd = redis.NewScript(`
2526
-- Find events which should become visible now
26-
local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE")
27+
local now = ARGV[1]
28+
local events = redis.call("ZRANGE", KEYS[1], "-inf", now, "BYSCORE")
2729
for i = 1, #events do
2830
local instanceSegment = redis.call("HGET", events[i], "instance")
2931
30-
-- Add event to pending event stream
31-
local eventData = redis.call("HGET", events[i], "event")
32-
local pending_events_key = "pending-events:" .. instanceSegment
33-
redis.call("XADD", pending_events_key, "*", "event", eventData)
34-
35-
-- Try to queue workflow task
36-
local already_queued = redis.call("SADD", KEYS[3], instanceSegment)
37-
if already_queued ~= 0 then
32+
-- Try to queue workflow task. If a workflow task is already queued, ignore this event for now.
33+
local added = redis.call("SADD", KEYS[3], instanceSegment)
34+
if added == 1 then
3835
redis.call("XADD", KEYS[2], "*", "id", instanceSegment, "data", "")
39-
end
4036
41-
-- Delete event hash data
42-
redis.call("DEL", events[i])
43-
redis.call("ZREM", KEYS[1], events[i])
37+
-- Add event to pending event stream
38+
local eventData = redis.call("HGET", events[i], "event")
39+
local pending_events_key = "pending-events:" .. instanceSegment
40+
redis.call("XADD", pending_events_key, "*", "event", eventData)
41+
42+
-- Delete event hash data
43+
redis.call("DEL", events[i])
44+
redis.call("ZREM", KEYS[1], events[i])
45+
end
4446
end
4547
4648
return #events
@@ -52,12 +54,14 @@ func scheduleFutureEvents(ctx context.Context, rb *redisBackend) error {
5254

5355
queueKeys := rb.workflowQueue.Keys()
5456

55-
if _, err := futureEventsCmd.Run(ctx, rb.rdb, []string{
57+
if r, err := futureEventsCmd.Run(ctx, rb.rdb, []string{
5658
futureEventsKey(),
5759
queueKeys.StreamKey,
5860
queueKeys.SetKey,
5961
}, nowStr).Result(); err != nil && err != redis.Nil {
6062
return fmt.Errorf("checking future events: %w", err)
63+
} else {
64+
log.Println("Scheduled", r.(int64), "future events")
6165
}
6266

6367
return nil

backend/redis/scripts/complete_workflow_task.lua

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,16 @@ local timersToCancel = tonumber(getArgv())
9191
for i = 1, timersToCancel do
9292
local futureEventKey = getKey()
9393

94-
redis.call("ZREM", futureEventZSetKey, futureEventKey)
95-
-- remove payload
96-
local eventId = redis.call("HGET", futureEventKey, "id")
97-
redis.call("HDEL", payloadHashKey, eventId)
98-
-- remove event hash
99-
redis.call("DEL", futureEventKey)
94+
local eventRemoved = redis.call("ZREM", futureEventZSetKey, futureEventKey)
95+
-- Event might've become visible while this task was being processed, in that
96+
-- case it would be already removed from futureEventZSetKey
97+
if eventRemoved == 1 then
98+
-- remove payload
99+
local eventId = redis.call("HGET", futureEventKey, "id")
100+
redis.call("HDEL", payloadHashKey, eventId)
101+
-- remove event hash
102+
redis.call("DEL", futureEventKey)
103+
end
100104
end
101105

102106
-- Schedule timers

0 commit comments

Comments
 (0)