Skip to content

Commit 2ed849c

Browse files
authored
XACK before XDEL to clear pending
1 parent 77f0172 commit 2ed849c

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

backend/redis/taskqueue/queue.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package taskqueue
33
import (
44
"context"
55
"encoding/json"
6+
"log"
67
"time"
78

89
"github.com/go-redis/redis/v8"
@@ -72,8 +73,7 @@ var enqueueCmd = redis.NewScript(`
7273
return nil
7374
end
7475
75-
local task_id = redis.call("xadd", KEYS[2], "*", "id", ARGV[1], "data", ARGV[2])
76-
return task_id
76+
return redis.call("xadd", KEYS[2], "*", "id", ARGV[1], "data", ARGV[2])
7777
`)
7878

7979
func (q *taskQueue[T]) Enqueue(ctx context.Context, id string, data *T) (*string, error) {
@@ -148,17 +148,20 @@ func (q *taskQueue[T]) Extend(ctx context.Context, taskID string) error {
148148
// KEYS[1] = set
149149
// KEYS[2] = stream
150150
// ARGV[1] = task id
151+
// ARGV[2] = group
152+
// We have to XACK _and_ XDEL here. See https://github.com/redis/redis/issues/5754
151153
var completeCmd = redis.NewScript(`
152154
local task = redis.call("XRANGE", KEYS[2], ARGV[1], ARGV[1])
153155
local id = task[1][2][2]
154156
redis.call("SREM", KEYS[1], id)
157+
redis.call("XACK", KEYS[2], ARGV[2], ARGV[1])
155158
return redis.call("XDEL", KEYS[2], ARGV[1])
156159
`)
157160

158161
func (q *taskQueue[T]) Complete(ctx context.Context, taskID string) error {
159162
// Delete the task here. Overall we'll keep the stream at a small size, so fragmentation
160163
// is not an issue for us.
161-
c, err := completeCmd.Run(ctx, q.rdb, []string{q.setKey, q.streamKey}, taskID).Result()
164+
c, err := completeCmd.Run(ctx, q.rdb, []string{q.setKey, q.streamKey}, taskID, q.groupName).Result()
162165
if err != nil && err != redis.Nil {
163166
return errors.Wrap(err, "could not complete task")
164167
}
@@ -167,6 +170,8 @@ func (q *taskQueue[T]) Complete(ctx context.Context, taskID string) error {
167170
return errors.New("could find task to complete")
168171
}
169172

173+
log.Println("Completing activity task", taskID)
174+
170175
return nil
171176
}
172177

0 commit comments

Comments
 (0)