Skip to content

Commit 6de9f4c

Browse files
authored
feat(worker): support disable consumer (#12)
1 parent 0e51823 commit 6de9f4c

File tree

3 files changed

+36
-9
lines changed

3 files changed

+36
-9
lines changed

nats.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ func NewWorker(opts ...Option) *Worker {
4949
}
5050

5151
func (w *Worker) startConsumer() error {
52+
if w.opts.disableConsumer {
53+
return nil
54+
}
5255
var err error
5356
w.subscription, err = w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(msg *nats.Msg) {
5457
select {

nats_test.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,6 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
364364
WithAddr(host+":4222"),
365365
WithSubj("test02"),
366366
WithQueue("test02"),
367-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
368-
log.Println(string(m.Bytes()))
369-
return nil
370-
}),
371367
)
372368

373369
assert.NoError(t, w.Queue(job))
@@ -377,3 +373,23 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
377373
// see "re-queue the old job" message
378374
assert.NoError(t, w.Shutdown())
379375
}
376+
377+
func TestWithDisableConsumer(t *testing.T) {
378+
job := queue.Job{
379+
Payload: []byte("foo"),
380+
}
381+
w := NewWorker(
382+
WithAddr(host+":4222"),
383+
WithSubj("test02"),
384+
WithQueue("test02"),
385+
WithDisableConsumer(),
386+
)
387+
388+
assert.NoError(t, w.Queue(job))
389+
assert.NoError(t, w.Queue(job))
390+
assert.NoError(t, w.Queue(job))
391+
time.Sleep(100 * time.Millisecond)
392+
assert.Equal(t, 0, len(w.tasks))
393+
// see "re-queue the old job" message
394+
assert.NoError(t, w.Shutdown())
395+
}

options.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import (
1010
type Option func(*options)
1111

1212
type options struct {
13-
runFunc func(context.Context, queue.QueuedMessage) error
14-
logger queue.Logger
15-
addr string
16-
subj string
17-
queue string
13+
runFunc func(context.Context, queue.QueuedMessage) error
14+
logger queue.Logger
15+
addr string
16+
subj string
17+
queue string
18+
disableConsumer bool
1819
}
1920

2021
// WithAddr setup the addr of NATS
@@ -52,6 +53,13 @@ func WithLogger(l queue.Logger) Option {
5253
}
5354
}
5455

56+
// WithDisableConsumer disable consumer
57+
func WithDisableConsumer() Option {
58+
return func(w *options) {
59+
w.disableConsumer = true
60+
}
61+
}
62+
5563
func newOptions(opts ...Option) options {
5664
defaultOpts := options{
5765
addr: "127.0.0.1:4222",

0 commit comments

Comments
 (0)