Skip to content

Commit 0cbfbd1

Browse files
authored
chore(worker): remove disable consumer flag (#13)
1 parent 476fc66 commit 0cbfbd1

File tree

3 files changed

+21
-49
lines changed

3 files changed

+21
-49
lines changed

nats.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type Worker struct {
2222
exit chan struct{}
2323
stopFlag int32
2424
stopOnce sync.Once
25+
startOnce sync.Once
2526
opts options
2627
subscription *nats.Subscription
2728
tasks chan *nats.Msg
@@ -49,24 +50,22 @@ func NewWorker(opts ...Option) *Worker {
4950
return w
5051
}
5152

52-
func (w *Worker) startConsumer() error {
53-
if w.opts.disableConsumer {
54-
return nil
55-
}
56-
var err error
57-
w.subscription, err = w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(msg *nats.Msg) {
58-
select {
59-
case w.tasks <- msg:
60-
case <-w.stop:
61-
if msg != nil {
62-
// re-queue the task if worker has been shutdown.
63-
w.opts.logger.Info("re-queue the current task")
64-
if err := w.client.Publish(w.opts.subj, msg.Data); err != nil {
65-
w.opts.logger.Errorf("error to re-queue the current task: %s", err.Error())
53+
func (w *Worker) startConsumer() (err error) {
54+
w.startOnce.Do(func() {
55+
w.subscription, err = w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(msg *nats.Msg) {
56+
select {
57+
case w.tasks <- msg:
58+
case <-w.stop:
59+
if msg != nil {
60+
// re-queue the task if worker has been shutdown.
61+
w.opts.logger.Info("re-queue the current task")
62+
if err := w.client.Publish(w.opts.subj, msg.Data); err != nil {
63+
w.opts.logger.Errorf("error to re-queue the current task: %s", err.Error())
64+
}
6665
}
66+
close(w.exit)
6767
}
68-
close(w.exit)
69-
}
68+
})
7069
})
7170

7271
return err
@@ -165,6 +164,7 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
165164

166165
// Request a new task
167166
func (w *Worker) Request() (core.QueuedMessage, error) {
167+
_ = w.startConsumer()
168168
clock := 0
169169
loop:
170170
for {

nats_test.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -372,23 +372,3 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
372372
// see "re-queue the current task" message
373373
assert.NoError(t, w.Shutdown())
374374
}
375-
376-
func TestWithDisableConsumer(t *testing.T) {
377-
job := queue.Job{
378-
Payload: []byte("foo"),
379-
}
380-
w := NewWorker(
381-
WithAddr(host+":4222"),
382-
WithSubj("test02"),
383-
WithQueue("test02"),
384-
WithDisableConsumer(),
385-
)
386-
387-
assert.NoError(t, w.Queue(job))
388-
assert.NoError(t, w.Queue(job))
389-
assert.NoError(t, w.Queue(job))
390-
time.Sleep(100 * time.Millisecond)
391-
assert.Equal(t, 0, len(w.tasks))
392-
// see "re-queue the old job" message
393-
assert.NoError(t, w.Shutdown())
394-
}

options.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@ import (
1111
type Option func(*options)
1212

1313
type options struct {
14-
runFunc func(context.Context, core.QueuedMessage) error
15-
logger queue.Logger
16-
addr string
17-
subj string
18-
queue string
19-
disableConsumer bool
14+
runFunc func(context.Context, core.QueuedMessage) error
15+
logger queue.Logger
16+
addr string
17+
subj string
18+
queue string
2019
}
2120

2221
// WithAddr setup the addr of NATS
@@ -54,13 +53,6 @@ func WithLogger(l queue.Logger) Option {
5453
}
5554
}
5655

57-
// WithDisableConsumer disable consumer
58-
func WithDisableConsumer() Option {
59-
return func(w *options) {
60-
w.disableConsumer = true
61-
}
62-
}
63-
6456
func newOptions(opts ...Option) options {
6557
defaultOpts := options{
6658
addr: "127.0.0.1:4222",

0 commit comments

Comments
 (0)