Skip to content

Commit 18f5bb4

Browse files
authored
feat(queue): all task has been done after closing the channel (#56)
1 parent fe0fb0e commit 18f5bb4

File tree

4 files changed

+39
-32
lines changed

4 files changed

+39
-32
lines changed

consumer.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,13 @@ func (s *Consumer) Queue(task QueuedMessage) error {
119119

120120
func (s *Consumer) Request() (QueuedMessage, error) {
121121
select {
122-
case task := <-s.taskQueue:
122+
case task, ok := <-s.taskQueue:
123+
if !ok {
124+
return nil, ErrQueueHasBeenClosed
125+
}
123126
return task, nil
124127
default:
125-
return nil, errors.New("no task in queue")
128+
return nil, ErrNoTaskInQueue
126129
}
127130
}
128131

consumer_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,10 @@ func TestGoroutineLeak(t *testing.T) {
148148
for {
149149
select {
150150
case <-ctx.Done():
151-
log.Println("get data:", string(m.Bytes()))
152151
if errors.Is(ctx.Err(), context.Canceled) {
153-
log.Println("queue has been shutdown and cancel the job")
152+
log.Println("queue has been shutdown and cancel the job: " + string(m.Bytes()))
154153
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
155-
log.Println("job deadline exceeded")
154+
log.Println("job deadline exceeded: " + string(m.Bytes()))
156155
}
157156
return nil
158157
default:

errors.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package queue
2+
3+
import "errors"
4+
5+
var (
6+
// ErrNoTaskInQueue there is nothing in the queue
7+
ErrNoTaskInQueue = errors.New("no task in queue")
8+
// ErrQueueHasBeenClosed the current queue is closed
9+
ErrQueueHasBeenClosed = errors.New("queue has been closed")
10+
)

queue.go

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -246,46 +246,41 @@ func (q *Queue) start() {
246246
tasks := make(chan QueuedMessage, 1)
247247

248248
for {
249-
var task QueuedMessage
250-
251249
// request task from queue in background
252250
q.routineGroup.Run(func() {
253251
for {
254-
select {
255-
case <-q.quit:
256-
return
257-
default:
258-
t, err := q.worker.Request()
259-
if t == nil || err != nil {
260-
if err != nil {
261-
select {
262-
case <-q.quit:
252+
t, err := q.worker.Request()
253+
if t == nil || err != nil {
254+
if err != nil {
255+
select {
256+
case <-q.quit:
257+
if !errors.Is(err, ErrNoTaskInQueue) {
258+
close(tasks)
263259
return
264-
case <-time.After(time.Second):
265-
// sleep 1 second to fetch new task
266260
}
261+
case <-time.After(time.Second):
262+
// sleep 1 second to fetch new task
267263
}
268264
}
269-
if t != nil {
270-
tasks <- t
265+
}
266+
if t != nil {
267+
tasks <- t
268+
return
269+
}
270+
271+
select {
272+
case <-q.quit:
273+
if !errors.Is(err, ErrNoTaskInQueue) {
274+
close(tasks)
271275
return
272276
}
277+
default:
273278
}
274279
}
275280
})
276281

277-
// read task
278-
select {
279-
case task = <-tasks:
280-
case <-q.quit:
281-
select {
282-
case task = <-tasks:
283-
// queue task before shutdown the service
284-
if err := q.worker.Queue(task); err != nil {
285-
q.logger.Errorf("can't re-queue task: %v", err)
286-
}
287-
default:
288-
}
282+
task, ok := <-tasks
283+
if !ok {
289284
return
290285
}
291286

0 commit comments

Comments
 (0)