Skip to content

Commit 5fc2bb5

Browse files
authored
consumer group creation
1 parent bcd44f2 commit 5fc2bb5

File tree

1 file changed

+26
-12
lines changed

1 file changed

+26
-12
lines changed

backend/redis/queue.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,17 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
4343
workerName: uuid.NewString(),
4444
}
4545

46-
// Create the consumer group
47-
_, err := rdb.XGroupCreateMkStream(context.Background(), tq.streamKey, tq.groupName, "0").Result()
48-
if err != nil {
49-
// Ugly, check since there is no UPSERT for consumer groups. Might replace with a script
50-
// using XINFO & XGROUP CREATE atomically
51-
if err.Error() != "BUSYGROUP Consumer Group name already exists" {
52-
return nil, fmt.Errorf("creating task queue: %w", err)
53-
}
54-
}
55-
5646
// Pre-load script
5747
cmds := map[string]*redis.StringCmd{
58-
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),
59-
"completeCmd": completeCmd.Load(context.Background(), rdb),
48+
"createGroupCmd": createGroupCmd.Load(context.Background(), rdb),
49+
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),
50+
"completeCmd": completeCmd.Load(context.Background(), rdb),
51+
}
52+
53+
// Create the consumer group
54+
_, err := createGroupCmd.Run(context.Background(), rdb, []string{tq.streamKey, tq.groupName}).Result()
55+
if err != nil {
56+
return nil, fmt.Errorf("creating task queue: %w", err)
6057
}
6158

6259
for name, cmd := range cmds {
@@ -91,6 +88,23 @@ var enqueueCmd = redis.NewScript(
9188
return true
9289
`)
9390

91+
var createGroupCmd = redis.NewScript(`
92+
local streamKey = KEYS[1]
93+
local groupName = KEYS[2]
94+
local status, groups = pcall(redis.call, 'XINFO', 'GROUPS', streamKey)
95+
96+
if status then
97+
for i = 1, #groups do
98+
if groups[i].name == groupName then
99+
return false
100+
end
101+
end
102+
end
103+
104+
redis.call('XGROUP', 'CREATE', streamKey, groupName, '0', 'MKSTREAM')
105+
return true
106+
`)
107+
94108
func (q *taskQueue[T]) Enqueue(ctx context.Context, p redis.Pipeliner, id string, data *T) error {
95109
ds, err := json.Marshal(data)
96110
if err != nil {

0 commit comments

Comments
 (0)