Skip to content

Commit 286596a

Browse files
committed
chore(queue): test Capacity and Usage
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 0241103 commit 286596a

File tree

2 files changed

+111
-0
lines changed

2 files changed

+111
-0
lines changed

queue_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,19 @@ package queue
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/stretchr/testify/assert"
78
)
89

10+
type mockMessage struct {
11+
message string
12+
}
13+
14+
func (m mockMessage) Bytes() []byte {
15+
return []byte(m.message)
16+
}
17+
918
func TestNewQueue(t *testing.T) {
1019
q, err := NewQueue()
1120
assert.Error(t, err)
@@ -18,3 +27,69 @@ func TestNewQueue(t *testing.T) {
1827
assert.NoError(t, err)
1928
assert.NotNil(t, q)
2029
}
30+
31+
func TestWorkerNum(t *testing.T) {
32+
w := &queueWorker{
33+
messages: make(chan QueuedMessage, 100),
34+
}
35+
q, err := NewQueue(
36+
WithWorker(w),
37+
WithWorkerCount(2),
38+
)
39+
assert.NoError(t, err)
40+
assert.NotNil(t, q)
41+
42+
q.Start()
43+
q.Start()
44+
time.Sleep(20 * time.Millisecond)
45+
assert.Equal(t, 4, q.Workers())
46+
q.Shutdown()
47+
q.Wait()
48+
}
49+
50+
func TestShtdonwOnce(t *testing.T) {
51+
w := &queueWorker{
52+
messages: make(chan QueuedMessage, 100),
53+
}
54+
q, err := NewQueue(
55+
WithWorker(w),
56+
WithWorkerCount(2),
57+
)
58+
assert.NoError(t, err)
59+
assert.NotNil(t, q)
60+
61+
q.Start()
62+
time.Sleep(20 * time.Millisecond)
63+
assert.Equal(t, 2, q.Workers())
64+
q.Shutdown()
65+
// don't panic here
66+
q.Shutdown()
67+
q.Wait()
68+
assert.Equal(t, 0, q.Workers())
69+
}
70+
71+
func TestWorkerStatus(t *testing.T) {
72+
m := mockMessage{
73+
message: "foobar",
74+
}
75+
w := &queueWorker{
76+
messages: make(chan QueuedMessage, 100),
77+
}
78+
q, err := NewQueue(
79+
WithWorker(w),
80+
WithWorkerCount(2),
81+
)
82+
assert.NoError(t, err)
83+
assert.NotNil(t, q)
84+
85+
q.Queue(m)
86+
q.Queue(m)
87+
q.Queue(m)
88+
q.Queue(m)
89+
assert.Equal(t, 100, q.Capacity())
90+
assert.Equal(t, 4, q.Usage())
91+
q.Start()
92+
time.Sleep(20 * time.Millisecond)
93+
q.Shutdown()
94+
q.Wait()
95+
}

worker.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package queue
22

3+
import (
4+
"errors"
5+
"log"
6+
"time"
7+
)
8+
39
// Worker interface
410
type Worker interface {
511
BeforeRun() error
@@ -28,3 +34,33 @@ func (w *emptyWorker) Shutdown() error { return nil }
2834
func (w *emptyWorker) Queue(job QueuedMessage) error { return nil }
2935
func (w *emptyWorker) Capacity() int { return 0 }
3036
func (w *emptyWorker) Usage() int { return 0 }
37+
38+
type queueWorker struct {
39+
messages chan QueuedMessage
40+
}
41+
42+
func (w *queueWorker) BeforeRun() error { return nil }
43+
func (w *queueWorker) AfterRun() error { return nil }
44+
func (w *queueWorker) Run(chan struct{}) error {
45+
for msg := range w.messages {
46+
log.Println("got message", msg)
47+
time.Sleep(100 * time.Millisecond)
48+
}
49+
return nil
50+
}
51+
52+
func (w *queueWorker) Shutdown() error {
53+
close(w.messages)
54+
return nil
55+
}
56+
57+
func (w *queueWorker) Queue(job QueuedMessage) error {
58+
select {
59+
case w.messages <- job:
60+
return nil
61+
default:
62+
return errors.New("max capacity reached")
63+
}
64+
}
65+
func (w *queueWorker) Capacity() int { return cap(w.messages) }
66+
func (w *queueWorker) Usage() int { return len(w.messages) }

0 commit comments

Comments
 (0)