Skip to content

Commit d4c7d21

Browse files
authored
feat(sentinel): Add support for Redis Sentinel (#29)
1 parent bbafc78 commit d4c7d21

File tree

3 files changed

+54
-0
lines changed

3 files changed

+54
-0
lines changed

options.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ type options struct {
2020
channelName string
2121
channelSize int
2222
cluster bool
23+
sentinel bool
24+
masterName string
2325
}
2426

2527
// WithAddr setup the addr of redis
@@ -43,6 +45,20 @@ func WithCluster(enable bool) Option {
4345
}
4446
}
4547

48+
// WithSentinel redis sentinel
49+
func WithSentinel(enable bool) Option {
50+
return func(w *options) {
51+
w.sentinel = enable
52+
}
53+
}
54+
55+
// WithMasterName sentinel master name
56+
func WithMasterName(masterName string) Option {
57+
return func(w *options) {
58+
w.masterName = masterName
59+
}
60+
}
61+
4662
// WithChannelSize redis channel size
4763
func WithChannelSize(size int) Option {
4864
return func(w *options) {

redis.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ func NewWorker(opts ...Option) *Worker {
4949
Addrs: strings.Split(w.opts.addr, ","),
5050
Password: w.opts.password,
5151
})
52+
} else if w.opts.sentinel {
53+
w.rdb = redis.NewFailoverClient(&redis.FailoverOptions{
54+
MasterName: w.opts.masterName,
55+
SentinelAddrs: strings.Split(w.opts.addr, ","),
56+
Password: w.opts.password,
57+
DB: w.opts.db,
58+
})
5259
} else {
5360
options := &redis.Options{
5461
Addr: w.opts.addr,

redis_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,37 @@ func TestRedisCluster(t *testing.T) {
130130
// you will see the execute time > 1000ms
131131
}
132132

133+
func TestRedisSentinel(t *testing.T) {
134+
t.Helper()
135+
m := &mockMessage{
136+
Message: "foo",
137+
}
138+
hosts := []string{host + ":26379", host + ":26380"}
139+
140+
w := NewWorker(
141+
WithAddr(strings.Join(hosts, ",")),
142+
WithMasterName("mymaster"),
143+
WithChannel("testSentinel"),
144+
WithSentinel(true),
145+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
146+
time.Sleep(500 * time.Millisecond)
147+
return nil
148+
}),
149+
)
150+
q := queue.NewPool(
151+
5,
152+
queue.WithWorker(w),
153+
)
154+
time.Sleep(100 * time.Millisecond)
155+
assert.NoError(t, q.Queue(m))
156+
assert.NoError(t, q.Queue(m))
157+
assert.NoError(t, q.Queue(m))
158+
assert.NoError(t, q.Queue(m))
159+
time.Sleep(1000 * time.Millisecond)
160+
q.Release()
161+
// you will see the execute time > 1000ms
162+
}
163+
133164
func TestEnqueueJobAfterShutdown(t *testing.T) {
134165
m := mockMessage{
135166
Message: "foo",

0 commit comments

Comments
 (0)