Skip to content

Commit 79774a8

Browse files
authored
chore: add stop channel in worker (#12)
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent a13f58f commit 79774a8

File tree

6 files changed

+33
-20
lines changed

6 files changed

+33
-20
lines changed

nsq/nsq.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ type Worker struct {
3030
q *nsq.Consumer
3131
p *nsq.Producer
3232
startOnce sync.Once
33+
stopOnce sync.Once
34+
stop chan struct{}
3335
maxInFlight int
3436
addr string
3537
topic string
3638
channel string
37-
runFunc func(queue.QueuedMessage) error
39+
runFunc func(queue.QueuedMessage, <-chan struct{}) error
3840
}
3941

4042
// WithAddr setup the addr of NSQ
@@ -59,7 +61,7 @@ func WithChannel(channel string) Option {
5961
}
6062

6163
// WithRunFunc setup the run func of queue
62-
func WithRunFunc(fn func(queue.QueuedMessage) error) Option {
64+
func WithRunFunc(fn func(queue.QueuedMessage, <-chan struct{}) error) Option {
6365
return func(w *Worker) {
6466
w.runFunc = fn
6567
}
@@ -80,7 +82,8 @@ func NewWorker(opts ...Option) *Worker {
8082
topic: "gorush",
8183
channel: "ch",
8284
maxInFlight: runtime.NumCPU(),
83-
runFunc: func(queue.QueuedMessage) error {
85+
stop: make(chan struct{}),
86+
runFunc: func(queue.QueuedMessage, <-chan struct{}) error {
8487
return nil
8588
},
8689
}
@@ -125,7 +128,7 @@ func (s *Worker) AfterRun() error {
125128
}
126129

127130
// Run start the worker
128-
func (s *Worker) Run(quit chan struct{}) error {
131+
func (s *Worker) Run() error {
129132
wg := &sync.WaitGroup{}
130133
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
131134
wg.Add(1)
@@ -141,11 +144,11 @@ func (s *Worker) Run(quit chan struct{}) error {
141144
}
142145

143146
// run custom process function
144-
return s.runFunc(job)
147+
return s.runFunc(job, s.stop)
145148
}))
146149

147150
// wait close signal
148-
<-quit
151+
<-s.stop
149152

150153
// wait job completed
151154
wg.Wait()
@@ -155,8 +158,11 @@ func (s *Worker) Run(quit chan struct{}) error {
155158

156159
// Shutdown worker
157160
func (s *Worker) Shutdown() error {
158-
s.q.Stop()
159-
s.p.Stop()
161+
s.stopOnce.Do(func() {
162+
s.q.Stop()
163+
s.p.Stop()
164+
close(s.stop)
165+
})
160166
return nil
161167
}
162168

nsq/nsq_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestCustomFuncAndWait(t *testing.T) {
6060
WithAddr(host+":4150"),
6161
WithTopic("test"),
6262
WithMaxInFlight(2),
63-
WithRunFunc(func(msg queue.QueuedMessage) error {
63+
WithRunFunc(func(msg queue.QueuedMessage, s <-chan struct{}) error {
6464
time.Sleep(500 * time.Millisecond)
6565
return nil
6666
}),

queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (q *Queue) work() {
126126
}
127127
}()
128128
q.logger.Infof("start the worker num: %d", num)
129-
if err := q.worker.Run(q.quit); err != nil {
129+
if err := q.worker.Run(); err != nil {
130130
q.logger.Error(err)
131131
}
132132
q.logger.Infof("stop the worker num: %d", num)

simple/simple.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package simple
33
import (
44
"errors"
55
"runtime"
6+
"sync"
67

78
"github.com/appleboy/queue"
89
)
@@ -17,7 +18,9 @@ var errMaxCapacity = errors.New("max capacity reached")
1718
// Worker for simple queue using channel
1819
type Worker struct {
1920
queueNotification chan queue.QueuedMessage
20-
runFunc func(queue.QueuedMessage) error
21+
runFunc func(queue.QueuedMessage, <-chan struct{}) error
22+
stop chan struct{}
23+
stopOnce sync.Once
2124
}
2225

2326
// BeforeRun run script before start worker
@@ -31,17 +34,20 @@ func (s *Worker) AfterRun() error {
3134
}
3235

3336
// Run start the worker
34-
func (s *Worker) Run(_ chan struct{}) error {
37+
func (s *Worker) Run() error {
3538
for notification := range s.queueNotification {
3639
// run custom process function
37-
_ = s.runFunc(notification)
40+
_ = s.runFunc(notification, s.stop)
3841
}
3942
return nil
4043
}
4144

4245
// Shutdown worker
4346
func (s *Worker) Shutdown() error {
44-
close(s.queueNotification)
47+
s.stopOnce.Do(func() {
48+
close(s.queueNotification)
49+
close(s.stop)
50+
})
4551
return nil
4652
}
4753

@@ -73,7 +79,7 @@ func WithQueueNum(num int) Option {
7379
}
7480

7581
// WithRunFunc setup the run func of queue
76-
func WithRunFunc(fn func(queue.QueuedMessage) error) Option {
82+
func WithRunFunc(fn func(queue.QueuedMessage, <-chan struct{}) error) Option {
7783
return func(w *Worker) {
7884
w.runFunc = fn
7985
}
@@ -83,7 +89,8 @@ func WithRunFunc(fn func(queue.QueuedMessage) error) Option {
8389
func NewWorker(opts ...Option) *Worker {
8490
w := &Worker{
8591
queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1),
86-
runFunc: func(msg queue.QueuedMessage) error {
92+
stop: make(chan struct{}),
93+
runFunc: func(queue.QueuedMessage, <-chan struct{}) error {
8794
return nil
8895
},
8996
}

simple/simple_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func TestCustomFuncAndWait(t *testing.T) {
4848
msg: "foo",
4949
}
5050
w := NewWorker(
51-
WithRunFunc(func(msg queue.QueuedMessage) error {
51+
WithRunFunc(func(msg queue.QueuedMessage, s <-chan struct{}) error {
5252
time.Sleep(500 * time.Millisecond)
5353
return nil
5454
}),

worker.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ type Worker interface {
1010
// BeforeRun is called before starting the worker
1111
BeforeRun() error
1212
// Run is called to start the worker
13-
Run(chan struct{}) error
13+
Run() error
1414
// BeforeRun is called after starting the worker
1515
AfterRun() error
1616
// Shutdown is called if stop all worker
@@ -37,7 +37,7 @@ type emptyWorker struct{}
3737

3838
func (w *emptyWorker) BeforeRun() error { return nil }
3939
func (w *emptyWorker) AfterRun() error { return nil }
40-
func (w *emptyWorker) Run(chan struct{}) error { return nil }
40+
func (w *emptyWorker) Run() error { return nil }
4141
func (w *emptyWorker) Shutdown() error { return nil }
4242
func (w *emptyWorker) Queue(job QueuedMessage) error { return nil }
4343
func (w *emptyWorker) Capacity() int { return 0 }
@@ -49,7 +49,7 @@ type queueWorker struct {
4949

5050
func (w *queueWorker) BeforeRun() error { return nil }
5151
func (w *queueWorker) AfterRun() error { return nil }
52-
func (w *queueWorker) Run(chan struct{}) error {
52+
func (w *queueWorker) Run() error {
5353
for msg := range w.messages {
5454
if string(msg.Bytes()) == "panic" {
5555
panic("show panic")

0 commit comments

Comments
 (0)