Skip to content

Commit 59776d3

Browse files
committed
Support key prefix for task queues
1 parent 95a0605 commit 59776d3

File tree

3 files changed

+31
-26
lines changed

3 files changed

+31
-26
lines changed

backend/redis/queue.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,21 @@ type KeyInfo struct {
3434
SetKey string
3535
}
3636

37-
func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue[T], error) {
37+
func newTaskQueue[T any](rdb redis.UniversalClient, keyPrefix string, tasktype string) (*taskQueue[T], error) {
38+
// Ensure the key prefix ends with a colon
39+
if keyPrefix != "" && keyPrefix[len(keyPrefix)-1] != ':' {
40+
keyPrefix += ":"
41+
}
42+
3843
tq := &taskQueue[T]{
3944
tasktype: tasktype,
40-
setKey: "task-set:" + tasktype,
41-
streamKey: "task-stream:" + tasktype,
45+
setKey: keyPrefix + "task-set:" + tasktype,
46+
streamKey: keyPrefix + "task-stream:" + tasktype,
4247
groupName: "task-workers",
4348
workerName: uuid.NewString(),
4449
}
4550

46-
// Pre-load script
51+
// Pre-load scripts
4752
cmds := map[string]*redis.StringCmd{
4853
"createGroupCmd": createGroupCmd.Load(context.Background(), rdb),
4954
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),

backend/redis/queue_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ func Test_TaskQueue(t *testing.T) {
3333
{
3434
name: "Create queue",
3535
f: func(t *testing.T) {
36-
q, err := newTaskQueue[any](client, "test")
36+
q, err := newTaskQueue[any](client, "", "test")
3737
require.NoError(t, err)
3838
require.NotNil(t, q)
3939
},
4040
},
4141
{
4242
name: "Simple enqueue/dequeue",
4343
f: func(t *testing.T) {
44-
q, err := newTaskQueue[any](client, "test")
44+
q, err := newTaskQueue[any](client, "prefix", "test")
4545
require.NoError(t, err)
4646

4747
ctx := context.Background()
@@ -60,7 +60,7 @@ func Test_TaskQueue(t *testing.T) {
6060
{
6161
name: "Guarantee uniqueness",
6262
f: func(t *testing.T) {
63-
q, err := newTaskQueue[any](client, "test")
63+
q, err := newTaskQueue[any](client, "prefix", "test")
6464
require.NoError(t, err)
6565

6666
ctx := context.Background()
@@ -101,7 +101,7 @@ func Test_TaskQueue(t *testing.T) {
101101

102102
ctx := context.Background()
103103

104-
q, err := newTaskQueue[foo](client, "test")
104+
q, err := newTaskQueue[foo](client, "prefix", "test")
105105
require.NoError(t, err)
106106

107107
_, err = client.Pipelined(ctx, func(p redis.Pipeliner) error {
@@ -123,7 +123,7 @@ func Test_TaskQueue(t *testing.T) {
123123
{
124124
name: "Simple enqueue/dequeue different worker",
125125
f: func(t *testing.T) {
126-
q, _ := newTaskQueue[any](client, "test")
126+
q, _ := newTaskQueue[any](client, "prefix", "test")
127127

128128
ctx := context.Background()
129129

@@ -132,7 +132,7 @@ func Test_TaskQueue(t *testing.T) {
132132
})
133133
require.NoError(t, err)
134134

135-
q2, _ := newTaskQueue[any](client, "test")
135+
q2, _ := newTaskQueue[any](client, "prefix", "test")
136136
require.NoError(t, err)
137137

138138
// Dequeue using second worker
@@ -145,8 +145,8 @@ func Test_TaskQueue(t *testing.T) {
145145
{
146146
name: "Complete removes task",
147147
f: func(t *testing.T) {
148-
q, _ := newTaskQueue[any](client, "test")
149-
q2, _ := newTaskQueue[any](client, "test")
148+
q, _ := newTaskQueue[any](client, "prefix", "test")
149+
q2, _ := newTaskQueue[any](client, "prefix", "test")
150150

151151
ctx := context.Background()
152152

@@ -177,7 +177,7 @@ func Test_TaskQueue(t *testing.T) {
177177
{
178178
name: "Recover task",
179179
f: func(t *testing.T) {
180-
q, _ := newTaskQueue[any](client, "test")
180+
q, _ := newTaskQueue[any](client, "prefix", "test")
181181

182182
ctx := context.Background()
183183

@@ -186,7 +186,7 @@ func Test_TaskQueue(t *testing.T) {
186186
})
187187
require.NoError(t, err)
188188

189-
q2, _ := newTaskQueue[any](client, "test")
189+
q2, _ := newTaskQueue[any](client, "prefix", "test")
190190
require.NoError(t, err)
191191

192192
task, err := q2.Dequeue(ctx, client, lockTimeout, blockTimeout)
@@ -206,7 +206,7 @@ func Test_TaskQueue(t *testing.T) {
206206
{
207207
name: "Extending task prevents recovering",
208208
f: func(t *testing.T) {
209-
q, _ := newTaskQueue[any](client, "test")
209+
q, _ := newTaskQueue[any](client, "prefix", "test")
210210

211211
ctx := context.Background()
212212

@@ -216,7 +216,7 @@ func Test_TaskQueue(t *testing.T) {
216216
require.NoError(t, err)
217217

218218
// Create second worker (with different name)
219-
q2, _ := newTaskQueue[any](client, "test")
219+
q2, _ := newTaskQueue[any](client, "prefix", "test")
220220
require.NoError(t, err)
221221

222222
task, err := q2.Dequeue(ctx, client, lockTimeout, blockTimeout)

backend/redis/redis.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,6 @@ var (
3030
)
3131

3232
func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (*redisBackend, error) {
33-
workflowQueue, err := newTaskQueue[any](client, "workflows")
34-
if err != nil {
35-
return nil, fmt.Errorf("creating workflow task queue: %w", err)
36-
}
37-
38-
activityQueue, err := newTaskQueue[activityData](client, "activities")
39-
if err != nil {
40-
return nil, fmt.Errorf("creating activity task queue: %w", err)
41-
}
42-
4333
// Default options
4434
options := &RedisOptions{
4535
Options: backend.ApplyOptions(),
@@ -50,6 +40,16 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
5040
opt(options)
5141
}
5242

43+
workflowQueue, err := newTaskQueue[any](client, options.KeyPrefix, "workflows")
44+
if err != nil {
45+
return nil, fmt.Errorf("creating workflow task queue: %w", err)
46+
}
47+
48+
activityQueue, err := newTaskQueue[activityData](client, options.KeyPrefix, "activities")
49+
if err != nil {
50+
return nil, fmt.Errorf("creating activity task queue: %w", err)
51+
}
52+
5353
rb := &redisBackend{
5454
rdb: client,
5555
options: options,

0 commit comments

Comments
 (0)