Skip to content

Commit 6f51384

Browse files
authored
Merge pull request #219 from yaananth/yaananth-patch-1
consumer group create only if it doesn't exists
2 parents 9177bbf + 2715fba commit 6f51384

File tree

1 file changed

+37
-12
lines changed

1 file changed

+37
-12
lines changed

backend/redis/queue.go

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,11 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
4343
workerName: uuid.NewString(),
4444
}
4545

46-
// Create the consumer group
47-
_, err := rdb.XGroupCreateMkStream(context.Background(), tq.streamKey, tq.groupName, "0").Result()
48-
if err != nil {
49-
// Ugly, check since there is no UPSERT for consumer groups. Might replace with a script
50-
// using XINFO & XGROUP CREATE atomically
51-
if err.Error() != "BUSYGROUP Consumer Group name already exists" {
52-
return nil, fmt.Errorf("creating task queue: %w", err)
53-
}
54-
}
55-
5646
// Pre-load script
5747
cmds := map[string]*redis.StringCmd{
58-
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),
59-
"completeCmd": completeCmd.Load(context.Background(), rdb),
48+
"createGroupCmd": createGroupCmd.Load(context.Background(), rdb),
49+
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),
50+
"completeCmd": completeCmd.Load(context.Background(), rdb),
6051
}
6152

6253
for name, cmd := range cmds {
@@ -67,6 +58,12 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
6758
}
6859
}
6960

61+
// Create the consumer group
62+
err := createGroupCmd.Run(context.Background(), rdb, []string{tq.streamKey, tq.groupName}).Err()
63+
if err != nil {
64+
return nil, fmt.Errorf("creating task queue: %w", err)
65+
}
66+
7067
return tq, nil
7168
}
7269

@@ -91,6 +88,34 @@ var enqueueCmd = redis.NewScript(
9188
return true
9289
`)
9390

91+
var createGroupCmd = redis.NewScript(`
92+
local streamKey = KEYS[1]
93+
local groupName = KEYS[2]
94+
local exists = false
95+
local res = redis.pcall('XINFO', 'GROUPS', streamKey)
96+
97+
if res and type(res) == 'table' then
98+
for _, groupInfo in ipairs(res) do
99+
if type(groupInfo) == 'table' then
100+
for i = 1, #groupInfo, 2 do
101+
if groupInfo[i] == 'name' and groupInfo[i+1] == groupName then
102+
exists = true
103+
break
104+
end
105+
end
106+
end
107+
if exists then break end
108+
end
109+
end
110+
111+
if not exists then
112+
redis.call('XGROUP', 'CREATE', streamKey, groupName, '$', 'MKSTREAM')
113+
end
114+
115+
return true
116+
`)
117+
118+
94119
func (q *taskQueue[T]) Enqueue(ctx context.Context, p redis.Pipeliner, id string, data *T) error {
95120
ds, err := json.Marshal(data)
96121
if err != nil {

0 commit comments

Comments
 (0)