Skip to content

Commit 6b35f58

Browse files
authored
chore: Add job timeout in body with bytes format. (#23)
1 parent 0f30526 commit 6b35f58

File tree

2 files changed

+24
-15
lines changed

2 files changed

+24
-15
lines changed

queue.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package queue
22

33
import (
4+
"encoding/json"
45
"errors"
56
"runtime"
67
"sync"
@@ -37,6 +38,11 @@ func (j Job) Bytes() []byte {
3738
return j.Body
3839
}
3940

41+
func (j Job) Encode() []byte {
42+
b, _ := json.Marshal(j)
43+
return b
44+
}
45+
4046
// Option for queue system
4147
type Option func(*Queue)
4248

@@ -126,28 +132,29 @@ func (q *Queue) Wait() {
126132
q.routineGroup.Wait()
127133
}
128134

129-
// Queue to queue all job
130-
func (q *Queue) Queue(job QueuedMessage) error {
135+
func (q *Queue) handleQueue(timeout time.Duration, job QueuedMessage) error {
131136
if atomic.LoadInt32(&q.stopFlag) == 1 {
132137
return ErrQueueShutdown
133138
}
134139

135-
return q.worker.Queue(Job{
136-
Timeout: q.timeout,
140+
data := Job{
141+
Timeout: timeout,
137142
Body: job.Bytes(),
143+
}
144+
145+
return q.worker.Queue(Job{
146+
Body: data.Encode(),
138147
})
139148
}
140149

141150
// Queue to queue all job
142-
func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error {
143-
if atomic.LoadInt32(&q.stopFlag) == 1 {
144-
return ErrQueueShutdown
145-
}
151+
func (q *Queue) Queue(job QueuedMessage) error {
152+
return q.handleQueue(q.timeout, job)
153+
}
146154

147-
return q.worker.Queue(Job{
148-
Timeout: timeout,
149-
Body: job.Bytes(),
150-
})
155+
// Queue to queue all job
156+
func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error {
157+
return q.handleQueue(q.timeout, job)
151158
}
152159

153160
func (q *Queue) work() {

simple/simple.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package simple
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"sync"
78
"sync/atomic"
@@ -39,11 +40,10 @@ func (s *Worker) AfterRun() error {
3940
return nil
4041
}
4142

42-
func (s *Worker) handle(m interface{}) error {
43+
func (s *Worker) handle(job queue.Job) error {
4344
// create channel with buffer size 1 to avoid goroutine leak
4445
done := make(chan error, 1)
4546
panicChan := make(chan interface{}, 1)
46-
job, _ := m.(queue.Job)
4747
startTime := time.Now()
4848
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
4949
defer cancel()
@@ -95,7 +95,9 @@ func (s *Worker) Run() error {
9595
}
9696

9797
for task := range s.taskQueue {
98-
if err := s.handle(task); err != nil {
98+
var data queue.Job
99+
_ = json.Unmarshal(task.Bytes(), &data)
100+
if err := s.handle(data); err != nil {
99101
s.logger.Error(err.Error())
100102
}
101103
}

0 commit comments

Comments
 (0)