@@ -50,12 +50,6 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
50
50
"completeCmd" : completeCmd .Load (context .Background (), rdb ),
51
51
}
52
52
53
- // Create the consumer group
54
- _ , err := createGroupCmd .Run (context .Background (), rdb , []string {tq .streamKey , tq .groupName }).Result ()
55
- if err != nil {
56
- return nil , fmt .Errorf ("creating task queue: %w" , err )
57
- }
58
-
59
53
for name , cmd := range cmds {
60
54
// fmt.Println(name, cmd.Val())
61
55
@@ -64,6 +58,12 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
64
58
}
65
59
}
66
60
61
+ // Create the consumer group
62
+ _ , err := createGroupCmd .Run (context .Background (), rdb , []string {tq .streamKey , tq .groupName }).Result ()
63
+ if err != nil {
64
+ return nil , fmt .Errorf ("creating task queue: %w" , err )
65
+ }
66
+
67
67
return tq , nil
68
68
}
69
69
@@ -89,22 +89,33 @@ var enqueueCmd = redis.NewScript(
89
89
` )
90
90
91
91
var createGroupCmd = redis .NewScript (`
92
- local streamKey = KEYS[1]
93
- local groupName = KEYS[2]
94
- local status, groups = pcall(redis.call, 'XINFO', 'GROUPS', streamKey)
95
-
96
- if status then
97
- for i = 1, #groups do
98
- if groups[i].name == groupName then
99
- return false
100
- end
101
- end
102
- end
103
-
104
- redis.call('XGROUP', 'CREATE', streamKey, groupName, '0', 'MKSTREAM')
105
- return true
92
+ local streamKey = KEYS[1]
93
+ local groupName = KEYS[2]
94
+ local exists = false
95
+ local res = redis.pcall('XINFO', 'GROUPS', streamKey)
96
+
97
+ if res and type(res) == 'table' then
98
+ for _, groupInfo in ipairs(res) do
99
+ if type(groupInfo) == 'table' then
100
+ for i = 1, #groupInfo, 2 do
101
+ if groupInfo[i] == 'name' and groupInfo[i+1] == groupName then
102
+ exists = true
103
+ break
104
+ end
105
+ end
106
+ end
107
+ if exists then break end
108
+ end
109
+ end
110
+
111
+ if not exists then
112
+ redis.pcall('XGROUP', 'CREATE', streamKey, groupName, '$', 'MKSTREAM')
113
+ end
114
+
115
+ return true
106
116
` )
107
117
118
+
108
119
func (q * taskQueue [T ]) Enqueue (ctx context.Context , p redis.Pipeliner , id string , data * T ) error {
109
120
ds , err := json .Marshal (data )
110
121
if err != nil {
0 commit comments