Skip to content

Commit 61d8f55

Browse files
authored
chore(NATS): support job timeout. (#26)
1 parent 3525a28 commit 61d8f55

File tree

2 files changed

+382
-29
lines changed

2 files changed

+382
-29
lines changed

nats/nats.go

Lines changed: 88 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package nats
22

33
import (
4+
"context"
5+
"encoding/json"
46
"sync"
7+
"sync/atomic"
8+
"time"
59

610
"github.com/appleboy/queue"
711

@@ -13,16 +17,6 @@ var _ queue.Worker = (*Worker)(nil)
1317
// Option for queue system
1418
type Option func(*Worker)
1519

16-
// Job with NSQ message
17-
type Job struct {
18-
Body []byte
19-
}
20-
21-
// Bytes get bytes format
22-
func (j *Job) Bytes() []byte {
23-
return j.Body
24-
}
25-
2620
// Worker for NSQ
2721
type Worker struct {
2822
addr string
@@ -31,7 +25,9 @@ type Worker struct {
3125
client *nats.Conn
3226
stop chan struct{}
3327
stopOnce sync.Once
34-
runFunc func(queue.QueuedMessage, <-chan struct{}) error
28+
runFunc func(context.Context, queue.QueuedMessage) error
29+
logger queue.Logger
30+
stopFlag int32
3531
}
3632

3733
// WithAddr setup the addr of NATS
@@ -56,12 +52,19 @@ func WithQueue(queue string) Option {
5652
}
5753

5854
// WithRunFunc setup the run func of queue
59-
func WithRunFunc(fn func(queue.QueuedMessage, <-chan struct{}) error) Option {
55+
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
6056
return func(w *Worker) {
6157
w.runFunc = fn
6258
}
6359
}
6460

61+
// WithLogger set custom logger
62+
func WithLogger(l queue.Logger) Option {
63+
return func(w *Worker) {
64+
w.logger = l
65+
}
66+
}
67+
6568
// NewWorker for struc
6669
func NewWorker(opts ...Option) *Worker {
6770
var err error
@@ -70,7 +73,7 @@ func NewWorker(opts ...Option) *Worker {
7073
subj: "foobar",
7174
queue: "foobar",
7275
stop: make(chan struct{}),
73-
runFunc: func(queue.QueuedMessage, <-chan struct{}) error {
76+
runFunc: func(context.Context, queue.QueuedMessage) error {
7477
return nil
7578
},
7679
}
@@ -99,26 +102,81 @@ func (s *Worker) AfterRun() error {
99102
return nil
100103
}
101104

105+
func (s *Worker) handle(job queue.Job) error {
106+
// create channel with buffer size 1 to avoid goroutine leak
107+
done := make(chan error, 1)
108+
panicChan := make(chan interface{}, 1)
109+
startTime := time.Now()
110+
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
111+
defer cancel()
112+
113+
// run the job
114+
go func() {
115+
// handle panic issue
116+
defer func() {
117+
if p := recover(); p != nil {
118+
panicChan <- p
119+
}
120+
}()
121+
122+
// run custom process function
123+
done <- s.runFunc(ctx, job)
124+
}()
125+
126+
select {
127+
case p := <-panicChan:
128+
panic(p)
129+
case <-ctx.Done(): // timeout reached
130+
return ctx.Err()
131+
case <-s.stop: // shutdown service
132+
// cancel job
133+
cancel()
134+
135+
leftTime := job.Timeout - time.Since(startTime)
136+
// wait job
137+
select {
138+
case <-time.After(leftTime):
139+
return context.DeadlineExceeded
140+
case err := <-done: // job finish
141+
return err
142+
case p := <-panicChan:
143+
panic(p)
144+
}
145+
case err := <-done: // job finish
146+
return err
147+
}
148+
}
149+
102150
// Run start the worker
103151
func (s *Worker) Run() error {
104152
wg := &sync.WaitGroup{}
105-
153+
panicChan := make(chan interface{}, 1)
106154
_, err := s.client.QueueSubscribe(s.subj, s.queue, func(m *nats.Msg) {
107155
wg.Add(1)
108-
defer wg.Done()
109-
job := &Job{
110-
Body: m.Data,
156+
defer func() {
157+
wg.Done()
158+
if p := recover(); p != nil {
159+
panicChan <- p
160+
}
161+
}()
162+
163+
var data queue.Job
164+
_ = json.Unmarshal(m.Data, &data)
165+
166+
if err := s.handle(data); err != nil {
167+
s.logger.Error(err)
111168
}
112-
113-
// run custom process function
114-
_ = s.runFunc(job, s.stop)
115169
})
116170
if err != nil {
117171
return err
118172
}
119173

120174
// wait close signal
121-
<-s.stop
175+
select {
176+
case <-s.stop:
177+
case err := <-panicChan:
178+
s.logger.Error(err)
179+
}
122180

123181
// wait job completed
124182
wg.Wait()
@@ -128,9 +186,13 @@ func (s *Worker) Run() error {
128186

129187
// Shutdown worker
130188
func (s *Worker) Shutdown() error {
189+
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
190+
return queue.ErrQueueShutdown
191+
}
192+
131193
s.stopOnce.Do(func() {
132-
close(s.stop)
133194
s.client.Close()
195+
close(s.stop)
134196
})
135197
return nil
136198
}
@@ -147,6 +209,10 @@ func (s *Worker) Usage() int {
147209

148210
// Queue send notification to queue
149211
func (s *Worker) Queue(job queue.QueuedMessage) error {
212+
if atomic.LoadInt32(&s.stopFlag) == 1 {
213+
return queue.ErrQueueShutdown
214+
}
215+
150216
err := s.client.Publish(s.subj, job.Bytes())
151217
if err != nil {
152218
return err

0 commit comments

Comments
 (0)