Skip to content

Commit 17193e1

Browse files
committed
refactor: remove startOnce and Add handle job
1 parent 8080cce commit 17193e1

File tree

1 file changed

+60
-7
lines changed

1 file changed

+60
-7
lines changed

redis.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"sync"
66
"sync/atomic"
7+
"time"
78

89
"github.com/golang-queue/queue"
910

@@ -23,11 +24,11 @@ type Worker struct {
2324
db int
2425
connectionString string
2526
password string
27+
channel string
28+
channelSize int
2629

27-
startOnce sync.Once
2830
stopOnce sync.Once
2931
stop chan struct{}
30-
channel string
3132
runFunc func(context.Context, queue.QueuedMessage) error
3233
logger queue.Logger
3334
stopFlag int32
@@ -48,6 +49,13 @@ func WithDB(db int) Option {
4849
}
4950
}
5051

52+
// WithChannelSize redis password
53+
func WithChannelSize(size int) Option {
54+
return func(w *Worker) {
55+
w.channelSize = size
56+
}
57+
}
58+
5159
// WithPassword redis password
5260
func WithPassword(passwd string) Option {
5361
return func(w *Worker) {
@@ -140,7 +148,48 @@ func (s *Worker) AfterRun() error {
140148
}
141149

142150
func (s *Worker) handle(job queue.Job) error {
143-
return nil
151+
// create channel with buffer size 1 to avoid goroutine leak
152+
done := make(chan error, 1)
153+
panicChan := make(chan interface{}, 1)
154+
startTime := time.Now()
155+
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
156+
defer cancel()
157+
158+
// run the job
159+
go func() {
160+
// handle panic issue
161+
defer func() {
162+
if p := recover(); p != nil {
163+
panicChan <- p
164+
}
165+
}()
166+
167+
// run custom process function
168+
done <- s.runFunc(ctx, job)
169+
}()
170+
171+
select {
172+
case p := <-panicChan:
173+
panic(p)
174+
case <-ctx.Done(): // timeout reached
175+
return ctx.Err()
176+
case <-s.stop: // shutdown service
177+
// cancel job
178+
cancel()
179+
180+
leftTime := job.Timeout - time.Since(startTime)
181+
// wait job
182+
select {
183+
case <-time.After(leftTime):
184+
return context.DeadlineExceeded
185+
case err := <-done: // job finish
186+
return err
187+
case p := <-panicChan:
188+
panic(p)
189+
}
190+
case err := <-done: // job finish
191+
return err
192+
}
144193
}
145194

146195
// Shutdown worker
@@ -150,10 +199,7 @@ func (s *Worker) Shutdown() error {
150199
}
151200

152201
s.stopOnce.Do(func() {
153-
if atomic.LoadInt32(&s.startFlag) == 1 {
154-
s.rdb.Close()
155-
}
156-
202+
s.rdb.Close()
157203
close(s.stop)
158204
})
159205
return nil
@@ -180,5 +226,12 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
180226

181227
// Run start the worker
182228
func (s *Worker) Run() error {
229+
// check queue status
230+
select {
231+
case <-s.stop:
232+
return queue.ErrQueueShutdown
233+
default:
234+
}
235+
183236
return nil
184237
}

0 commit comments

Comments
 (0)