Skip to content

Commit 35c03df

Browse files
authored
chore: connect redis channel in initial mode. (#4)
1 parent 77eb8e7 commit 35c03df

File tree

1 file changed

+20
-28
lines changed

1 file changed

+20
-28
lines changed

redis.go

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ type Worker struct {
2222
// redis config
2323
rdb *redis.Client
2424
pubsub *redis.PubSub
25+
channel <-chan *redis.Message
2526
addr string
2627
db int
2728
connectionString string
2829
password string
29-
channel string
30+
channelName string
3031
channelSize int
3132

3233
stopOnce sync.Once
@@ -75,7 +76,7 @@ func WithConnectionString(connectionString string) Option {
7576
// WithChannel setup the channel of redis
7677
func WithChannel(channel string) Option {
7778
return func(w *Worker) {
78-
w.channel = channel
79+
w.channelName = channel
7980
}
8081
}
8182

@@ -98,7 +99,7 @@ func NewWorker(opts ...Option) *Worker {
9899
var err error
99100
w := &Worker{
100101
addr: "127.0.0.1:6379",
101-
channel: "queue",
102+
channelName: "queue",
102103
channelSize: 1024,
103104
stop: make(chan struct{}),
104105
logger: queue.NewLogger(),
@@ -138,7 +139,19 @@ func NewWorker(opts ...Option) *Worker {
138139
w.rdb = rdb
139140

140141
ctx := context.Background()
141-
w.pubsub = w.rdb.Subscribe(ctx, w.channel)
142+
w.pubsub = w.rdb.Subscribe(ctx, w.channelName)
143+
144+
var ropts []redis.ChannelOption
145+
146+
if w.channelSize > 1 {
147+
ropts = append(ropts, redis.WithChannelSize(w.channelSize))
148+
}
149+
150+
w.channel = w.pubsub.Channel(ropts...)
151+
// make sure the connection is successful
152+
if err := w.pubsub.Ping(ctx); err != nil {
153+
w.logger.Fatal(err)
154+
}
142155

143156
return w
144157
}
@@ -247,7 +260,7 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
247260
ctx := context.Background()
248261

249262
// Publish a message.
250-
err := w.rdb.Publish(ctx, w.channel, job.Bytes()).Err()
263+
err := w.rdb.Publish(ctx, w.channelName, job.Bytes()).Err()
251264
if err != nil {
252265
return err
253266
}
@@ -257,27 +270,6 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
257270

258271
// Run start the worker
259272
func (w *Worker) Run() error {
260-
// check queue status
261-
select {
262-
case <-w.stop:
263-
return nil
264-
default:
265-
}
266-
267-
var options []redis.ChannelOption
268-
ctx := context.Background()
269-
270-
if w.channelSize > 1 {
271-
options = append(options, redis.WithChannelSize(w.channelSize))
272-
}
273-
274-
ch := w.pubsub.Channel(options...)
275-
// make sure the connection is successful
276-
err := w.pubsub.Ping(ctx)
277-
if err != nil {
278-
return err
279-
}
280-
281273
for {
282274
// check queue status
283275
select {
@@ -287,15 +279,15 @@ func (w *Worker) Run() error {
287279
}
288280

289281
select {
290-
case m, ok := <-ch:
282+
case m, ok := <-w.channel:
291283
select {
292284
case <-w.stop:
293285
return nil
294286
default:
295287
}
296288

297289
if !ok {
298-
return fmt.Errorf("redis pubsub: channel=%s closed", w.channel)
290+
return fmt.Errorf("redis pubsub: channel=%s closed", w.channelName)
299291
}
300292

301293
var data queue.Job

0 commit comments

Comments
 (0)