Skip to content

Commit 17b0b69

Browse files
authored
TWEAK: limit script args with small batch (#119)
1 parent c0f6eb1 commit 17b0b69

File tree

8 files changed

+221
-1486
lines changed

8 files changed

+221
-1486
lines changed

redis_queue.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,19 @@ import (
1010
"github.com/redis/go-redis/v9"
1111
)
1212

13+
func batchSlice(n int) [][]int {
14+
const size = 1000
15+
var batches [][]int
16+
for i := 0; i < n; i += size {
17+
j := i + size
18+
if j > n {
19+
j = n
20+
}
21+
batches = append(batches, []int{i, j})
22+
}
23+
return batches
24+
}
25+
1326
type redisQueue struct {
1427
client redis.UniversalClient
1528

@@ -170,6 +183,16 @@ func (q *redisQueue) Enqueue(job *Job, opt *EnqueueOptions) error {
170183
}
171184

172185
func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error {
186+
for _, batch := range batchSlice(len(jobs)) {
187+
err := q.bulkEnqueueSmallBatch(jobs[batch[0]:batch[1]], opt)
188+
if err != nil {
189+
return err
190+
}
191+
}
192+
return nil
193+
}
194+
195+
func (q *redisQueue) bulkEnqueueSmallBatch(jobs []*Job, opt *EnqueueOptions) error {
173196
err := opt.Validate()
174197
if err != nil {
175198
return err
@@ -201,6 +224,18 @@ func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) {
201224
}
202225

203226
func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, error) {
227+
var jobs []*Job
228+
for _, batch := range batchSlice(int(count)) {
229+
foundJobs, err := q.bulkDequeueSmallBatch(int64(batch[1]-batch[0]), opt)
230+
if err != nil {
231+
return nil, err
232+
}
233+
jobs = append(jobs, foundJobs...)
234+
}
235+
return jobs, nil
236+
}
237+
238+
func (q *redisQueue) bulkDequeueSmallBatch(count int64, opt *DequeueOptions) ([]*Job, error) {
204239
err := opt.Validate()
205240
if err != nil {
206241
return nil, err
@@ -236,6 +271,16 @@ func (q *redisQueue) Ack(job *Job, opt *AckOptions) error {
236271
}
237272

238273
func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error {
274+
for _, batch := range batchSlice(len(jobs)) {
275+
err := q.bulkAckSmallBatch(jobs[batch[0]:batch[1]], opt)
276+
if err != nil {
277+
return err
278+
}
279+
}
280+
return nil
281+
}
282+
283+
func (q *redisQueue) bulkAckSmallBatch(jobs []*Job, opt *AckOptions) error {
239284
err := opt.Validate()
240285
if err != nil {
241286
return err
@@ -253,6 +298,18 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error {
253298
}
254299

255300
func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) {
301+
var jobs []*Job
302+
for _, batch := range batchSlice(len(jobIDs)) {
303+
foundJobs, err := q.bulkFindSmallBatch(jobIDs[batch[0]:batch[1]], opt)
304+
if err != nil {
305+
return nil, err
306+
}
307+
jobs = append(jobs, foundJobs...)
308+
}
309+
return jobs, nil
310+
}
311+
312+
func (q *redisQueue) bulkFindSmallBatch(jobIDs []string, opt *FindOptions) ([]*Job, error) {
256313
err := opt.Validate()
257314
if err != nil {
258315
return nil, err

redis_queue_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,3 +327,27 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) {
327327
require.EqualValues(t, 1, m.ScheduledTotal)
328328
require.True(t, m.Latency == 0)
329329
}
330+
331+
func TestRedisQueueBulkEnqueue(t *testing.T) {
332+
client := redistest.NewClient()
333+
defer client.Close()
334+
require.NoError(t, redistest.Reset(client))
335+
q := NewRedisQueue(client)
336+
337+
const jobCount = 100000
338+
var jobs []*Job
339+
for i := 0; i < jobCount; i++ {
340+
job := NewJob()
341+
jobs = append(jobs, job)
342+
}
343+
344+
err := q.BulkEnqueue(jobs, &EnqueueOptions{
345+
Namespace: "{ns1}",
346+
QueueID: "q1",
347+
})
348+
require.NoError(t, err)
349+
350+
count, err := client.ZCard(context.Background(), "{ns1}:queue:q1").Result()
351+
require.NoError(t, err)
352+
require.Equal(t, int64(jobCount), count)
353+
}

sidekiq/external_queue.go

Lines changed: 0 additions & 94 deletions
This file was deleted.

sidekiq/job.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"encoding/json"
66
"errors"
7-
"time"
87

98
"github.com/taylorchu/work"
109
)
@@ -82,22 +81,3 @@ func newSidekiqJob(job *work.Job, sqQueue, sqClass string) (*sidekiqJob, error)
8281
}
8382
return &sqJob, nil
8483
}
85-
86-
func newJob(sqJob *sidekiqJob) (*work.Job, error) {
87-
updatedAt := sqJob.CreatedAt
88-
for _, ts := range []float64{sqJob.FailedAt, sqJob.RetriedAt} {
89-
if ts > updatedAt {
90-
updatedAt = ts
91-
}
92-
}
93-
job := work.Job{
94-
ID: sqJob.ID,
95-
Payload: sqJob.Args,
96-
CreatedAt: time.Unix(int64(sqJob.CreatedAt), 0),
97-
UpdatedAt: time.Unix(int64(updatedAt), 0),
98-
EnqueuedAt: time.Unix(int64(sqJob.EnqueuedAt), 0),
99-
Retries: sqJob.RetryCount,
100-
LastError: sqJob.ErrorMessage,
101-
}
102-
return &job, nil
103-
}

0 commit comments

Comments
 (0)