Skip to content

Commit 8c30db8

Browse files
authored
chore(options): add new options. (#8)
1 parent 0ccc674 commit 8c30db8

File tree

2 files changed

+146
-127
lines changed

2 files changed

+146
-127
lines changed

options.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package redisdb
2+
3+
import (
4+
"context"
5+
6+
"github.com/golang-queue/queue"
7+
)
8+
9+
// Option for queue system
10+
type Option func(*options)
11+
12+
type options struct {
13+
runFunc func(context.Context, queue.QueuedMessage) error
14+
logger queue.Logger
15+
metric queue.Metric
16+
addr string
17+
db int
18+
connectionString string
19+
password string
20+
channelName string
21+
channelSize int
22+
cluster bool
23+
}
24+
25+
// WithAddr setup the addr of redis
26+
func WithAddr(addr string) Option {
27+
return func(w *options) {
28+
w.addr = addr
29+
}
30+
}
31+
32+
// WithPassword redis password
33+
func WithDB(db int) Option {
34+
return func(w *options) {
35+
w.db = db
36+
}
37+
}
38+
39+
// WithCluster redis cluster
40+
func WithCluster(enable bool) Option {
41+
return func(w *options) {
42+
w.cluster = enable
43+
}
44+
}
45+
46+
// WithChannelSize redis channel size
47+
func WithChannelSize(size int) Option {
48+
return func(w *options) {
49+
w.channelSize = size
50+
}
51+
}
52+
53+
// WithPassword redis password
54+
func WithPassword(passwd string) Option {
55+
return func(w *options) {
56+
w.password = passwd
57+
}
58+
}
59+
60+
// WithConnectionString redis connection string
61+
func WithConnectionString(connectionString string) Option {
62+
return func(w *options) {
63+
w.connectionString = connectionString
64+
}
65+
}
66+
67+
// WithChannel setup the channel of redis
68+
func WithChannel(channel string) Option {
69+
return func(w *options) {
70+
w.channelName = channel
71+
}
72+
}
73+
74+
// WithRunFunc setup the run func of queue
75+
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
76+
return func(w *options) {
77+
w.runFunc = fn
78+
}
79+
}
80+
81+
// WithLogger set custom logger
82+
func WithLogger(l queue.Logger) Option {
83+
return func(w *options) {
84+
w.logger = l
85+
}
86+
}
87+
88+
// WithMetric set custom Metric
89+
func WithMetric(m queue.Metric) Option {
90+
return func(w *options) {
91+
w.metric = m
92+
}
93+
}
94+
95+
func newOptions(opts ...Option) options {
96+
defaultOpts := options{
97+
addr: "127.0.0.1:6379",
98+
channelName: "queue",
99+
channelSize: 1024,
100+
logger: queue.NewLogger(),
101+
runFunc: func(context.Context, queue.QueuedMessage) error {
102+
return nil
103+
},
104+
metric: queue.NewMetric(),
105+
}
106+
107+
// Loop through each option
108+
for _, opt := range opts {
109+
// Call the option giving the instantiated
110+
opt(&defaultOpts)
111+
}
112+
113+
return defaultOpts
114+
}

redis.go

Lines changed: 32 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -15,181 +15,86 @@ import (
1515

1616
var _ queue.Worker = (*Worker)(nil)
1717

18-
// Option for queue system
19-
type Option func(*Worker)
20-
2118
// Worker for Redis
2219
type Worker struct {
2320
// redis config
24-
rdb redis.Cmdable
25-
pubsub *redis.PubSub
26-
channel <-chan *redis.Message
27-
addr string
28-
db int
29-
connectionString string
30-
password string
31-
channelName string
32-
channelSize int
33-
cluster bool
34-
21+
rdb redis.Cmdable
22+
pubsub *redis.PubSub
23+
channel <-chan *redis.Message
24+
stopFlag int32
3525
stopOnce sync.Once
3626
stop chan struct{}
37-
runFunc func(context.Context, queue.QueuedMessage) error
38-
logger queue.Logger
39-
stopFlag int32
40-
metric queue.Metric
27+
28+
opts options
4129
}
4230

4331
func (w *Worker) incBusyWorker() {
44-
w.metric.IncBusyWorker()
32+
w.opts.metric.IncBusyWorker()
4533
}
4634

4735
func (w *Worker) decBusyWorker() {
48-
w.metric.DecBusyWorker()
36+
w.opts.metric.DecBusyWorker()
4937
}
5038

5139
// BusyWorkers return count of busy workers currently.
5240
func (w *Worker) BusyWorkers() uint64 {
53-
return w.metric.BusyWorkers()
54-
}
55-
56-
// WithAddr setup the addr of redis
57-
func WithAddr(addr string) Option {
58-
return func(w *Worker) {
59-
w.addr = addr
60-
}
61-
}
62-
63-
// WithPassword redis password
64-
func WithDB(db int) Option {
65-
return func(w *Worker) {
66-
w.db = db
67-
}
68-
}
69-
70-
// WithCluster redis cluster
71-
func WithCluster(enable bool) Option {
72-
return func(w *Worker) {
73-
w.cluster = enable
74-
}
75-
}
76-
77-
// WithChannelSize redis channel size
78-
func WithChannelSize(size int) Option {
79-
return func(w *Worker) {
80-
w.channelSize = size
81-
}
82-
}
83-
84-
// WithPassword redis password
85-
func WithPassword(passwd string) Option {
86-
return func(w *Worker) {
87-
w.password = passwd
88-
}
89-
}
90-
91-
// WithConnectionString redis connection string
92-
func WithConnectionString(connectionString string) Option {
93-
return func(w *Worker) {
94-
w.connectionString = connectionString
95-
}
96-
}
97-
98-
// WithChannel setup the channel of redis
99-
func WithChannel(channel string) Option {
100-
return func(w *Worker) {
101-
w.channelName = channel
102-
}
103-
}
104-
105-
// WithRunFunc setup the run func of queue
106-
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
107-
return func(w *Worker) {
108-
w.runFunc = fn
109-
}
110-
}
111-
112-
// WithLogger set custom logger
113-
func WithLogger(l queue.Logger) Option {
114-
return func(w *Worker) {
115-
w.logger = l
116-
}
117-
}
118-
119-
// WithMetric set custom Metric
120-
func WithMetric(m queue.Metric) Option {
121-
return func(w *Worker) {
122-
w.metric = m
123-
}
41+
return w.opts.metric.BusyWorkers()
12442
}
12543

