Skip to content

Commit f4074cb

Browse files
authored
chore(queue): Support task function (#28)
* chore(queue): Support task function Signed-off-by: Bo-Yi Wu <[email protected]> * chore: update Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent a742c4e commit f4074cb

File tree

5 files changed

+160
-16
lines changed

5 files changed

+160
-16
lines changed

queue.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package queue
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"runtime"
@@ -12,6 +13,9 @@ import (
1213
// ErrQueueShutdown close the queue.
1314
var ErrQueueShutdown = errors.New("queue has been closed")
1415

16+
// TaskFunc is the task function
17+
type TaskFunc func(context.Context) error
18+
1519
type (
1620
// A Queue is a message queue.
1721
Queue struct {
@@ -28,6 +32,7 @@ type (
2832

2933
// Job with Timeout
3034
Job struct {
35+
Task TaskFunc `json:"-"`
3136
Timeout time.Duration `json:"timeout"`
3237
Body []byte `json:"body"`
3338
}
@@ -157,6 +162,27 @@ func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error
157162
return q.handleQueue(timeout, job)
158163
}
159164

165+
func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error {
166+
if atomic.LoadInt32(&q.stopFlag) == 1 {
167+
return ErrQueueShutdown
168+
}
169+
170+
return q.worker.Queue(Job{
171+
Task: task,
172+
Timeout: timeout,
173+
})
174+
}
175+
176+
// QueueTask to queue job task
177+
func (q *Queue) QueueTask(task TaskFunc) error {
178+
return q.handleQueueTask(q.timeout, task)
179+
}
180+
181+
// QueueTaskWithTimeout to queue job task with timeout
182+
func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error {
183+
return q.handleQueueTask(timeout, task)
184+
}
185+
160186
func (q *Queue) work() {
161187
if atomic.LoadInt32(&q.stopFlag) == 1 {
162188
return

queue_test.go

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package queue
22

33
import (
4+
"context"
45
"testing"
56
"time"
67

@@ -33,7 +34,7 @@ func TestNewQueue(t *testing.T) {
3334
}
3435

3536
func TestWorkerNum(t *testing.T) {
36-
w := &queueWorker{
37+
w := &messageWorker{
3738
messages: make(chan QueuedMessage, 100),
3839
}
3940
q, err := NewQueue(
@@ -52,7 +53,7 @@ func TestWorkerNum(t *testing.T) {
5253
}
5354

5455
func TestShtdonwOnce(t *testing.T) {
55-
w := &queueWorker{
56+
w := &messageWorker{
5657
messages: make(chan QueuedMessage, 100),
5758
}
5859
q, err := NewQueue(
@@ -76,7 +77,7 @@ func TestWorkerStatus(t *testing.T) {
7677
m := mockMessage{
7778
message: "foobar",
7879
}
79-
w := &queueWorker{
80+
w := &messageWorker{
8081
messages: make(chan QueuedMessage, 100),
8182
}
8283
q, err := NewQueue(
@@ -99,7 +100,7 @@ func TestWorkerStatus(t *testing.T) {
99100
}
100101

101102
func TestWorkerPanic(t *testing.T) {
102-
w := &queueWorker{
103+
w := &messageWorker{
103104
messages: make(chan QueuedMessage, 10),
104105
}
105106
q, err := NewQueue(
@@ -127,7 +128,7 @@ func TestWorkerPanic(t *testing.T) {
127128
}
128129

129130
func TestCapacityReached(t *testing.T) {
130-
w := &queueWorker{
131+
w := &messageWorker{
131132
messages: make(chan QueuedMessage, 1),
132133
}
133134
q, err := NewQueue(
@@ -148,7 +149,7 @@ func TestCapacityReached(t *testing.T) {
148149
}
149150

150151
func TestCloseQueueAfterShutdown(t *testing.T) {
151-
w := &queueWorker{
152+
w := &messageWorker{
152153
messages: make(chan QueuedMessage, 10),
153154
}
154155
q, err := NewQueue(
@@ -174,3 +175,37 @@ func TestCloseQueueAfterShutdown(t *testing.T) {
174175
assert.Error(t, err)
175176
assert.Equal(t, ErrQueueShutdown, err)
176177
}
178+
179+
func TestQueueTaskJob(t *testing.T) {
180+
w := &taskWorker{
181+
messages: make(chan QueuedMessage, 10),
182+
}
183+
q, err := NewQueue(
184+
WithWorker(w),
185+
WithWorkerCount(5),
186+
WithLogger(NewLogger()),
187+
)
188+
assert.NoError(t, err)
189+
assert.NotNil(t, q)
190+
q.Start()
191+
assert.NoError(t, q.QueueTask(func(ctx context.Context) error {
192+
time.Sleep(120 * time.Millisecond)
193+
q.logger.Info("Add new task 1")
194+
return nil
195+
}))
196+
assert.NoError(t, q.QueueTask(func(ctx context.Context) error {
197+
time.Sleep(100 * time.Millisecond)
198+
q.logger.Info("Add new task 2")
199+
return nil
200+
}))
201+
assert.NoError(t, q.QueueTaskWithTimeout(50*time.Millisecond, func(ctx context.Context) error {
202+
time.Sleep(80 * time.Millisecond)
203+
return nil
204+
}))
205+
time.Sleep(50 * time.Millisecond)
206+
q.Shutdown()
207+
assert.Equal(t, ErrQueueShutdown, q.QueueTask(func(ctx context.Context) error {
208+
return nil
209+
}))
210+
q.Wait()
211+
}

simple/simple.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ func (s *Worker) handle(job queue.Job) error {
5858
}()
5959

6060
// run custom process function
61-
done <- s.runFunc(ctx, job)
61+
if job.Task != nil {
62+
done <- job.Task(ctx)
63+
} else {
64+
done <- s.runFunc(ctx, job)
65+
}
6266
}()
6367

6468
select {
@@ -97,6 +101,11 @@ func (s *Worker) Run() error {
97101
for task := range s.taskQueue {
98102
var data queue.Job
99103
_ = json.Unmarshal(task.Bytes(), &data)
104+
if v, ok := task.(queue.Job); ok {
105+
if v.Task != nil {
106+
data.Task = v.Task
107+
}
108+
}
100109
if err := s.handle(data); err != nil {
101110
s.logger.Error(err.Error())
102111
}

simple/simple_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,3 +327,43 @@ func TestJobComplete(t *testing.T) {
327327
assert.Error(t, err)
328328
assert.Equal(t, errors.New("job completed"), err)
329329
}
330+
331+
func TestTaskJobComplete(t *testing.T) {
332+
job := queue.Job{
333+
Timeout: 100 * time.Millisecond,
334+
Task: func(ctx context.Context) error {
335+
return errors.New("job completed")
336+
},
337+
}
338+
w := NewWorker()
339+
340+
err := w.handle(job)
341+
assert.Error(t, err)
342+
assert.Equal(t, errors.New("job completed"), err)
343+
344+
job = queue.Job{
345+
Timeout: 250 * time.Millisecond,
346+
Task: func(ctx context.Context) error {
347+
return nil
348+
},
349+
}
350+
351+
w = NewWorker()
352+
done := make(chan error)
353+
go func() {
354+
done <- w.handle(job)
355+
}()
356+
assert.NoError(t, w.Shutdown())
357+
err = <-done
358+
assert.NoError(t, err)
359+
360+
// job timeout
361+
job = queue.Job{
362+
Timeout: 50 * time.Millisecond,
363+
Task: func(ctx context.Context) error {
364+
time.Sleep(60 * time.Millisecond)
365+
return nil
366+
},
367+
}
368+
assert.Equal(t, context.DeadlineExceeded, w.handle(job))
369+
}

worker.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package queue
22

33
import (
4+
"context"
45
"errors"
56
"time"
67
)
@@ -30,7 +31,7 @@ type QueuedMessage interface {
3031

3132
var (
3233
_ Worker = (*emptyWorker)(nil)
33-
_ Worker = (*queueWorker)(nil)
34+
_ Worker = (*messageWorker)(nil)
3435
)
3536

3637
type emptyWorker struct{}
@@ -43,13 +44,13 @@ func (w *emptyWorker) Queue(job QueuedMessage) error { return nil }
4344
func (w *emptyWorker) Capacity() int { return 0 }
4445
func (w *emptyWorker) Usage() int { return 0 }
4546

46-
type queueWorker struct {
47+
type messageWorker struct {
4748
messages chan QueuedMessage
4849
}
4950

50-
func (w *queueWorker) BeforeRun() error { return nil }
51-
func (w *queueWorker) AfterRun() error { return nil }
52-
func (w *queueWorker) Run() error {
51+
func (w *messageWorker) BeforeRun() error { return nil }
52+
func (w *messageWorker) AfterRun() error { return nil }
53+
func (w *messageWorker) Run() error {
5354
for msg := range w.messages {
5455
if string(msg.Bytes()) == "panic" {
5556
panic("show panic")
@@ -59,18 +60,51 @@ func (w *queueWorker) Run() error {
5960
return nil
6061
}
6162

62-
func (w *queueWorker) Shutdown() error {
63+
func (w *messageWorker) Shutdown() error {
6364
close(w.messages)
6465
return nil
6566
}
6667

67-
func (w *queueWorker) Queue(job QueuedMessage) error {
68+
func (w *messageWorker) Queue(job QueuedMessage) error {
6869
select {
6970
case w.messages <- job:
7071
return nil
7172
default:
7273
return errors.New("max capacity reached")
7374
}
7475
}
75-
func (w *queueWorker) Capacity() int { return cap(w.messages) }
76-
func (w *queueWorker) Usage() int { return len(w.messages) }
76+
func (w *messageWorker) Capacity() int { return cap(w.messages) }
77+
func (w *messageWorker) Usage() int { return len(w.messages) }
78+
79+
type taskWorker struct {
80+
messages chan QueuedMessage
81+
}
82+
83+
func (w *taskWorker) BeforeRun() error { return nil }
84+
func (w *taskWorker) AfterRun() error { return nil }
85+
func (w *taskWorker) Run() error {
86+
for msg := range w.messages {
87+
if v, ok := msg.(Job); ok {
88+
if v.Task != nil {
89+
_ = v.Task(context.Background())
90+
}
91+
}
92+
}
93+
return nil
94+
}
95+
96+
func (w *taskWorker) Shutdown() error {
97+
close(w.messages)
98+
return nil
99+
}
100+
101+
func (w *taskWorker) Queue(job QueuedMessage) error {
102+
select {
103+
case w.messages <- job:
104+
return nil
105+
default:
106+
return errors.New("max capacity reached")
107+
}
108+
}
109+
func (w *taskWorker) Capacity() int { return cap(w.messages) }
110+
func (w *taskWorker) Usage() int { return len(w.messages) }

0 commit comments

Comments
 (0)