Skip to content

Commit 2a43c22

Browse files
authored
chore: Add stop flag in Queue and refactor to handle stop flow (#19)
1 parent baf860b commit 2a43c22

File tree

3 files changed

+100
-10
lines changed

3 files changed

+100
-10
lines changed

queue.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type (
2222
stopOnce sync.Once
2323
runningWorkers int32
2424
timeout time.Duration
25+
stopFlag int32
2526
}
2627

2728
// Job with Timeout
@@ -103,6 +104,10 @@ func (q *Queue) Start() {
103104

104105
// Shutdown stops all queues.
105106
func (q *Queue) Shutdown() {
107+
if !atomic.CompareAndSwapInt32(&q.stopFlag, 0, 1) {
108+
return
109+
}
110+
106111
q.stopOnce.Do(func() {
107112
if err := q.worker.Shutdown(); err != nil {
108113
q.logger.Error(err)
@@ -138,10 +143,8 @@ func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error
138143
}
139144

140145
func (q *Queue) work() {
141-
select {
142-
case <-q.quit:
146+
if atomic.LoadInt32(&q.stopFlag) == 1 {
143147
return
144-
default:
145148
}
146149

147150
num := atomic.AddInt32(&q.runningWorkers, 1)

simple/simple.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"sync"
7+
"time"
78

89
"github.com/appleboy/queue"
910
)
@@ -41,6 +42,7 @@ func (s *Worker) handle(m interface{}) error {
4142
done := make(chan error, 1)
4243
panicChan := make(chan interface{}, 1)
4344
job, _ := m.(queue.Job)
45+
startTime := time.Now()
4446
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
4547
defer cancel()
4648

@@ -61,16 +63,22 @@ func (s *Worker) handle(m interface{}) error {
6163
case p := <-panicChan:
6264
panic(p)
6365
case <-ctx.Done(): // timeout reached
64-
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
65-
s.logger.Infof("job timeout: %s", job.Timeout.String())
66-
}
67-
// wait job
68-
return <-done
66+
return ctx.Err()
6967
case <-s.stop: // shutdown service
68+
// cancel job
7069
cancel()
70+
71+
leftTime := job.Timeout - time.Since(startTime)
7172
// wait job
72-
return <-done
73-
case err := <-done: // job finish and continue to worker
73+
select {
74+
case <-time.After(leftTime):
75+
return context.DeadlineExceeded
76+
case err := <-done: // job finish
77+
return err
78+
case p := <-panicChan:
79+
panic(p)
80+
}
81+
case err := <-done: // job finish
7482
return err
7583
}
7684
}

simple/simple_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,3 +248,82 @@ func TestGoroutinePanic(t *testing.T) {
248248
q.Shutdown()
249249
q.Wait()
250250
}
251+
252+
func TestHandleTimeout(t *testing.T) {
253+
job := queue.Job{
254+
Timeout: 100 * time.Millisecond,
255+
Body: []byte("foo"),
256+
}
257+
w := NewWorker(
258+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
259+
time.Sleep(200 * time.Millisecond)
260+
return nil
261+
}),
262+
)
263+
264+
err := w.handle(job)
265+
assert.Error(t, err)
266+
assert.Equal(t, context.DeadlineExceeded, err)
267+
268+
job = queue.Job{
269+
Timeout: 150 * time.Millisecond,
270+
Body: []byte("foo"),
271+
}
272+
273+
w = NewWorker(
274+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
275+
time.Sleep(200 * time.Millisecond)
276+
return nil
277+
}),
278+
)
279+
280+
done := make(chan error)
281+
go func() {
282+
done <- w.handle(job)
283+
}()
284+
285+
assert.NoError(t, w.Shutdown())
286+
287+
err = <-done
288+
assert.Error(t, err)
289+
assert.Equal(t, context.DeadlineExceeded, err)
290+
}
291+
292+
func TestJobComplete(t *testing.T) {
293+
job := queue.Job{
294+
Timeout: 100 * time.Millisecond,
295+
Body: []byte("foo"),
296+
}
297+
w := NewWorker(
298+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
299+
return errors.New("job completed")
300+
}),
301+
)
302+
303+
err := w.handle(job)
304+
assert.Error(t, err)
305+
assert.Equal(t, errors.New("job completed"), err)
306+
307+
job = queue.Job{
308+
Timeout: 250 * time.Millisecond,
309+
Body: []byte("foo"),
310+
}
311+
312+
w = NewWorker(
313+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
314+
time.Sleep(200 * time.Millisecond)
315+
return errors.New("job completed")
316+
}),
317+
)
318+
319+
done := make(chan error)
320+
go func() {
321+
done <- w.handle(job)
322+
}()
323+
324+
assert.NoError(t, w.Shutdown())
325+
326+
err = <-done
327+
assert.Error(t, err)
328+
assert.Equal(t, errors.New("job completed"), err)
329+
}

0 commit comments

Comments
 (0)