Skip to content

Commit fe0fb0e

Browse files
authored
fix(worker): handle all jobs with graceful shutdown (#54)
1 parent f4bc970 commit fe0fb0e

File tree

3 files changed

+10
-34
lines changed

3 files changed

+10
-34
lines changed

consumer.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,6 @@ func (s *Consumer) handle(job Job) error {
7676

7777
// Run start the worker
7878
func (s *Consumer) Run(task QueuedMessage) error {
79-
// check queue status
80-
select {
81-
case <-s.stop:
82-
return ErrQueueShutdown
83-
default:
84-
}
85-
8679
var data Job
8780
_ = json.Unmarshal(task.Bytes(), &data)
8881
if v, ok := task.(Job); ok {

consumer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
143143

144144
func TestGoroutineLeak(t *testing.T) {
145145
w := NewConsumer(
146-
WithLogger(NewEmptyLogger()),
146+
WithLogger(NewLogger()),
147147
WithFn(func(ctx context.Context, m QueuedMessage) error {
148148
for {
149149
select {
@@ -178,7 +178,7 @@ func TestGoroutineLeak(t *testing.T) {
178178
}
179179

180180
q.Start()
181-
time.Sleep(2 * time.Second)
181+
time.Sleep(1 * time.Second)
182182
q.Release()
183183
fmt.Println("number of goroutines:", runtime.NumGoroutine())
184184
}

queue.go

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ func NewQueue(opts ...Option) (*Queue, error) {
8181

8282
// Start to enable all worker
8383
func (q *Queue) Start() {
84-
go q.start()
84+
q.routineGroup.Run(func() {
85+
q.start()
86+
})
8587
}
8688

8789
// Shutdown stops all queues.
@@ -220,6 +222,7 @@ func (q *Queue) work(task QueuedMessage) {
220222
}
221223
}
222224

225+
// UpdateWorkerCount to update worker number dynamically.
223226
func (q *Queue) UpdateWorkerCount(num int) {
224227
q.workerCount = num
225228
q.schedule()
@@ -244,13 +247,9 @@ func (q *Queue) start() {
244247

245248
for {
246249
var task QueuedMessage
247-
if atomic.LoadInt32(&q.stopFlag) == 1 {
248-
return
249-
}
250250

251251
// request task from queue in background
252252
q.routineGroup.Run(func() {
253-
loop:
254253
for {
255254
select {
256255
case <-q.quit:
@@ -261,15 +260,15 @@ func (q *Queue) start() {
261260
if err != nil {
262261
select {
263262
case <-q.quit:
264-
break loop
263+
return
265264
case <-time.After(time.Second):
266265
// sleep 1 second to fetch new task
267266
}
268267
}
269268
}
270269
if t != nil {
271270
tasks <- t
272-
break loop
271+
return
273272
}
274273
}
275274
}
@@ -292,24 +291,8 @@ func (q *Queue) start() {
292291

293292
// check worker number
294293
q.schedule()
295-
296-
// get worker to execute new task
297-
select {
298-
case <-q.quit:
299-
if err := q.worker.Queue(task); err != nil {
300-
q.logger.Errorf("can't re-queue task: %v", err)
301-
}
302-
return
303-
case <-q.ready:
304-
select {
305-
case <-q.quit:
306-
if err := q.worker.Queue(task); err != nil {
307-
q.logger.Errorf("can't re-queue task: %v", err)
308-
}
309-
return
310-
default:
311-
}
312-
}
294+
// wait worker ready
295+
<-q.ready
313296

314297
// start new task
315298
q.metric.IncBusyWorker()

0 commit comments

Comments
 (0)