Skip to content

Commit 0e51823

Browse files
authored
feat(queue): Re-Queue Task Before Shutdown worker (#11)
1 parent 8b586d6 commit 0e51823

File tree

2 files changed

+42
-7
lines changed

2 files changed

+42
-7
lines changed

nats.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ var _ queue.Worker = (*Worker)(nil)
1616

1717
// Worker for NSQ
1818
type Worker struct {
19-
client *nats.Conn
20-
stop chan struct{}
21-
stopFlag int32
22-
stopOnce sync.Once
23-
opts options
24-
tasks chan *nats.Msg
19+
client *nats.Conn
20+
stop chan struct{}
21+
exit chan struct{}
22+
stopFlag int32
23+
stopOnce sync.Once
24+
opts options
25+
subscription *nats.Subscription
26+
tasks chan *nats.Msg
2527
}
2628

2729
// NewWorker for struc
@@ -30,6 +32,7 @@ func NewWorker(opts ...Option) *Worker {
3032
w := &Worker{
3133
opts: newOptions(opts...),
3234
stop: make(chan struct{}),
35+
exit: make(chan struct{}),
3336
tasks: make(chan *nats.Msg, 1),
3437
}
3538

@@ -46,13 +49,18 @@ func NewWorker(opts ...Option) *Worker {
4649
}
4750

4851
func (w *Worker) startConsumer() error {
49-
_, err := w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(msg *nats.Msg) {
52+
var err error
53+
w.subscription, err = w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(msg *nats.Msg) {
5054
select {
5155
case w.tasks <- msg:
5256
case <-w.stop:
5357
if msg != nil {
5458
// re-queue the job if worker has been shutdown.
5559
w.opts.logger.Info("re-queue the old job")
60+
if err := w.client.Publish(w.opts.subj, msg.Data); err != nil {
61+
panic(err)
62+
}
63+
close(w.exit)
5664
}
5765
}
5866
})
@@ -125,7 +133,12 @@ func (w *Worker) Shutdown() error {
125133
}
126134

127135
w.stopOnce.Do(func() {
136+
_ = w.subscription.Unsubscribe()
128137
close(w.stop)
138+
select {
139+
case <-w.exit:
140+
case <-time.After(50 * time.Millisecond):
141+
}
129142
w.client.Close()
130143
close(w.tasks)
131144
})

nats_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,3 +355,25 @@ func TestJobComplete(t *testing.T) {
355355
assert.Error(t, err)
356356
assert.Equal(t, errors.New("job completed"), err)
357357
}
358+
359+
func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
360+
job := queue.Job{
361+
Payload: []byte("foo"),
362+
}
363+
w := NewWorker(
364+
WithAddr(host+":4222"),
365+
WithSubj("test02"),
366+
WithQueue("test02"),
367+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
368+
log.Println(string(m.Bytes()))
369+
return nil
370+
}),
371+
)
372+
373+
assert.NoError(t, w.Queue(job))
374+
assert.NoError(t, w.Queue(job))
375+
assert.NoError(t, w.Queue(job))
376+
time.Sleep(500 * time.Millisecond)
377+
// see "re-queue the old job" message
378+
assert.NoError(t, w.Shutdown())
379+
}

0 commit comments

Comments
 (0)