Skip to content

Commit e9712fc

Browse files
authored
FEATURE: more efficient sidekiq pull (#105)
1 parent 0990fd1 commit e9712fc

File tree

4 files changed

+686
-198
lines changed

4 files changed

+686
-198
lines changed

sidekiq/pull.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package sidekiq
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"strings"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/redis/go-redis/v9"
13+
"github.com/taylorchu/work"
14+
)
15+
16+
// JobPuller pulls jobs from sidekiq-compatible queue.
17+
type JobPuller interface {
18+
Pull(*PullOptions) error
19+
}
20+
21+
// PullOptions specifies how a job is pulled from sidekiq-compatible queue.
22+
type PullOptions struct {
23+
// work-compatible namespace
24+
Namespace string
25+
// optional work-compatible queue
26+
// This allows moving jobs to another redis instance. Without this, these jobs are moved
27+
// within the same sidekiq redis instance.
28+
Queue work.Queue
29+
// sidekiq-compatible namespace
30+
// Only used by https://github.com/resque/redis-namespace. By default, it is empty.
31+
SidekiqNamespace string
32+
// sidekiq-compatible queue like `default`.
33+
SidekiqQueue string
34+
}
35+
36+
// Validate validates PullOptions.
37+
func (opt *PullOptions) Validate() error {
38+
if opt.Namespace == "" {
39+
return work.ErrEmptyNamespace
40+
}
41+
if opt.SidekiqQueue == "" {
42+
return work.ErrEmptyQueueID
43+
}
44+
return nil
45+
}
46+
47+
// Pull moves jobs from sidekiq-compatible queue into work-compatible queue.
48+
func (q *sidekiqQueue) Pull(opt *PullOptions) error {
49+
err := opt.Validate()
50+
if err != nil {
51+
return err
52+
}
53+
queueNamespace := fmt.Sprintf("%s:sidekiq-queue-pull:%s", opt.SidekiqNamespace, opt.SidekiqQueue)
54+
queueID := uuid.NewString()
55+
56+
ctx, cancel := context.WithCancel(context.Background())
57+
defer cancel()
58+
59+
expireInSec := 10
60+
refreshInSec := 2
61+
err = q.dequeueStartScript.Run(ctx, q.client, nil,
62+
opt.SidekiqNamespace,
63+
opt.SidekiqQueue,
64+
queueNamespace,
65+
queueID,
66+
time.Now().Unix(),
67+
expireInSec,
68+
).Err()
69+
if err != nil {
70+
return err
71+
}
72+
defer func() error {
73+
return q.dequeueStopScript.Run(ctx, q.client, nil,
74+
queueNamespace,
75+
queueID,
76+
).Err()
77+
}()
78+
go func() {
79+
defer cancel()
80+
for {
81+
select {
82+
case <-ctx.Done():
83+
return
84+
case <-time.After(time.Duration(refreshInSec) * time.Second):
85+
err := q.dequeueHeartbeatScript.Run(ctx, q.client, nil,
86+
queueNamespace,
87+
queueID,
88+
time.Now().Unix(),
89+
expireInSec,
90+
).Err()
91+
if err != nil {
92+
return
93+
}
94+
}
95+
}
96+
}()
97+
98+
pull := func() error {
99+
res, err := q.dequeueScript.Run(ctx, q.client, nil,
100+
queueNamespace,
101+
queueID,
102+
).Result()
103+
if err != nil {
104+
return err
105+
}
106+
var sqJob sidekiqJob
107+
err = json.NewDecoder(strings.NewReader(res.(string))).Decode(&sqJob)
108+
if err != nil {
109+
return err
110+
}
111+
err = sqJob.Validate()
112+
if err != nil {
113+
return err
114+
}
115+
job, err := newJob(&sqJob)
116+
if err != nil {
117+
return err
118+
}
119+
queue := opt.Queue
120+
if queue == nil {
121+
queue = q.RedisQueue
122+
}
123+
var found bool
124+
if finder, ok := queue.(work.BulkJobFinder); ok {
125+
// best effort to check for duplicates
126+
jobs, err := finder.BulkFind([]string{job.ID}, &work.FindOptions{
127+
Namespace: opt.Namespace,
128+
})
129+
if err != nil {
130+
return err
131+
}
132+
found = len(jobs) == 1 && jobs[0] != nil
133+
}
134+
if !found {
135+
err := queue.Enqueue(job, &work.EnqueueOptions{
136+
Namespace: opt.Namespace,
137+
QueueID: FormatQueueID(sqJob.Queue, sqJob.Class),
138+
})
139+
if err != nil {
140+
return err
141+
}
142+
}
143+
err = q.ackScript.Run(ctx, q.client, nil,
144+
queueNamespace,
145+
queueID,
146+
res.(string),
147+
).Err()
148+
if err != nil {
149+
return err
150+
}
151+
return nil
152+
}
153+
154+
for {
155+
err := pull()
156+
if err != nil {
157+
if errors.Is(err, redis.Nil) {
158+
return nil
159+
}
160+
return err
161+
}
162+
}
163+
}

0 commit comments

Comments
 (0)