
Commit cc7b798

Ensure queues contain only unique tasks
1 parent 8f5425d commit cc7b798

5 files changed, +105 -63 lines changed


backend/redis/README.md

Lines changed: 4 additions & 8 deletions
@@ -4,23 +4,19 @@
 
 Instances and their state (started_at, completed_at etc.) are stored as JSON blobs under the `instances-{instanceID}` keys.
 
-## History Events
+## History and pending events
 
-Events are stored in streams per workflow instance under `events-{instanceID}`. We could use a plain list but with streams we can do `XRANGE` queries for a subset of the history for continuation tasks.
-
-## Pending events
-
-Pending events are stored in streams under the `pending-{instanceID}` key.
+Events are stored in streams per workflow instance under the `events-{instanceID}` key. We maintain a cursor in the instance state that indicates the last event that has been executed. Every event after that in the stream is a pending event and will be returned to the worker in the next workflow task.
 
 ## Timer events
 
-Timer events are stored in a sorted set. Whenever a client checks for a new workflow instance task, the sorted set is checked to see if any of the pending timer events is ready yet. If it is, it's added to the pending events before those are checked for pending workflow tasks.
+Timer events are stored in a sorted set (`ZSET`). Whenever a worker checks for a new workflow instance task, the sorted set is checked to see if any of the pending timer events is ready yet. If it is, it's added to the pending events before those are returned for pending workflow tasks.
 
 ## Task queues
 
 We need queues for activities and workflow instances. In both cases, we have tasks being enqueued, workers polling for work, and we have to guarantee that every task is eventually processed. So if a worker has dequeued a task and crashed, for example, eventually we need another worker to pick up the task and finish it.
 
-Task queues are implemented using Redis STREAMs.
+Task queues are implemented using Redis STREAMs. In addition, for queues where we only want a single instance of a task to be in the queue, we maintain an additional `SET`.
 
 <details>
 <summary>Alternatives considered</summary>
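The README above describes the cursor model for pending events: history lives in the `events-{instanceID}` stream, and everything after the last executed event is pending. As a rough illustration of that read path — a minimal sketch assuming go-redis v8, the key naming from the README, and Redis 6.2+ for exclusive `XRANGE` starts; `readPendingEvents` is a hypothetical helper, not part of the backend:

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

// readPendingEvents returns every event recorded after the given cursor
// (a stream message ID) in the instance's event stream. An empty cursor
// reads the stream from the beginning.
func readPendingEvents(ctx context.Context, rdb redis.UniversalClient, instanceID, cursor string) ([]redis.XMessage, error) {
	start := "-"
	if cursor != "" {
		// "(" makes the range exclusive, i.e. strictly after the last executed event.
		start = "(" + cursor
	}
	return rdb.XRange(ctx, "events-"+instanceID, start, "+").Result()
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	pending, err := readPendingEvents(context.Background(), rdb, "instance-1", "")
	if err != nil {
		panic(err)
	}
	for _, msg := range pending {
		fmt.Println(msg.ID, msg.Values)
	}
}
```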
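Similarly, the timer handling described above (a `ZSET` scored by due time, checked whenever a worker polls) might look roughly like the following. This is a sketch only: the `timers-{instanceID}` key name, the unix-millisecond score, and the `pending-{instanceID}` target stream are assumptions for illustration, not the backend's exact schema.

```go
package main

import (
	"context"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
)

// moveDueTimers pops timer events whose score (assumed to be a unix-millisecond
// due time) is <= now and appends them to the instance's pending-events stream,
// so the next workflow task picks them up.
func moveDueTimers(ctx context.Context, rdb redis.UniversalClient, instanceID string) error {
	timersKey := "timers-" + instanceID // assumed key name
	now := strconv.FormatInt(time.Now().UnixMilli(), 10)

	due, err := rdb.ZRangeByScore(ctx, timersKey, &redis.ZRangeBy{Min: "-inf", Max: now}).Result()
	if err != nil {
		return err
	}

	for _, payload := range due {
		if err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: "pending-" + instanceID, // assumed pending-events stream
			Values: map[string]interface{}{"event": payload},
		}).Err(); err != nil {
			return err
		}
		if err := rdb.ZRem(ctx, timersKey, payload).Err(); err != nil {
			return err
		}
	}

	return nil
}
```

In a real backend the check-and-move would likely need to be atomic (for example a Lua script, like the enqueue/complete scripts in this commit) so a timer is not lost if a worker crashes between the `XADD` and the `ZREM`.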

backend/redis/instance.go

Lines changed: 5 additions & 2 deletions
@@ -6,6 +6,7 @@ import (
     "time"
 
     "github.com/cschleiden/go-workflows/backend"
+    "github.com/cschleiden/go-workflows/backend/redis/taskqueue"
     "github.com/cschleiden/go-workflows/internal/core"
     "github.com/cschleiden/go-workflows/internal/history"
     "github.com/go-redis/redis/v8"
@@ -42,8 +43,10 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event histor
     }
 
     // Add instance to pending instances set
-    if err := rb.workflowQueue.Enqueue(ctx, event.WorkflowInstance.GetInstanceID(), nil); err != nil {
-        return errors.Wrap(err, "could not queue workflow task")
+    if _, err := rb.workflowQueue.Enqueue(ctx, event.WorkflowInstance.GetInstanceID(), nil); err != nil {
+        if err != taskqueue.ErrTaskAlreadyInQueue {
+            return errors.Wrap(err, "could not queue workflow task")
+        }
     }
 
     return nil

backend/redis/taskqueue/queue.go

Lines changed: 54 additions & 34 deletions
@@ -13,9 +13,10 @@ import (
 type taskQueue[T any] struct {
     tasktype   string
     rdb        redis.UniversalClient
+    setKey     string
+    streamKey  string
     groupName  string
     workerName string
-    streamName string
 }
 
 type TaskItem[T any] struct {
@@ -29,24 +30,27 @@ type TaskItem[T any] struct {
     Data T
 }
 
+var ErrTaskAlreadyInQueue = errors.New("task already in queue")
+
 type TaskQueue[T any] interface {
-    Enqueue(ctx context.Context, id string, data *T) error
+    Enqueue(ctx context.Context, id string, data *T) (*string, error)
     Dequeue(ctx context.Context, lockTimeout, timeout time.Duration) (*TaskItem[T], error)
     Extend(ctx context.Context, taskID string) error
-    Complete(ctx context.Context, id string) error
+    Complete(ctx context.Context, taskID string) error
 }
 
 func New[T any](rdb redis.UniversalClient, tasktype string) (TaskQueue[T], error) {
     tq := &taskQueue[T]{
         tasktype:   tasktype,
         rdb:        rdb,
+        setKey:     "task-set:" + tasktype,
+        streamKey:  "task-stream:" + tasktype,
         groupName:  "task-workers",
         workerName: uuid.NewString(),
-        streamName: "task-stream:" + tasktype,
     }
 
     // Create the consumer group
-    _, err := tq.rdb.XGroupCreateMkStream(context.Background(), tq.streamName, tq.groupName, "0").Result()
+    _, err := tq.rdb.XGroupCreateMkStream(context.Background(), tq.streamKey, tq.groupName, "0").Result()
     if err != nil {
         // Ugly, check since there is no UPSERT for consumer groups. Might replace with a script
         // using XINFO & XGROUP CREATE atomically
@@ -58,24 +62,37 @@ func New[T any](rdb redis.UniversalClient, tasktype string) (TaskQueue[T], error
     return tq, nil
 }
 
-func (q *taskQueue[T]) Enqueue(ctx context.Context, id string, data *T) error {
+// KEYS[1] = set
+// KEYS[2] = stream
+// ARGV[1] = caller provided id of the task
+// ARGV[2] = additional data to store with the task
+var enqueueCmd = redis.NewScript(`
+    local exists = redis.call("sadd", KEYS[1], ARGV[1])
+    if exists == 0 then
+        return nil
+    end
+
+    local task_id = redis.call("xadd", KEYS[2], "*", "id", ARGV[1], "data", ARGV[2])
+    return task_id
+`)
+
+func (q *taskQueue[T]) Enqueue(ctx context.Context, id string, data *T) (*string, error) {
     ds, err := json.Marshal(data)
     if err != nil {
-        return err
+        return nil, err
     }
 
-    _, err = q.rdb.XAdd(ctx, &redis.XAddArgs{
-        Stream: q.streamName,
-        Values: map[string]interface{}{
-            "id":   id,
-            "data": string(ds),
-        },
-    }).Result()
-    if err != nil {
-        return errors.Wrap(err, "could not enqueue task")
+    taskID, err := enqueueCmd.Run(ctx, q.rdb, []string{q.setKey, q.streamKey}, id, string(ds)).Result()
+    if err != nil && err != redis.Nil {
+        return nil, errors.Wrap(err, "could not enqueue task")
     }
 
-    return nil
+    if taskID == nil {
+        return nil, ErrTaskAlreadyInQueue
+    }
+
+    tidStr := taskID.(string)
+    return &tidStr, nil
 }
 
 func (q *taskQueue[T]) Dequeue(ctx context.Context, lockTimeout, timeout time.Duration) (*TaskItem[T], error) {
@@ -91,7 +108,7 @@ func (q *taskQueue[T]) Dequeue(ctx context.Context, lockTimeout, timeout time.Du
 
     // Check for new tasks
     ids, err := q.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
-        Streams:  []string{q.streamName, ">"},
+        Streams:  []string{q.streamKey, ">"},
         Group:    q.groupName,
         Consumer: q.workerName,
         Count:    1,
@@ -113,7 +130,7 @@ func (q *taskQueue[T]) Extend(ctx context.Context, taskID string) error {
     // Claiming a message resets the idle timer. Don't use the `JUSTID` variant, we
     // want to increase the retry counter.
     _, err := q.rdb.XClaim(ctx, &redis.XClaimArgs{
-        Stream:   q.streamName,
+        Stream:   q.streamKey,
         Group:    q.groupName,
         Consumer: q.workerName,
         Messages: []string{taskID},
@@ -126,24 +143,27 @@ func (q *taskQueue[T]) Extend(ctx context.Context, taskID string) error {
     return nil
 }
 
-func (q *taskQueue[T]) Complete(ctx context.Context, id string) error {
-    // c, err := q.rdb.XAck(ctx, q.streamName, q.groupName, id).Result()
-    // if err != nil {
-    //     return errors.Wrap(nil, "could not complete task")
-    // }
-
-    // if c != 1 {
-    //     return errors.New("could find task to complete")
-    // }
-
+// We need TaskIDs for the stream and caller provided IDs for the set. So first look up
+// the ID in the stream using the TaskID, then remove from the set and the stream
+// KEYS[1] = set
+// KEYS[2] = stream
+// ARGV[1] = task id
+var completeCmd = redis.NewScript(`
+    local task = redis.call("XRANGE", KEYS[2], ARGV[1], ARGV[1])
+    local id = task[1][2][2]
+    redis.call("SREM", KEYS[1], id)
+    return redis.call("XDEL", KEYS[2], ARGV[1])
+`)
+
+func (q *taskQueue[T]) Complete(ctx context.Context, taskID string) error {
     // Delete the task here. Overall we'll keep the stream at a small size, so fragmentation
     // is not an issue for us.
-    c, err := q.rdb.XDel(ctx, q.streamName, id).Result()
-    if err != nil {
-        return errors.Wrap(nil, "could not complete task")
+    c, err := completeCmd.Run(ctx, q.rdb, []string{q.setKey, q.streamKey}, taskID).Result()
+    if err != nil && err != redis.Nil {
+        return errors.Wrap(err, "could not complete task")
     }
 
-    if c != 1 {
+    if c.(int64) == 0 || err == redis.Nil {
         return errors.New("could find task to complete")
     }
 
@@ -154,7 +174,7 @@ func (q *taskQueue[T]) recover(ctx context.Context, idleTimeout time.Duration) (
     // Ignore the start argument, we are deleting tasks as they are completed, so we'll always
     // start this scan from the beginning.
     msgs, _, err := q.rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
-        Stream:   q.streamName,
+        Stream:   q.streamKey,
        Group:    q.groupName,
        Consumer: q.workerName,
        MinIdle:  idleTimeout,
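Putting the pieces of this file together, a worker would use the queue roughly as follows. This is a hedged sketch, assuming the `taskqueue` package is importable at the path shown in the diff and using the `New`/`Enqueue`/`Dequeue`/`Extend`/`Complete` signatures from this commit; the payload type, queue name, and timeouts are illustrative only.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cschleiden/go-workflows/backend/redis/taskqueue"
	"github.com/go-redis/redis/v8"
)

type job struct {
	InstanceID string
}

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	q, err := taskqueue.New[job](rdb, "example")
	if err != nil {
		panic(err)
	}

	// Enqueue is deduplicated per caller-provided id: a second Enqueue for "i1"
	// while it is still queued returns ErrTaskAlreadyInQueue, which callers
	// treat as success (see instance.go / workflow.go in this commit).
	if _, err := q.Enqueue(ctx, "i1", &job{InstanceID: "i1"}); err != nil && err != taskqueue.ErrTaskAlreadyInQueue {
		panic(err)
	}

	// Dequeue polls up to the second timeout; the lock timeout is how long the
	// task may stay claimed before recovery hands it to another worker.
	task, err := q.Dequeue(ctx, 30*time.Second, 5*time.Second)
	if err != nil {
		panic(err)
	}
	if task == nil {
		return // nothing to do
	}

	// Long-running work would call Extend periodically to keep the claim alive.
	if err := q.Extend(ctx, task.TaskID); err != nil {
		panic(err)
	}

	fmt.Println("processing", task.ID, task.Data.InstanceID)

	// Complete removes the task from the stream and from the uniqueness set,
	// so the same id can be enqueued again afterwards.
	if err := q.Complete(ctx, task.TaskID); err != nil {
		panic(err)
	}
}
```

The property this commit adds is visible at the `Enqueue` call: duplicate ids are rejected with `ErrTaskAlreadyInQueue` while queued, and `Complete` clears the id from the `SET` so it becomes enqueueable again.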

backend/redis/taskqueue/queue_test.go

Lines changed: 29 additions & 6 deletions
@@ -42,7 +42,7 @@ func Test_TaskQueue(t *testing.T) {
             q, err := New[any](client, "test")
             require.NoError(t, err)
 
-            err = q.Enqueue(context.Background(), "t1", nil)
+            _, err = q.Enqueue(context.Background(), "t1", nil)
             require.NoError(t, err)
 
             task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
@@ -51,6 +51,29 @@ func Test_TaskQueue(t *testing.T) {
             require.Equal(t, "t1", task.ID)
         },
     },
+    {
+        name: "Guarantee uniqueness",
+        f: func(t *testing.T) {
+            q, err := New[any](client, "test")
+            require.NoError(t, err)
+
+            _, err = q.Enqueue(context.Background(), "t1", nil)
+            require.NoError(t, err)
+
+            _, err = q.Enqueue(context.Background(), "t1", nil)
+            require.Error(t, ErrTaskAlreadyInQueue, err)
+
+            task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
+            require.NoError(t, err)
+            require.NotNil(t, task)
+
+            err = q.Complete(context.Background(), task.TaskID)
+            require.NoError(t, err)
+
+            _, err = q.Enqueue(context.Background(), "t1", nil)
+            require.NoError(t, err)
+        },
+    },
     {
         name: "Store custom data",
         f: func(t *testing.T) {
@@ -62,7 +85,7 @@ func Test_TaskQueue(t *testing.T) {
             q, err := New[foo](client, "test")
             require.NoError(t, err)
 
-            err = q.Enqueue(context.Background(), "t1", &foo{
+            _, err = q.Enqueue(context.Background(), "t1", &foo{
                 Count: 1,
                 Name:  "bar",
             })
@@ -81,7 +104,7 @@ func Test_TaskQueue(t *testing.T) {
         f: func(t *testing.T) {
             q, _ := New[any](client, "test")
 
-            err := q.Enqueue(context.Background(), "t1", nil)
+            _, err := q.Enqueue(context.Background(), "t1", nil)
             require.NoError(t, err)
 
             q2, _ := New[any](client, "test")
@@ -100,7 +123,7 @@ func Test_TaskQueue(t *testing.T) {
             q, _ := New[any](client, "test")
             q2, _ := New[any](client, "test")
 
-            err := q.Enqueue(context.Background(), "t1", nil)
+            _, err := q.Enqueue(context.Background(), "t1", nil)
             require.NoError(t, err)
 
             task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
@@ -124,7 +147,7 @@ func Test_TaskQueue(t *testing.T) {
         f: func(t *testing.T) {
             q, _ := New[any](client, "test")
 
-            err := q.Enqueue(context.Background(), "t1", nil)
+            _, err := q.Enqueue(context.Background(), "t1", nil)
             require.NoError(t, err)
 
             q2, _ := New[any](client, "test")
@@ -149,7 +172,7 @@ func Test_TaskQueue(t *testing.T) {
         f: func(t *testing.T) {
             q, _ := New[any](client, "test")
 
-            err := q.Enqueue(context.Background(), "t1", nil)
+            _, err := q.Enqueue(context.Background(), "t1", nil)
             require.NoError(t, err)
 
            q2, _ := New[any](client, "test")

backend/redis/workflow.go

Lines changed: 13 additions & 13 deletions
@@ -6,6 +6,7 @@ import (
     "log"
 
     "github.com/cschleiden/go-workflows/backend"
+    "github.com/cschleiden/go-workflows/backend/redis/taskqueue"
     "github.com/cschleiden/go-workflows/internal/core"
     "github.com/cschleiden/go-workflows/internal/history"
     "github.com/cschleiden/go-workflows/internal/task"
@@ -31,8 +32,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
     }
 
     // History
-    cmd := rb.rdb.XRange(ctx, historyKey(instanceTask.ID), "-", "+")
-    msgs, err := cmd.Result()
+    msgs, err := rb.rdb.XRange(ctx, historyKey(instanceTask.ID), "-", "+").Result()
     if err != nil {
         return nil, errors.Wrap(err, "could not read event stream")
     }
@@ -52,8 +52,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
     // New Events
     newEvents := make([]history.Event, 0)
 
-    cmd = rb.rdb.XRange(ctx, pendingEventsKey(instanceTask.ID), "-", "+")
-    msgs, err = cmd.Result()
+    msgs, err = rb.rdb.XRange(ctx, pendingEventsKey(instanceTask.ID), "-", "+").Result()
     if err != nil {
         return nil, errors.Wrap(err, "could not read event stream")
     }
@@ -72,7 +71,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
     rb.rdb.XTrim(ctx, pendingEventsKey(instanceTask.ID), 0)
 
     return &task.Workflow{
-        ID:               instanceTask.ID,
+        ID:               instanceTask.TaskID,
         WorkflowInstance: core.NewWorkflowInstance(instanceTask.ID, instanceState.ExecutionID),
         History:          historyEvents,
         NewEvents:        newEvents,
@@ -117,10 +116,10 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
 
     // TODO: Delay unlocking the current instance. Can we find a better way here?
     if targetInstance != instance {
-        // TODO: Make sure this task is not already enqueued
-
-        if err := rb.workflowQueue.Enqueue(ctx, targetInstance.GetInstanceID(), nil); err != nil {
-            return errors.Wrap(err, "could not add instance to locked instances set")
+        if _, err := rb.workflowQueue.Enqueue(ctx, targetInstance.GetInstanceID(), nil); err != nil {
+            if err != taskqueue.ErrTaskAlreadyInQueue {
+                return errors.Wrap(err, "could not add instance to locked instances set")
+            }
         }
     }
 }
@@ -140,7 +139,7 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
     // Store activity data
     // TODO: Use pipeline?
     for _, activityEvent := range activityEvents {
-        if err := rb.activityQueue.Enqueue(ctx, activityEvent.ID, &activityData{
+        if _, err := rb.activityQueue.Enqueue(ctx, activityEvent.ID, &activityData{
            InstanceID: instance.GetInstanceID(),
            ID:         activityEvent.ID,
            Event:      activityEvent,
@@ -166,9 +165,10 @@ func (rb *redisBackend) addWorkflowInstanceEvent(ctx context.Context, instance c
     }
 
     // Queue workflow task
-    // TODO: Ensure this can only be queued once
-    if err := rb.workflowQueue.Enqueue(ctx, instance.GetInstanceID(), nil); err != nil {
-        return errors.Wrap(err, "could not queue workflow")
+    if _, err := rb.workflowQueue.Enqueue(ctx, instance.GetInstanceID(), nil); err != nil {
+        if err != taskqueue.ErrTaskAlreadyInQueue {
+            return errors.Wrap(err, "could not queue workflow")
+        }
     }
 
     return nil
