@@ -20,6 +20,7 @@ type Option func(*Worker)
20
20
type Worker struct {
21
21
// redis config
22
22
rdb * redis.Client
23
+ pubsub * redis.Pubsub
23
24
addr string
24
25
db int
25
26
connectionString string
@@ -95,10 +96,11 @@ func WithLogger(l queue.Logger) Option {
95
96
func NewWorker (opts ... Option ) * Worker {
96
97
var err error
97
98
w := & Worker {
98
- addr : "127.0.0.1:6379" ,
99
- channel : "queue" ,
100
- stop : make (chan struct {}),
101
- logger : queue .NewLogger (),
99
+ addr : "127.0.0.1:6379" ,
100
+ channel : "queue" ,
101
+ channelSize : 1024 ,
102
+ stop : make (chan struct {}),
103
+ logger : queue .NewLogger (),
102
104
runFunc : func (context.Context , queue.QueuedMessage ) error {
103
105
return nil
104
106
},
@@ -134,6 +136,9 @@ func NewWorker(opts ...Option) *Worker {
134
136
135
137
w .rdb = rdb
136
138
139
+ ctx := context .Background ()
140
+ w .pubsub = w .rdb .Subscribe (ctx , w .channel )
141
+
137
142
return w
138
143
}
139
144
@@ -199,6 +204,7 @@ func (s *Worker) Shutdown() error {
199
204
}
200
205
201
206
s .stopOnce .Do (func () {
207
+ s .pubsub .Close ()
202
208
s .rdb .Close ()
203
209
close (s .stop )
204
210
})
@@ -221,6 +227,20 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
221
227
return queue .ErrQueueShutdown
222
228
}
223
229
230
+ ctx := context .Background ()
231
+
232
+ // Wait for confirmation that subscription is created before publishing anything.
233
+ _ , err := s .pubsub .Receive (ctx )
234
+ if err != nil {
235
+ return err
236
+ }
237
+
238
+ // Publish a message.
239
+ err = s .rdb .Publish (ctx , s .channel , job ).Err ()
240
+ if err != nil {
241
+ return err
242
+ }
243
+
224
244
return nil
225
245
}
226
246
0 commit comments