Skip to content

Commit a6f85d7

Browse files
authored
chore: support using channel in queue (#3)
1 parent 2caf268 commit a6f85d7

File tree

3 files changed

+168
-2
lines changed

3 files changed

+168
-2
lines changed

simple/simple.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package simple
2+
3+
import (
4+
"errors"
5+
"runtime"
6+
7+
"github.com/appleboy/queue"
8+
)
9+
10+
var _ queue.Worker = (*Worker)(nil)
11+
12+
// Option for queue system
13+
type Option func(*Worker)
14+
15+
var errMaxCapacity = errors.New("max capacity reached")
16+
17+
// Worker for simple queue using channel
18+
type Worker struct {
19+
queueNotification chan queue.QueuedMessage
20+
runFunc func(queue.QueuedMessage) error
21+
}
22+
23+
// BeforeRun run script before start worker
24+
func (s *Worker) BeforeRun() error {
25+
return nil
26+
}
27+
28+
// AfterRun run script after start worker
29+
func (s *Worker) AfterRun() error {
30+
return nil
31+
}
32+
33+
// Run start the worker
34+
func (s *Worker) Run(_ chan struct{}) error {
35+
for notification := range s.queueNotification {
36+
_ = s.runFunc(notification)
37+
}
38+
return nil
39+
}
40+
41+
// Shutdown worker
42+
func (s *Worker) Shutdown() error {
43+
close(s.queueNotification)
44+
return nil
45+
}
46+
47+
// Capacity for channel
48+
func (s *Worker) Capacity() int {
49+
return cap(s.queueNotification)
50+
}
51+
52+
// Usage for count of channel usage
53+
func (s *Worker) Usage() int {
54+
return len(s.queueNotification)
55+
}
56+
57+
// Queue send notification to queue
58+
func (s *Worker) Queue(job queue.QueuedMessage) error {
59+
select {
60+
case s.queueNotification <- job:
61+
return nil
62+
default:
63+
return errMaxCapacity
64+
}
65+
}
66+
67+
// WithQueueNum setup the capcity of queue
68+
func WithQueueNum(num int) Option {
69+
return func(w *Worker) {
70+
w.queueNotification = make(chan queue.QueuedMessage, num)
71+
}
72+
}
73+
74+
// WithRunFunc setup the run func of queue
75+
func WithRunFunc(fn func(queue.QueuedMessage) error) Option {
76+
return func(w *Worker) {
77+
w.runFunc = fn
78+
}
79+
}
80+
81+
// NewWorker for struc
82+
func NewWorker(opts ...Option) *Worker {
83+
w := &Worker{
84+
queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1),
85+
runFunc: func(msg queue.QueuedMessage) error {
86+
return nil
87+
},
88+
}
89+
90+
// Loop through each option
91+
for _, opt := range opts {
92+
// Call the option giving the instantiated
93+
opt(w)
94+
}
95+
96+
return w
97+
}

simple/simple_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package simple
2+
3+
import (
4+
"runtime"
5+
"testing"
6+
"time"
7+
8+
"github.com/appleboy/queue"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
type mockMessage struct {
14+
msg string
15+
}
16+
17+
func (m mockMessage) Bytes() []byte {
18+
return []byte(m.msg)
19+
}
20+
21+
func TestQueueUsage(t *testing.T) {
22+
w := NewWorker()
23+
assert.Equal(t, runtime.NumCPU()<<1, w.Capacity())
24+
assert.Equal(t, 0, w.Usage())
25+
26+
assert.NoError(t, w.Queue(&mockMessage{}))
27+
assert.Equal(t, 1, w.Usage())
28+
}
29+
30+
func TestMaxCapacity(t *testing.T) {
31+
w := NewWorker(WithQueueNum(2))
32+
assert.Equal(t, 2, w.Capacity())
33+
assert.Equal(t, 0, w.Usage())
34+
35+
assert.NoError(t, w.Queue(&mockMessage{}))
36+
assert.Equal(t, 1, w.Usage())
37+
assert.NoError(t, w.Queue(&mockMessage{}))
38+
assert.Equal(t, 2, w.Usage())
39+
assert.Error(t, w.Queue(&mockMessage{}))
40+
assert.Equal(t, 2, w.Usage())
41+
42+
err := w.Queue(&mockMessage{})
43+
assert.Equal(t, errMaxCapacity, err)
44+
}
45+
46+
func TestCustomFuncAndWait(t *testing.T) {
47+
m := mockMessage{
48+
msg: "foo",
49+
}
50+
w := NewWorker(
51+
WithRunFunc(func(msg queue.QueuedMessage) error {
52+
time.Sleep(500 * time.Millisecond)
53+
return nil
54+
}),
55+
)
56+
q, err := queue.NewQueue(
57+
queue.WithWorker(w),
58+
queue.WithWorkerCount(2),
59+
)
60+
assert.NoError(t, err)
61+
q.Start()
62+
time.Sleep(100 * time.Millisecond)
63+
assert.NoError(t, w.Queue(m))
64+
assert.NoError(t, w.Queue(m))
65+
assert.NoError(t, w.Queue(m))
66+
assert.NoError(t, w.Queue(m))
67+
time.Sleep(600 * time.Millisecond)
68+
q.Shutdown()
69+
q.Wait()
70+
// you will see the execute time > 1000ms
71+
}

worker.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package queue
22

33
import (
44
"errors"
5-
"log"
65
"time"
76
)
87

@@ -52,7 +51,6 @@ func (w *queueWorker) BeforeRun() error { return nil }
5251
func (w *queueWorker) AfterRun() error { return nil }
5352
func (w *queueWorker) Run(chan struct{}) error {
5453
for msg := range w.messages {
55-
log.Println("got message", msg)
5654
if string(msg.Bytes()) == "panic" {
5755
panic("show panic")
5856
}

0 commit comments

Comments
 (0)