Skip to content

Commit e1a243d

Browse files
authored
Merge branch 'main' into backend-stats
2 parents 8b59709 + 6f51384 commit e1a243d

File tree

1 file changed

+37
-12
lines changed

1 file changed

+37
-12
lines changed

backend/redis/queue.go

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,11 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
4343
workerName: uuid.NewString(),
4444
}
4545

46-
// Create the consumer group
47-
_, err := rdb.XGroupCreateMkStream(context.Background(), tq.streamKey, tq.groupName, "0").Result()
48-
if err != nil {
49-
// Ugly, check since there is no UPSERT for consumer groups. Might replace with a script
50-
// using XINFO & XGROUP CREATE atomically
51-
if err.Error() != "BUSYGROUP Consumer Group name already exists" {
52-
return nil, fmt.Errorf("creating task queue: %w", err)
53-
}
54-
}
55-
5646
// Pre-load script
5747
cmds := map[string]*redis.StringCmd{
58-
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),
59-
"completeCmd": completeCmd.Load(context.Background(), rdb),
48+
"createGroupCmd": createGroupCmd.Load(context.Background(), rdb),
49+
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),
50+
"completeCmd": completeCmd.Load(context.Background(), rdb),
6051
}
6152

6253
for name, cmd := range cmds {
@@ -67,6 +58,12 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
6758
}
6859
}
6960

61+
// Create the consumer group
62+
err := createGroupCmd.Run(context.Background(), rdb, []string{tq.streamKey, tq.groupName}).Err()
63+
if err != nil {
64+
return nil, fmt.Errorf("creating task queue: %w", err)
65+
}
66+
7067
return tq, nil
7168
}
7269

@@ -95,6 +92,34 @@ var enqueueCmd = redis.NewScript(
9592
return true
9693
`)
9794

95+
var createGroupCmd = redis.NewScript(`
96+
local streamKey = KEYS[1]
97+
local groupName = KEYS[2]
98+
local exists = false
99+
local res = redis.pcall('XINFO', 'GROUPS', streamKey)
100+
101+
if res and type(res) == 'table' then
102+
for _, groupInfo in ipairs(res) do
103+
if type(groupInfo) == 'table' then
104+
for i = 1, #groupInfo, 2 do
105+
if groupInfo[i] == 'name' and groupInfo[i+1] == groupName then
106+
exists = true
107+
break
108+
end
109+
end
110+
end
111+
if exists then break end
112+
end
113+
end
114+
115+
if not exists then
116+
redis.call('XGROUP', 'CREATE', streamKey, groupName, '$', 'MKSTREAM')
117+
end
118+
119+
return true
120+
`)
121+
122+
98123
func (q *taskQueue[T]) Enqueue(ctx context.Context, p redis.Pipeliner, id string, data *T) error {
99124
ds, err := json.Marshal(data)
100125
if err != nil {

0 commit comments

Comments
 (0)