1
- package simple
1
+ package queue
2
2
3
3
import (
4
4
"context"
@@ -7,40 +7,38 @@ import (
7
7
"sync"
8
8
"sync/atomic"
9
9
"time"
10
-
11
- "github.com/golang-queue/queue"
12
10
)
13
11
14
12
const defaultQueueSize = 4096
15
13
16
- var _ queue. Worker = (* Worker )(nil )
14
+ var _ Worker = (* Consumer )(nil )
17
15
18
- // Option for queue system
19
- type Option func (* Worker )
16
+ // ConsumerOption for queue system
17
+ type ConsumerOption func (* Consumer )
20
18
21
19
var errMaxCapacity = errors .New ("max capacity reached" )
22
20
23
21
// Worker for simple queue using channel
24
- type Worker struct {
25
- taskQueue chan queue. QueuedMessage
26
- runFunc func (context.Context , queue. QueuedMessage ) error
22
+ type Consumer struct {
23
+ taskQueue chan QueuedMessage
24
+ runFunc func (context.Context , QueuedMessage ) error
27
25
stop chan struct {}
28
- logger queue. Logger
26
+ logger Logger
29
27
stopOnce sync.Once
30
28
stopFlag int32
31
29
}
32
30
33
31
// BeforeRun run script before start worker
34
- func (s * Worker ) BeforeRun () error {
32
+ func (s * Consumer ) BeforeRun () error {
35
33
return nil
36
34
}
37
35
38
36
// AfterRun run script after start worker
39
- func (s * Worker ) AfterRun () error {
37
+ func (s * Consumer ) AfterRun () error {
40
38
return nil
41
39
}
42
40
43
- func (s * Worker ) handle (job queue. Job ) error {
41
+ func (s * Consumer ) handle (job Job ) error {
44
42
// create channel with buffer size 1 to avoid goroutine leak
45
43
done := make (chan error , 1 )
46
44
panicChan := make (chan interface {}, 1 )
@@ -90,18 +88,18 @@ func (s *Worker) handle(job queue.Job) error {
90
88
}
91
89
92
90
// Run start the worker
93
- func (s * Worker ) Run () error {
91
+ func (s * Consumer ) Run () error {
94
92
// check queue status
95
93
select {
96
94
case <- s .stop :
97
- return queue . ErrQueueShutdown
95
+ return ErrQueueShutdown
98
96
default :
99
97
}
100
98
101
99
for task := range s .taskQueue {
102
- var data queue. Job
100
+ var data Job
103
101
_ = json .Unmarshal (task .Bytes (), & data )
104
- if v , ok := task .(queue. Job ); ok {
102
+ if v , ok := task .(Job ); ok {
105
103
if v .Task != nil {
106
104
data .Task = v .Task
107
105
}
@@ -114,9 +112,9 @@ func (s *Worker) Run() error {
114
112
}
115
113
116
114
// Shutdown worker
117
- func (s * Worker ) Shutdown () error {
115
+ func (s * Consumer ) Shutdown () error {
118
116
if ! atomic .CompareAndSwapInt32 (& s .stopFlag , 0 , 1 ) {
119
- return queue . ErrQueueShutdown
117
+ return ErrQueueShutdown
120
118
}
121
119
122
120
s .stopOnce .Do (func () {
@@ -127,19 +125,19 @@ func (s *Worker) Shutdown() error {
127
125
}
128
126
129
127
// Capacity for channel
130
- func (s * Worker ) Capacity () int {
128
+ func (s * Consumer ) Capacity () int {
131
129
return cap (s .taskQueue )
132
130
}
133
131
134
132
// Usage for count of channel usage
135
- func (s * Worker ) Usage () int {
133
+ func (s * Consumer ) Usage () int {
136
134
return len (s .taskQueue )
137
135
}
138
136
139
137
// Queue send notification to queue
140
- func (s * Worker ) Queue (job queue. QueuedMessage ) error {
138
+ func (s * Consumer ) Queue (job QueuedMessage ) error {
141
139
if atomic .LoadInt32 (& s .stopFlag ) == 1 {
142
- return queue . ErrQueueShutdown
140
+ return ErrQueueShutdown
143
141
}
144
142
145
143
select {
@@ -151,33 +149,33 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
151
149
}
152
150
153
151
// WithQueueNum setup the capcity of queue
154
- func WithQueueNum (num int ) Option {
155
- return func (w * Worker ) {
156
- w .taskQueue = make (chan queue. QueuedMessage , num )
152
+ func WithConsumerQueueNum (num int ) ConsumerOption {
153
+ return func (w * Consumer ) {
154
+ w .taskQueue = make (chan QueuedMessage , num )
157
155
}
158
156
}
159
157
160
158
// WithRunFunc setup the run func of queue
161
- func WithRunFunc (fn func (context.Context , queue. QueuedMessage ) error ) Option {
162
- return func (w * Worker ) {
159
+ func WithConsumerRunFunc (fn func (context.Context , QueuedMessage ) error ) ConsumerOption {
160
+ return func (w * Consumer ) {
163
161
w .runFunc = fn
164
162
}
165
163
}
166
164
167
- // WithLogger set custom logger
168
- func WithLogger (l queue. Logger ) Option {
169
- return func (w * Worker ) {
165
+ // WithConsumerLogger set custom logger
166
+ func WithConsumerLogger (l Logger ) ConsumerOption {
167
+ return func (w * Consumer ) {
170
168
w .logger = l
171
169
}
172
170
}
173
171
174
- // NewWorker for struc
175
- func NewWorker (opts ... Option ) * Worker {
176
- w := & Worker {
177
- taskQueue : make (chan queue. QueuedMessage , defaultQueueSize ),
172
+ // NewConsumer for struc
173
+ func NewConsumer (opts ... ConsumerOption ) * Consumer {
174
+ w := & Consumer {
175
+ taskQueue : make (chan QueuedMessage , defaultQueueSize ),
178
176
stop : make (chan struct {}),
179
- logger : queue . NewLogger (),
180
- runFunc : func (context.Context , queue. QueuedMessage ) error {
177
+ logger : NewLogger (),
178
+ runFunc : func (context.Context , QueuedMessage ) error {
181
179
return nil
182
180
},
183
181
}
0 commit comments