Skip to content

Commit 9b9c869

Browse files
authored
feat(worker): re-queue the task before shutdown the worker (#3)
re-queue the task in the flight before shutdown the worker
1 parent bc48e0c commit 9b9c869

File tree

2 files changed

+24
-8
lines changed

2 files changed

+24
-8
lines changed

_example/server-client/server/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ func main() {
2929
w := redisdb.NewWorker(
3030
redisdb.WithAddr("127.0.0.1:6379"),
3131
redisdb.WithStreamName("foobar"),
32-
redisdb.WithDisableConsumer(),
3332
)
3433

3534
// define the queue

redis.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Worker struct {
2525
stopOnce sync.Once
2626
startOnce sync.Once
2727
stop chan struct{}
28+
exit chan struct{}
2829
opts options
2930
}
3031

@@ -34,6 +35,7 @@ func NewWorker(opts ...Option) *Worker {
3435
w := &Worker{
3536
opts: newOptions(opts...),
3637
stop: make(chan struct{}),
38+
exit: make(chan struct{}),
3739
tasks: make(chan redis.XMessage),
3840
}
3941

@@ -116,6 +118,10 @@ func (w *Worker) fetchTask() {
116118
case <-w.stop:
117119
// Todo: re-queue the task
118120
w.opts.logger.Info("re-queue the task: ", message.ID)
121+
if err := w.queue(message.Values); err != nil {
122+
w.opts.logger.Error("error to re-queue the task: ", message.ID)
123+
}
124+
close(w.exit)
119125
return
120126
}
121127
}
@@ -178,6 +184,13 @@ func (w *Worker) Shutdown() error {
178184

179185
w.stopOnce.Do(func() {
180186
close(w.stop)
187+
188+
// wait requeue
189+
select {
190+
case <-w.exit:
191+
case <-time.After(200 * time.Millisecond):
192+
}
193+
181194
switch v := w.rdb.(type) {
182195
case *redis.Client:
183196
v.Close()
@@ -189,25 +202,29 @@ func (w *Worker) Shutdown() error {
189202
return nil
190203
}
191204

192-
// Queue send notification to queue
193-
func (w *Worker) Queue(task core.QueuedMessage) error {
194-
if atomic.LoadInt32(&w.stopFlag) == 1 {
195-
return queue.ErrQueueShutdown
196-
}
197-
205+
func (w *Worker) queue(data interface{}) error {
198206
ctx := context.Background()
199207

200208
// Publish a message.
201209
err := w.rdb.XAdd(ctx, &redis.XAddArgs{
202210
Stream: w.opts.streamName,
203211
MaxLen: 0,
204212
MaxLenApprox: 0,
205-
Values: map[string]interface{}{"body": BytesToStr(task.Bytes())},
213+
Values: data,
206214
}).Err()
207215

208216
return err
209217
}
210218

219+
// Queue send notification to queue
220+
func (w *Worker) Queue(task core.QueuedMessage) error {
221+
if atomic.LoadInt32(&w.stopFlag) == 1 {
222+
return queue.ErrQueueShutdown
223+
}
224+
225+
return w.queue(map[string]interface{}{"body": BytesToStr(task.Bytes())})
226+
}
227+
211228
// Run start the worker
212229
func (w *Worker) Run(task core.QueuedMessage) error {
213230
data, _ := task.(queue.Job)

0 commit comments

Comments
 (0)