Skip to content

Commit 97d74ed

Browse files
authored
TWEAK: speed up sidekiq pull ack (#113)
1 parent 932262b commit 97d74ed

File tree

3 files changed

+102
-40
lines changed

3 files changed

+102
-40
lines changed

sidekiq/pull.go

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,26 @@ func (q *sidekiqQueue) Pull(opt *PullOptions) error {
120120
}
121121
}()
122122

123-
pull := func() error {
124-
res, err := q.dequeueScript.Run(ctx, q.client, []string{queueNamespace},
125-
queueNamespace,
126-
queueID,
127-
).Result()
128-
if err != nil {
129-
return err
123+
res, err := q.dequeueScript.Run(ctx, q.client, []string{queueNamespace},
124+
queueNamespace,
125+
queueID,
126+
).Result()
127+
if err != nil {
128+
if errors.Is(err, redis.Nil) {
129+
return nil
130130
}
131+
return err
132+
}
133+
queue := opt.Queue
134+
if queue == nil {
135+
queue = q.RedisQueue
136+
}
137+
jobm := res.([]interface{})
138+
jobs := make([]*work.Job, len(jobm))
139+
queueIDs := make([]string, len(jobm))
140+
for i, iface := range jobm {
131141
var sqJob sidekiqJob
132-
err = json.NewDecoder(strings.NewReader(res.(string))).Decode(&sqJob)
142+
err := json.NewDecoder(strings.NewReader(iface.(string))).Decode(&sqJob)
133143
if err != nil {
134144
return err
135145
}
@@ -141,48 +151,62 @@ func (q *sidekiqQueue) Pull(opt *PullOptions) error {
141151
if err != nil {
142152
return err
143153
}
144-
queue := opt.Queue
145-
if queue == nil {
146-
queue = q.RedisQueue
154+
jobs[i] = job
155+
queueIDs[i] = FormatQueueID(sqJob.Queue, sqJob.Class)
156+
}
157+
found := make([]*work.Job, len(jobs))
158+
if finder, ok := queue.(work.BulkJobFinder); ok {
159+
jobIDs := make([]string, len(jobs))
160+
for i, job := range jobs {
161+
jobIDs[i] = job.ID
162+
}
163+
// best effort to check for duplicates
164+
foundJobs, err := finder.BulkFind(jobIDs, &work.FindOptions{
165+
Namespace: opt.Namespace,
166+
})
167+
if err != nil {
168+
return err
147169
}
148-
var found bool
149-
if finder, ok := queue.(work.BulkJobFinder); ok {
150-
// best effort to check for duplicates
151-
jobs, err := finder.BulkFind([]string{job.ID}, &work.FindOptions{
170+
found = foundJobs
171+
}
172+
if bulkEnqueuer, ok := queue.(work.BulkEnqueuer); ok {
173+
m := make(map[string][]*work.Job)
174+
for i, job := range jobs {
175+
if found[i] != nil {
176+
continue
177+
}
178+
queueID := queueIDs[i]
179+
m[queueID] = append(m[queueID], job)
180+
}
181+
for queueID, jobs := range m {
182+
err := bulkEnqueuer.BulkEnqueue(jobs, &work.EnqueueOptions{
152183
Namespace: opt.Namespace,
184+
QueueID: queueID,
153185
})
154186
if err != nil {
155187
return err
156188
}
157-
found = len(jobs) == 1 && jobs[0] != nil
158189
}
159-
if !found {
190+
} else {
191+
for i, job := range jobs {
192+
if found[i] != nil {
193+
continue
194+
}
160195
err := queue.Enqueue(job, &work.EnqueueOptions{
161196
Namespace: opt.Namespace,
162-
QueueID: FormatQueueID(sqJob.Queue, sqJob.Class),
197+
QueueID: queueIDs[i],
163198
})
164199
if err != nil {
165200
return err
166201
}
167202
}
168-
err = q.ackScript.Run(ctx, q.client, []string{queueNamespace},
169-
queueNamespace,
170-
queueID,
171-
res.(string),
172-
).Err()
173-
if err != nil {
174-
return err
175-
}
176-
return nil
177203
}
178-
179-
for {
180-
err := pull()
181-
if err != nil {
182-
if errors.Is(err, redis.Nil) {
183-
return nil
184-
}
185-
return err
186-
}
204+
err = q.ackScript.Run(ctx, q.client, []string{queueNamespace},
205+
queueNamespace,
206+
queueID,
207+
).Err()
208+
if err != nil {
209+
return err
187210
}
211+
return nil
188212
}

sidekiq/pull_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,3 +520,43 @@ func TestPullDequeueHeartbeat(t *testing.T) {
520520
require.Equal(t, "123", z[0].Member)
521521
require.EqualValues(t, now.Unix()+100, z[0].Score)
522522
}
523+
524+
func TestPullDequeueAck(t *testing.T) {
525+
client := redistest.NewClient()
526+
defer client.Close()
527+
require.NoError(t, redistest.Reset(client))
528+
529+
jobIDs := []string{"83b27ea26dd65821239ca6aa", "ebb3186ec09c42642f980a20", "7e59de95478191698aa69b22"}
530+
for _, jobID := range jobIDs {
531+
err := client.LPush(context.Background(), pullTestSidekiqQueueKey, fmt.Sprintf(`{"class":"TestWorker","args":[],"retry":3,"queue":"default","backtrace":true,"jid":%q,"created_at":1567788641.0875323,"enqueued_at":1567788642.0879307,"retry_count":2,"error_message":"error: test","error_class":"StandardError","failed_at":1567791043,"retried_at":1567791046}"`, jobID)).Err()
532+
require.NoError(t, err)
533+
}
534+
535+
q := NewQueue(client)
536+
537+
res, err := q.(*sidekiqQueue).dequeueScript.Run(context.Background(), client, []string{pullTestNamespace},
538+
pullTestSidekiqNamespace,
539+
fmt.Sprintf("queue:%s", pullTestSidekiqQueue),
540+
).Result()
541+
require.NoError(t, err)
542+
543+
jobm := res.([]interface{})
544+
require.Len(t, jobm, 3)
545+
require.Contains(t, jobm[0], "7e59de95478191698aa69b22")
546+
require.Contains(t, jobm[1], "ebb3186ec09c42642f980a20")
547+
require.Contains(t, jobm[2], "83b27ea26dd65821239ca6aa")
548+
549+
count, err := client.LLen(context.Background(), pullTestSidekiqQueueKey).Result()
550+
require.NoError(t, err)
551+
require.Equal(t, int64(3), count)
552+
553+
err = q.(*sidekiqQueue).ackScript.Run(context.Background(), client, []string{pullTestNamespace},
554+
pullTestSidekiqNamespace,
555+
fmt.Sprintf("queue:%s", pullTestSidekiqQueue),
556+
).Err()
557+
require.NoError(t, err)
558+
559+
count, err = client.LLen(context.Background(), pullTestSidekiqQueueKey).Result()
560+
require.NoError(t, err)
561+
require.Equal(t, int64(0), count)
562+
}

sidekiq/queue.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,16 +98,15 @@ func NewQueue(client redis.UniversalClient) Queue {
9898
local queue_id = ARGV[2]
9999
100100
local queue_key = table.concat({queue_ns, queue_id}, ":")
101-
return redis.call("lindex", queue_key, -1)
101+
return redis.call("lrange", queue_key, 0, -1)
102102
`)
103103

104104
ackScript := redis.NewScript(`
105105
local queue_ns = ARGV[1]
106106
local queue_id = ARGV[2]
107-
local jobm = ARGV[3]
108107
109108
local queue_key = table.concat({queue_ns, queue_id}, ":")
110-
return redis.call("lrem", queue_key, -1, jobm)
109+
return redis.call("del", queue_key)
111110
`)
112111

113112
dequeueStartScript := redis.NewScript(`
@@ -156,7 +155,6 @@ func NewQueue(client redis.UniversalClient) Queue {
156155
local puller_queue_key = table.concat({queue_ns, queue_id}, ":")
157156
local pullers_key = table.concat({queue_ns, "pullers"}, ":")
158157
if redis.call("llen", puller_queue_key) == 0 then
159-
redis.call("del", puller_queue_key)
160158
return redis.call("zrem", pullers_key, queue_id)
161159
end
162160
return 0

0 commit comments

Comments
 (0)