Skip to content

Commit baf860b

Browse files
authored
chore(queue): handle panic and refactor the job handle (#18)
1 parent bbdc746 commit baf860b

File tree

3 files changed

+73
-30
lines changed

3 files changed

+73
-30
lines changed

queue.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error
138138
}
139139

140140
func (q *Queue) work() {
141+
select {
142+
case <-q.quit:
143+
return
144+
default:
145+
}
146+
141147
num := atomic.AddInt32(&q.runningWorkers, 1)
142148
if err := q.worker.BeforeRun(); err != nil {
143149
q.logger.Fatal(err)

simple/simple.go

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,45 @@ func (s *Worker) AfterRun() error {
3636
return nil
3737
}
3838

39+
func (s *Worker) handle(m interface{}) error {
40+
// create channel with buffer size 1 to avoid goroutine leak
41+
done := make(chan error, 1)
42+
panicChan := make(chan interface{}, 1)
43+
job, _ := m.(queue.Job)
44+
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
45+
defer cancel()
46+
47+
// run the job
48+
go func() {
49+
// handle panic issue
50+
defer func() {
51+
if p := recover(); p != nil {
52+
panicChan <- p
53+
}
54+
}()
55+
56+
// run custom process function
57+
done <- s.runFunc(ctx, job)
58+
}()
59+
60+
select {
61+
case p := <-panicChan:
62+
panic(p)
63+
case <-ctx.Done(): // timeout reached
64+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
65+
s.logger.Infof("job timeout: %s", job.Timeout.String())
66+
}
67+
// wait job
68+
return <-done
69+
case <-s.stop: // shutdown service
70+
cancel()
71+
// wait job
72+
return <-done
73+
case err := <-done: // job finish and continue to worker
74+
return err
75+
}
76+
}
77+
3978
// Run start the worker
4079
func (s *Worker) Run() error {
4180
// check queue status
@@ -46,33 +85,9 @@ func (s *Worker) Run() error {
4685
}
4786

4887
for task := range s.taskQueue {
49-
done := make(chan struct{})
50-
v, _ := task.(queue.Job)
51-
ctx, cancel := context.WithTimeout(context.Background(), v.Timeout)
52-
// vet doesn't complain if I do this
53-
_ = cancel
54-
55-
// run the job
56-
go func() {
57-
// run custom process function
58-
_ = s.runFunc(ctx, task)
59-
close(done)
60-
}()
61-
62-
select {
63-
case <-ctx.Done(): // timeout reached
64-
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
65-
s.logger.Infof("job timeout: %s", v.Timeout.String())
66-
}
67-
// wait job
68-
<-done
69-
case <-s.stop: // shutdown service
70-
cancel()
71-
// wait job
72-
<-done
73-
case <-done: // job finish and continue to work
88+
if err := s.handle(task); err != nil {
89+
s.logger.Error(err.Error())
7490
}
75-
7691
}
7792
return nil
7893
}

simple/simple_test.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ func TestCustomFuncAndWait(t *testing.T) {
6464
assert.NoError(t, err)
6565
q.Start()
6666
time.Sleep(100 * time.Millisecond)
67-
assert.NoError(t, w.Queue(m))
68-
assert.NoError(t, w.Queue(m))
69-
assert.NoError(t, w.Queue(m))
70-
assert.NoError(t, w.Queue(m))
67+
assert.NoError(t, q.Queue(m))
68+
assert.NoError(t, q.Queue(m))
69+
assert.NoError(t, q.Queue(m))
70+
assert.NoError(t, q.Queue(m))
7171
time.Sleep(600 * time.Millisecond)
7272
q.Shutdown()
7373
q.Wait()
@@ -226,3 +226,25 @@ func TestGoroutineLeak(t *testing.T) {
226226
q.Wait()
227227
fmt.Println("number of goroutines:", runtime.NumGoroutine())
228228
}
229+
230+
func TestGoroutinePanic(t *testing.T) {
231+
m := mockMessage{
232+
msg: "foo",
233+
}
234+
w := NewWorker(
235+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
236+
panic("missing something")
237+
}),
238+
)
239+
q, err := queue.NewQueue(
240+
queue.WithWorker(w),
241+
queue.WithWorkerCount(2),
242+
)
243+
assert.NoError(t, err)
244+
q.Start()
245+
time.Sleep(50 * time.Millisecond)
246+
assert.NoError(t, q.Queue(m))
247+
time.Sleep(50 * time.Millisecond)
248+
q.Shutdown()
249+
q.Wait()
250+
}

0 commit comments

Comments
 (0)