Skip to content

Commit 3cc52a5

Browse files
authored
chore: Add options (#32)
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent d2b56b8 commit 3cc52a5

File tree

4 files changed

+103
-84
lines changed

4 files changed

+103
-84
lines changed

consumer.go

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,8 @@ import (
99
"time"
1010
)
1111

12-
const defaultQueueSize = 4096
13-
1412
var _ Worker = (*Consumer)(nil)
1513

16-
// ConsumerOption for queue system
17-
type ConsumerOption func(*Consumer)
18-
1914
var errMaxCapacity = errors.New("max capacity reached")
2015

2116
// Worker for simple queue using channel
@@ -148,42 +143,14 @@ func (s *Consumer) Queue(job QueuedMessage) error {
148143
}
149144
}
150145

151-
// WithQueueNum setup the capcity of queue
152-
func WithConsumerQueueNum(num int) ConsumerOption {
153-
return func(w *Consumer) {
154-
w.taskQueue = make(chan QueuedMessage, num)
155-
}
156-
}
157-
158-
// WithRunFunc setup the run func of queue
159-
func WithConsumerRunFunc(fn func(context.Context, QueuedMessage) error) ConsumerOption {
160-
return func(w *Consumer) {
161-
w.runFunc = fn
162-
}
163-
}
164-
165-
// WithConsumerLogger set custom logger
166-
func WithConsumerLogger(l Logger) ConsumerOption {
167-
return func(w *Consumer) {
168-
w.logger = l
169-
}
170-
}
171-
172146
// NewConsumer for struc
173-
func NewConsumer(opts ...ConsumerOption) *Consumer {
147+
func NewConsumer(opts ...Option) *Consumer {
148+
o := NewOptions(opts...)
174149
w := &Consumer{
175-
taskQueue: make(chan QueuedMessage, defaultQueueSize),
150+
taskQueue: make(chan QueuedMessage, o.queueSize),
176151
stop: make(chan struct{}),
177-
logger: NewLogger(),
178-
runFunc: func(context.Context, QueuedMessage) error {
179-
return nil
180-
},
181-
}
182-
183-
// Loop through each option
184-
for _, opt := range opts {
185-
// Call the option giving the instantiated
186-
opt(w)
152+
logger: o.logger,
153+
runFunc: o.fn,
187154
}
188155

189156
return w

consumer_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestQueueUsage(t *testing.T) {
2222
}
2323

2424
func TestMaxCapacity(t *testing.T) {
25-
w := NewConsumer(WithConsumerQueueNum(2))
25+
w := NewConsumer(WithQueueSize(2))
2626
assert.Equal(t, 2, w.Capacity())
2727
assert.Equal(t, 0, w.Usage())
2828

@@ -42,7 +42,7 @@ func TestCustomFuncAndWait(t *testing.T) {
4242
message: "foo",
4343
}
4444
w := NewConsumer(
45-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
45+
WithFn(func(ctx context.Context, m QueuedMessage) error {
4646
time.Sleep(500 * time.Millisecond)
4747
return nil
4848
}),
@@ -109,7 +109,7 @@ func TestJobReachTimeout(t *testing.T) {
109109
message: "foo",
110110
}
111111
w := NewConsumer(
112-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
112+
WithFn(func(ctx context.Context, m QueuedMessage) error {
113113
for {
114114
select {
115115
case <-ctx.Done():
@@ -144,8 +144,8 @@ func TestCancelJobAfterShutdown(t *testing.T) {
144144
message: "foo",
145145
}
146146
w := NewConsumer(
147-
WithConsumerLogger(NewEmptyLogger()),
148-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
147+
WithLogger(NewEmptyLogger()),
148+
WithFn(func(ctx context.Context, m QueuedMessage) error {
149149
for {
150150
select {
151151
case <-ctx.Done():
@@ -179,8 +179,8 @@ func TestGoroutineLeak(t *testing.T) {
179179
message: "foo",
180180
}
181181
w := NewConsumer(
182-
WithConsumerLogger(NewEmptyLogger()),
183-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
182+
WithLogger(NewEmptyLogger()),
183+
WithFn(func(ctx context.Context, m QueuedMessage) error {
184184
for {
185185
select {
186186
case <-ctx.Done():
@@ -222,7 +222,7 @@ func TestGoroutinePanic(t *testing.T) {
222222
message: "foo",
223223
}
224224
w := NewConsumer(
225-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
225+
WithFn(func(ctx context.Context, m QueuedMessage) error {
226226
panic("missing something")
227227
}),
228228
)
@@ -245,7 +245,7 @@ func TestHandleTimeout(t *testing.T) {
245245
Body: []byte("foo"),
246246
}
247247
w := NewConsumer(
248-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
248+
WithFn(func(ctx context.Context, m QueuedMessage) error {
249249
time.Sleep(200 * time.Millisecond)
250250
return nil
251251
}),
@@ -261,7 +261,7 @@ func TestHandleTimeout(t *testing.T) {
261261
}
262262

263263
w = NewConsumer(
264-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
264+
WithFn(func(ctx context.Context, m QueuedMessage) error {
265265
time.Sleep(200 * time.Millisecond)
266266
return nil
267267
}),
@@ -285,7 +285,7 @@ func TestJobComplete(t *testing.T) {
285285
Body: []byte("foo"),
286286
}
287287
w := NewConsumer(
288-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
288+
WithFn(func(ctx context.Context, m QueuedMessage) error {
289289
return errors.New("job completed")
290290
}),
291291
)
@@ -300,7 +300,7 @@ func TestJobComplete(t *testing.T) {
300300
}
301301

302302
w = NewConsumer(
303-
WithConsumerRunFunc(func(ctx context.Context, m QueuedMessage) error {
303+
WithFn(func(ctx context.Context, m QueuedMessage) error {
304304
time.Sleep(200 * time.Millisecond)
305305
return errors.New("job completed")
306306
}),

options.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package queue
2+
3+
import (
4+
"context"
5+
"runtime"
6+
"time"
7+
)
8+
9+
var (
10+
defaultQueueSize = 4096
11+
defaultWorkerCount = runtime.NumCPU()
12+
defaultTimeout = 60 * time.Minute
13+
defaultNewLogger = NewLogger()
14+
defaultFn = func(context.Context, QueuedMessage) error { return nil }
15+
)
16+
17+
// Option for queue system
18+
type Option func(*Options)
19+
20+
// WithWorkerCount set worker count
21+
func WithWorkerCount(num int) Option {
22+
return func(q *Options) {
23+
q.workerCount = num
24+
}
25+
}
26+
27+
// WithQueueSize set worker count
28+
func WithQueueSize(num int) Option {
29+
return func(q *Options) {
30+
q.queueSize = num
31+
}
32+
}
33+
34+
// WithLogger set custom logger
35+
func WithLogger(l Logger) Option {
36+
return func(q *Options) {
37+
q.logger = l
38+
}
39+
}
40+
41+
// WithWorker set custom worker
42+
func WithWorker(w Worker) Option {
43+
return func(q *Options) {
44+
q.worker = w
45+
}
46+
}
47+
48+
// WithFn set custom job function
49+
func WithFn(fn func(context.Context, QueuedMessage) error) Option {
50+
return func(q *Options) {
51+
q.fn = fn
52+
}
53+
}
54+
55+
type Options struct {
56+
workerCount int
57+
timeout time.Duration
58+
logger Logger
59+
queueSize int
60+
worker Worker
61+
fn func(context.Context, QueuedMessage) error
62+
}
63+
64+
func NewOptions(opts ...Option) *Options {
65+
o := &Options{
66+
workerCount: defaultWorkerCount,
67+
queueSize: defaultQueueSize,
68+
timeout: defaultTimeout,
69+
logger: defaultNewLogger,
70+
worker: nil,
71+
fn: defaultFn,
72+
}
73+
74+
// Loop through each option
75+
for _, opt := range opts {
76+
// Call the option giving the instantiated
77+
opt(o)
78+
}
79+
80+
return o
81+
}

queue.go

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7-
"runtime"
87
"sync"
98
"sync/atomic"
109
"time"
@@ -48,47 +47,19 @@ func (j Job) Encode() []byte {
4847
return b
4948
}
5049

51-
// Option for queue system
52-
type Option func(*Queue)
53-
5450
// ErrMissingWorker missing define worker
5551
var ErrMissingWorker = errors.New("missing worker module")
5652

57-
// WithWorkerCount set worker count
58-
func WithWorkerCount(num int) Option {
59-
return func(q *Queue) {
60-
q.workerCount = num
61-
}
62-
}
63-
64-
// WithLogger set custom logger
65-
func WithLogger(l Logger) Option {
66-
return func(q *Queue) {
67-
q.logger = l
68-
}
69-
}
70-
71-
// WithWorker set custom worker
72-
func WithWorker(w Worker) Option {
73-
return func(q *Queue) {
74-
q.worker = w
75-
}
76-
}
77-
7853
// NewQueue returns a Queue.
7954
func NewQueue(opts ...Option) (*Queue, error) {
55+
o := NewOptions(opts...)
8056
q := &Queue{
81-
workerCount: runtime.NumCPU(),
57+
workerCount: o.workerCount,
8258
routineGroup: newRoutineGroup(),
8359
quit: make(chan struct{}),
84-
logger: NewLogger(),
85-
timeout: 24 * 60 * time.Minute,
86-
}
87-
88-
// Loop through each option
89-
for _, opt := range opts {
90-
// Call the option giving the instantiated
91-
opt(q)
60+
logger: o.logger,
61+
timeout: o.timeout,
62+
worker: o.worker,
9263
}
9364

9465
if q.worker == nil {

0 commit comments

Comments
 (0)