Skip to content

Commit 8be2447

Browse files
authored
chore(options): add new options. (#5)
1 parent ab4bfa3 commit 8be2447

File tree

2 files changed

+94
-75
lines changed

2 files changed

+94
-75
lines changed

nats.go

Lines changed: 13 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -14,99 +14,37 @@ import (
1414

1515
var _ queue.Worker = (*Worker)(nil)
1616

17-
// Option for queue system
18-
type Option func(*Worker)
19-
2017
// Worker for NSQ
2118
type Worker struct {
22-
addr string
23-
subj string
24-
queue string
2519
client *nats.Conn
2620
stop chan struct{}
27-
stopOnce sync.Once
28-
runFunc func(context.Context, queue.QueuedMessage) error
29-
logger queue.Logger
3021
stopFlag int32
31-
metric queue.Metric
22+
stopOnce sync.Once
23+
opts options
3224
}
3325

3426
func (w *Worker) incBusyWorker() {
35-
w.metric.IncBusyWorker()
27+
w.opts.metric.IncBusyWorker()
3628
}
3729

3830
func (w *Worker) decBusyWorker() {
39-
w.metric.DecBusyWorker()
31+
w.opts.metric.DecBusyWorker()
4032
}
4133

4234
// BusyWorkers return count of busy workers currently.
4335
func (w *Worker) BusyWorkers() uint64 {
44-
return w.metric.BusyWorkers()
45-
}
46-
47-
// WithAddr setup the addr of NATS
48-
func WithAddr(addr string) Option {
49-
return func(w *Worker) {
50-
w.addr = "nats://" + addr
51-
}
52-
}
53-
54-
// WithSubj setup the subject of NATS
55-
func WithSubj(subj string) Option {
56-
return func(w *Worker) {
57-
w.subj = subj
58-
}
59-
}
60-
61-
// WithQueue setup the queue of NATS
62-
func WithQueue(queue string) Option {
63-
return func(w *Worker) {
64-
w.queue = queue
65-
}
66-
}
67-
68-
// WithRunFunc setup the run func of queue
69-
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
70-
return func(w *Worker) {
71-
w.runFunc = fn
72-
}
73-
}
74-
75-
// WithLogger set custom logger
76-
func WithLogger(l queue.Logger) Option {
77-
return func(w *Worker) {
78-
w.logger = l
79-
}
80-
}
81-
82-
// WithMetric set custom Metric
83-
func WithMetric(m queue.Metric) Option {
84-
return func(w *Worker) {
85-
w.metric = m
86-
}
36+
return w.opts.metric.BusyWorkers()
8737
}
8838

8939
// NewWorker for struc
9040
func NewWorker(opts ...Option) *Worker {
9141
var err error
9242
w := &Worker{
93-
addr: "127.0.0.1:4222",
94-
subj: "foobar",
95-
queue: "foobar",
96-
stop: make(chan struct{}),
97-
runFunc: func(context.Context, queue.QueuedMessage) error {
98-
return nil
99-
},
100-
metric: queue.NewMetric(),
101-
}
102-
103-
// Loop through each option
104-
for _, opt := range opts {
105-
// Call the option giving the instantiated
106-
opt(w)
43+
opts: newOptions(opts...),
44+
stop: make(chan struct{}),
10745
}
10846

109-
w.client, err = nats.Connect(w.addr)
47+
w.client, err = nats.Connect(w.opts.addr)
11048
if err != nil {
11149
panic(err)
11250
}
@@ -146,7 +84,7 @@ func (w *Worker) handle(job queue.Job) error {
14684
}()
14785

14886
// run custom process function
149-
done <- w.runFunc(ctx, job)
87+
done <- w.opts.runFunc(ctx, job)
15088
}()
15189

15290
select {
@@ -177,7 +115,7 @@ func (w *Worker) handle(job queue.Job) error {
177115
func (w *Worker) Run() error {
178116
wg := &sync.WaitGroup{}
179117
panicChan := make(chan interface{}, 1)
180-
_, err := w.client.QueueSubscribe(w.subj, w.queue, func(m *nats.Msg) {
118+
_, err := w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(m *nats.Msg) {
181119
wg.Add(1)
182120
defer func() {
183121
wg.Done()
@@ -190,7 +128,7 @@ func (w *Worker) Run() error {
190128
_ = json.Unmarshal(m.Data, &data)
191129

192130
if err := w.handle(data); err != nil {
193-
w.logger.Error(err)
131+
w.opts.logger.Error(err)
194132
}
195133
})
196134
if err != nil {
@@ -201,7 +139,7 @@ func (w *Worker) Run() error {
201139
select {
202140
case <-w.stop:
203141
case err := <-panicChan:
204-
w.logger.Error(err)
142+
w.opts.logger.Error(err)
205143
}
206144

207145
// wait job completed
@@ -239,7 +177,7 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
239177
return queue.ErrQueueShutdown
240178
}
241179

242-
err := w.client.Publish(w.subj, job.Bytes())
180+
err := w.client.Publish(w.opts.subj, job.Bytes())
243181
if err != nil {
244182
return err
245183
}

options.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package nats
2+
3+
import (
4+
"context"
5+
6+
"github.com/golang-queue/queue"
7+
)
8+
9+
// Option for queue system
10+
type Option func(*options)
11+
12+
type options struct {
13+
runFunc func(context.Context, queue.QueuedMessage) error
14+
logger queue.Logger
15+
metric queue.Metric
16+
addr string
17+
subj string
18+
queue string
19+
}
20+
21+
// WithAddr setup the addr of NATS
22+
func WithAddr(addr string) Option {
23+
return func(w *options) {
24+
w.addr = "nats://" + addr
25+
}
26+
}
27+
28+
// WithSubj setup the subject of NATS
29+
func WithSubj(subj string) Option {
30+
return func(w *options) {
31+
w.subj = subj
32+
}
33+
}
34+
35+
// WithQueue setup the queue of NATS
36+
func WithQueue(queue string) Option {
37+
return func(w *options) {
38+
w.queue = queue
39+
}
40+
}
41+
42+
// WithRunFunc setup the run func of queue
43+
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
44+
return func(w *options) {
45+
w.runFunc = fn
46+
}
47+
}
48+
49+
// WithLogger set custom logger
50+
func WithLogger(l queue.Logger) Option {
51+
return func(w *options) {
52+
w.logger = l
53+
}
54+
}
55+
56+
// WithMetric set custom Metric
57+
func WithMetric(m queue.Metric) Option {
58+
return func(w *options) {
59+
w.metric = m
60+
}
61+
}
62+
63+
func newOptions(opts ...Option) options {
64+
defaultOpts := options{
65+
addr: "127.0.0.1:4222",
66+
subj: "foobar",
67+
queue: "foobar",
68+
runFunc: func(context.Context, queue.QueuedMessage) error {
69+
return nil
70+
},
71+
metric: queue.NewMetric(),
72+
}
73+
74+
// Loop through each option
75+
for _, opt := range opts {
76+
// Call the option giving the instantiated
77+
opt(&defaultOpts)
78+
}
79+
80+
return defaultOpts
81+
}

0 commit comments

Comments
 (0)