Skip to content

Commit ed16ba0

Browse files
committed
chore: Add test worker.
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent c76d54e commit ed16ba0

File tree

4 files changed

+97
-86
lines changed

4 files changed

+97
-86
lines changed

worker.go

Lines changed: 0 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package queue
22

3-
import (
4-
"context"
5-
"errors"
6-
"time"
7-
)
8-
93
// Worker interface
104
type Worker interface {
115
// BeforeRun is called before starting the worker
@@ -28,83 +22,3 @@ type Worker interface {
2822
type QueuedMessage interface {
2923
Bytes() []byte
3024
}
31-
32-
var (
33-
_ Worker = (*emptyWorker)(nil)
34-
_ Worker = (*messageWorker)(nil)
35-
)
36-
37-
type emptyWorker struct{}
38-
39-
func (w *emptyWorker) BeforeRun() error { return nil }
40-
func (w *emptyWorker) AfterRun() error { return nil }
41-
func (w *emptyWorker) Run() error { return nil }
42-
func (w *emptyWorker) Shutdown() error { return nil }
43-
func (w *emptyWorker) Queue(job QueuedMessage) error { return nil }
44-
func (w *emptyWorker) Capacity() int { return 0 }
45-
func (w *emptyWorker) Usage() int { return 0 }
46-
47-
type messageWorker struct {
48-
messages chan QueuedMessage
49-
}
50-
51-
func (w *messageWorker) BeforeRun() error { return nil }
52-
func (w *messageWorker) AfterRun() error { return nil }
53-
func (w *messageWorker) Run() error {
54-
for msg := range w.messages {
55-
if string(msg.Bytes()) == "panic" {
56-
panic("show panic")
57-
}
58-
time.Sleep(20 * time.Millisecond)
59-
}
60-
return nil
61-
}
62-
63-
func (w *messageWorker) Shutdown() error {
64-
close(w.messages)
65-
return nil
66-
}
67-
68-
func (w *messageWorker) Queue(job QueuedMessage) error {
69-
select {
70-
case w.messages <- job:
71-
return nil
72-
default:
73-
return errors.New("max capacity reached")
74-
}
75-
}
76-
func (w *messageWorker) Capacity() int { return cap(w.messages) }
77-
func (w *messageWorker) Usage() int { return len(w.messages) }
78-
79-
type taskWorker struct {
80-
messages chan QueuedMessage
81-
}
82-
83-
func (w *taskWorker) BeforeRun() error { return nil }
84-
func (w *taskWorker) AfterRun() error { return nil }
85-
func (w *taskWorker) Run() error {
86-
for msg := range w.messages {
87-
if v, ok := msg.(Job); ok {
88-
if v.Task != nil {
89-
_ = v.Task(context.Background())
90-
}
91-
}
92-
}
93-
return nil
94-
}
95-
96-
func (w *taskWorker) Shutdown() error {
97-
close(w.messages)
98-
return nil
99-
}
100-
101-
func (w *taskWorker) Queue(job QueuedMessage) error {
102-
select {
103-
case w.messages <- job:
104-
return nil
105-
default:
106-
return errors.New("max capacity reached")
107-
}
108-
}
109-
func (w *taskWorker) Capacity() int { return cap(w.messages) }
110-
func (w *taskWorker) Usage() int { return len(w.messages) }

worker_empty.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package queue
2+
3+
var _ Worker = (*emptyWorker)(nil)
4+
5+
// just for unit testing, don't use it.
6+
type emptyWorker struct{}
7+
8+
func (w *emptyWorker) BeforeRun() error { return nil }
9+
func (w *emptyWorker) AfterRun() error { return nil }
10+
func (w *emptyWorker) Run() error { return nil }
11+
func (w *emptyWorker) Shutdown() error { return nil }
12+
func (w *emptyWorker) Queue(job QueuedMessage) error { return nil }
13+
func (w *emptyWorker) Capacity() int { return 0 }
14+
func (w *emptyWorker) Usage() int { return 0 }

worker_message.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package queue
2+
3+
import (
4+
"errors"
5+
"time"
6+
)
7+
8+
var _ Worker = (*messageWorker)(nil)
9+
10+
// just for unit testing, don't use it.
11+
type messageWorker struct {
12+
messages chan QueuedMessage
13+
}
14+
15+
func (w *messageWorker) BeforeRun() error { return nil }
16+
func (w *messageWorker) AfterRun() error { return nil }
17+
func (w *messageWorker) Run() error {
18+
for msg := range w.messages {
19+
if string(msg.Bytes()) == "panic" {
20+
panic("show panic")
21+
}
22+
time.Sleep(20 * time.Millisecond)
23+
}
24+
return nil
25+
}
26+
27+
func (w *messageWorker) Shutdown() error {
28+
close(w.messages)
29+
return nil
30+
}
31+
32+
func (w *messageWorker) Queue(job QueuedMessage) error {
33+
select {
34+
case w.messages <- job:
35+
return nil
36+
default:
37+
return errors.New("max capacity reached")
38+
}
39+
}
40+
func (w *messageWorker) Capacity() int { return cap(w.messages) }
41+
func (w *messageWorker) Usage() int { return len(w.messages) }

worker_task.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package queue
2+
3+
import (
4+
"context"
5+
"errors"
6+
)
7+
8+
var _ Worker = (*taskWorker)(nil)
9+
10+
// just for unit testing, don't use it.
11+
type taskWorker struct {
12+
messages chan QueuedMessage
13+
}
14+
15+
func (w *taskWorker) BeforeRun() error { return nil }
16+
func (w *taskWorker) AfterRun() error { return nil }
17+
func (w *taskWorker) Run() error {
18+
for msg := range w.messages {
19+
if v, ok := msg.(Job); ok {
20+
if v.Task != nil {
21+
_ = v.Task(context.Background())
22+
}
23+
}
24+
}
25+
return nil
26+
}
27+
28+
func (w *taskWorker) Shutdown() error {
29+
close(w.messages)
30+
return nil
31+
}
32+
33+
func (w *taskWorker) Queue(job QueuedMessage) error {
34+
select {
35+
case w.messages <- job:
36+
return nil
37+
default:
38+
return errors.New("max capacity reached")
39+
}
40+
}
41+
func (w *taskWorker) Capacity() int { return cap(w.messages) }
42+
func (w *taskWorker) Usage() int { return len(w.messages) }

0 commit comments

Comments
 (0)