12644
// NewWorker for struc
12745
func NewWorker(opts ...Option) *Worker {
12846
var err error
12947
w := &Worker{
130-
addr: "127.0.0.1:6379",
131-
channelName: "queue",
132-
channelSize: 1024,
133-
stop: make(chan struct{}),
134-
logger: queue.NewLogger(),
135-
runFunc: func(context.Context, queue.QueuedMessage) error {
136-
return nil
137-
},
138-
metric: queue.NewMetric(),
139-
}
140-
141-
// Loop through each option
142-
for _, opt := range opts {
143-
// Call the option giving the instantiated
144-
opt(w)
48+
opts: newOptions(opts...),
49+
stop: make(chan struct{}),
14550
}
14651

147-
if w.connectionString != "" {
148-
options, err := redis.ParseURL(w.connectionString)
52+
if w.opts.connectionString != "" {
53+
options, err := redis.ParseURL(w.opts.connectionString)
14954
if err != nil {
150-
w.logger.Fatal(err)
55+
w.opts.logger.Fatal(err)
15156
}
15257
w.rdb = redis.NewClient(options)
153-
} else if w.addr != "" {
154-
if w.cluster {
58+
} else if w.opts.addr != "" {
59+
if w.opts.cluster {
15560
w.rdb = redis.NewClusterClient(&redis.ClusterOptions{
156-
Addrs: strings.Split(w.addr, ","),
157-
Password: w.password,
61+
Addrs: strings.Split(w.opts.addr, ","),
62+
Password: w.opts.password,
15863
})
15964
} else {
16065
options := &redis.Options{
161-
Addr: w.addr,
162-
Password: w.password,
163-
DB: w.db,
66+
Addr: w.opts.addr,
67+
Password: w.opts.password,
68+
DB: w.opts.db,
16469
}
16570
w.rdb = redis.NewClient(options)
16671
}
16772
}
16873

16974
_, err = w.rdb.Ping(context.Background()).Result()
17075
if err != nil {
171-
w.logger.Fatal(err)
76+
w.opts.logger.Fatal(err)
17277
}
17378

17479
ctx := context.Background()
17580

17681
switch v := w.rdb.(type) {
17782
case *redis.Client:
178-
w.pubsub = v.Subscribe(ctx, w.channelName)
83+
w.pubsub = v.Subscribe(ctx, w.opts.channelName)
17984
case *redis.ClusterClient:
180-
w.pubsub = v.Subscribe(ctx, w.channelName)
85+
w.pubsub = v.Subscribe(ctx, w.opts.channelName)
18186
}
18287

18388
var ropts []redis.ChannelOption
18489

185-
if w.channelSize > 1 {
186-
ropts = append(ropts, redis.WithChannelSize(w.channelSize))
90+
if w.opts.channelSize > 1 {
91+
ropts = append(ropts, redis.WithChannelSize(w.opts.channelSize))
18792
}
18893

18994
w.channel = w.pubsub.Channel(ropts...)
19095
// make sure the connection is successful
19196
if err := w.pubsub.Ping(ctx); err != nil {
192-
w.logger.Fatal(err)
97+
w.opts.logger.Fatal(err)
19398
}
19499

195100
return w
@@ -227,7 +132,7 @@ func (w *Worker) handle(job queue.Job) error {
227132
}()
228133

229134
// run custom process function
230-
done <- w.runFunc(ctx, job)
135+
done <- w.opts.runFunc(ctx, job)
231136
}()
232137

233138
select {
@@ -292,7 +197,7 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
292197
ctx := context.Background()
293198

294199
// Publish a message.
295-
err := w.rdb.Publish(ctx, w.channelName, job.Bytes()).Err()
200+
err := w.rdb.Publish(ctx, w.opts.channelName, job.Bytes()).Err()
296201
if err != nil {
297202
return err
298203
}
@@ -319,16 +224,16 @@ func (w *Worker) Run() error {
319224
}
320225

321226
if !ok {
322-
return fmt.Errorf("redis pubsub: channel=%s closed", w.channelName)
227+
return fmt.Errorf("redis pubsub: channel=%s closed", w.opts.channelName)
323228
}
324229

325230
var data queue.Job
326231
if err := json.Unmarshal([]byte(m.Payload), &data); err != nil {
327-
w.logger.Error("json unmarshal error: ", err)
232+
w.opts.logger.Error("json unmarshal error: ", err)
328233
continue
329234
}
330235
if err := w.handle(data); err != nil {
331-
w.logger.Error("handle job error: ", err)
236+
w.opts.logger.Error("handle job error: ", err)
332237
}
333238
case <-w.stop:
334239
return nil

0 commit comments

Comments
 (0)