@@ -19,12 +19,14 @@ import (
19
19
// KEYS[2] - workflow task queue stream
20
20
// KEYS[3] - workflow task queue set
21
21
// ARGV[1] - current timestamp for zrange
22
+ // ARGV[2] - redis key prefix
22
23
//
23
24
// Note: this does not work with Redis Cluster since not all keys are passed into the script.
24
25
var futureEventsCmd = redis .NewScript (`
25
26
-- Find events which should become visible now
26
27
local now = ARGV[1]
27
28
local events = redis.call("ZRANGE", KEYS[1], "-inf", now, "BYSCORE")
29
+ local prefix = ARGV[2]
28
30
for i = 1, #events do
29
31
local instanceSegment = redis.call("HGET", events[i], "instance")
30
32
@@ -35,7 +37,7 @@ var futureEventsCmd = redis.NewScript(`
35
37
36
38
-- Add event to pending event stream
37
39
local eventData = redis.call("HGET", events[i], "event")
38
- local pending_events_key = "pending-events:" .. instanceSegment
40
+ local pending_events_key = prefix .. "pending-events:" .. instanceSegment
39
41
redis.call("XADD", pending_events_key, "*", "event", eventData)
40
42
41
43
-- Delete event hash data
@@ -57,7 +59,7 @@ func scheduleFutureEvents(ctx context.Context, rb *redisBackend) error {
57
59
rb .keys .futureEventsKey (),
58
60
queueKeys .StreamKey ,
59
61
queueKeys .SetKey ,
60
- }, nowStr ).Result (); err != nil && err != redis .Nil {
62
+ }, nowStr , rb . keys . prefix ).Result (); err != nil && err != redis .Nil {
61
63
return fmt .Errorf ("checking future events: %w" , err )
62
64
}
63
65
0 commit comments