Skip to content

Commit b58b1d8

Browse files
authored
chore(NSQ): support job timeout (#25)
1 parent 6d3b0b9 commit b58b1d8

File tree

2 files changed

+383
-33
lines changed

2 files changed

+383
-33
lines changed

nsq/nsq.go

Lines changed: 92 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package nsq
22

33
import (
4+
"context"
5+
"encoding/json"
46
"runtime"
57
"sync"
8+
"sync/atomic"
69
"time"
710

811
"github.com/appleboy/queue"
@@ -15,16 +18,6 @@ var _ queue.Worker = (*Worker)(nil)
1518
// Option for queue system
1619
type Option func(*Worker)
1720

18-
// Job with NSQ message
19-
type Job struct {
20-
Body []byte
21-
}
22-
23-
// Bytes get bytes format
24-
func (j *Job) Bytes() []byte {
25-
return j.Body
26-
}
27-
2821
// Worker for NSQ
2922
type Worker struct {
3023
q *nsq.Consumer
@@ -36,7 +29,10 @@ type Worker struct {
3629
addr string
3730
topic string
3831
channel string
39-
runFunc func(queue.QueuedMessage, <-chan struct{}) error
32+
runFunc func(context.Context, queue.QueuedMessage) error
33+
logger queue.Logger
34+
stopFlag int32
35+
startFlag int32
4036
}
4137

4238
// WithAddr setup the addr of NSQ
@@ -61,7 +57,7 @@ func WithChannel(channel string) Option {
6157
}
6258

6359
// WithRunFunc setup the run func of queue
64-
func WithRunFunc(fn func(queue.QueuedMessage, <-chan struct{}) error) Option {
60+
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
6561
return func(w *Worker) {
6662
w.runFunc = fn
6763
}
@@ -74,6 +70,13 @@ func WithMaxInFlight(num int) Option {
7470
}
7571
}
7672

73+
// WithLogger set custom logger
74+
func WithLogger(l queue.Logger) Option {
75+
return func(w *Worker) {
76+
w.logger = l
77+
}
78+
}
79+
7780
// NewWorker for struc
7881
func NewWorker(opts ...Option) *Worker {
7982
var err error
@@ -83,7 +86,8 @@ func NewWorker(opts ...Option) *Worker {
8386
channel: "ch",
8487
maxInFlight: runtime.NumCPU(),
8588
stop: make(chan struct{}),
86-
runFunc: func(queue.QueuedMessage, <-chan struct{}) error {
89+
logger: queue.NewLogger(),
90+
runFunc: func(context.Context, queue.QueuedMessage) error {
8791
return nil
8892
},
8993
}
@@ -122,33 +126,87 @@ func (s *Worker) AfterRun() error {
122126
if err != nil {
123127
panic("Could not connect nsq server: " + err.Error())
124128
}
129+
130+
atomic.CompareAndSwapInt32(&s.startFlag, 0, 1)
125131
})
126132

127133
return nil
128134
}
129135

136+
func (s *Worker) handle(job queue.Job) error {
137+
// create channel with buffer size 1 to avoid goroutine leak
138+
done := make(chan error, 1)
139+
panicChan := make(chan interface{}, 1)
140+
startTime := time.Now()
141+
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
142+
defer cancel()
143+
144+
// run the job
145+
go func() {
146+
// handle panic issue
147+
defer func() {
148+
if p := recover(); p != nil {
149+
panicChan <- p
150+
}
151+
}()
152+
153+
// run custom process function
154+
done <- s.runFunc(ctx, job)
155+
}()
156+
157+
select {
158+
case p := <-panicChan:
159+
panic(p)
160+
case <-ctx.Done(): // timeout reached
161+
return ctx.Err()
162+
case <-s.stop: // shutdown service
163+
// cancel job
164+
cancel()
165+
166+
leftTime := job.Timeout - time.Since(startTime)
167+
// wait job
168+
select {
169+
case <-time.After(leftTime):
170+
return context.DeadlineExceeded
171+
case err := <-done: // job finish
172+
return err
173+
case p := <-panicChan:
174+
panic(p)
175+
}
176+
case err := <-done: // job finish
177+
return err
178+
}
179+
}
180+
130181
// Run start the worker
131182
func (s *Worker) Run() error {
132183
wg := &sync.WaitGroup{}
184+
panicChan := make(chan interface{}, 1)
133185
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
134186
wg.Add(1)
135-
defer wg.Done()
187+
defer func() {
188+
wg.Done()
189+
if p := recover(); p != nil {
190+
panicChan <- p
191+
}
192+
}()
136193
if len(msg.Body) == 0 {
137194
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
138195
// In this case, a message with an empty body is simply ignored/discarded.
139196
return nil
140197
}
141198

142-
job := &Job{
143-
Body: msg.Body,
144-
}
145-
146-
// run custom process function
147-
return s.runFunc(job, s.stop)
199+
var data queue.Job
200+
_ = json.Unmarshal(msg.Body, &data)
201+
return s.handle(data)
148202
}))
149203

150204
// wait close signal
151-
<-s.stop
205+
select {
206+
case <-s.stop:
207+
case err := <-panicChan:
208+
s.logger.Error(err)
209+
}
152210

153211
// wait job completed
154212
wg.Wait()
@@ -158,9 +216,16 @@ func (s *Worker) Run() error {
158216

159217
// Shutdown worker
160218
func (s *Worker) Shutdown() error {
219+
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
220+
return queue.ErrQueueShutdown
221+
}
222+
161223
s.stopOnce.Do(func() {
162-
s.q.Stop()
163-
s.p.Stop()
224+
if atomic.LoadInt32(&s.startFlag) == 1 {
225+
s.q.Stop()
226+
s.p.Stop()
227+
}
228+
164229
close(s.stop)
165230
})
166231
return nil
@@ -178,6 +243,10 @@ func (s *Worker) Usage() int {
178243

179244
// Queue send notification to queue
180245
func (s *Worker) Queue(job queue.QueuedMessage) error {
246+
if atomic.LoadInt32(&s.stopFlag) == 1 {
247+
return queue.ErrQueueShutdown
248+
}
249+
181250
err := s.p.Publish(s.topic, job.Bytes())
182251
if err != nil {
183252
return err

0 commit comments

Comments
 (0)