Skip to content

Commit 8cbabf0

Browse files
authored
chore(options): add new options. (#7)
1 parent 7cb6615 commit 8cbabf0

File tree

2 files changed

+104
-88
lines changed

2 files changed

+104
-88
lines changed

nsq.go

Lines changed: 28 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -15,105 +15,40 @@ import (
1515

1616
var _ queue.Worker = (*Worker)(nil)
1717

18-
// Option for queue system
19-
type Option func(*Worker)
20-
2118
// Worker for NSQ
2219
type Worker struct {
23-
q *nsq.Consumer
24-
p *nsq.Producer
25-
startOnce sync.Once
26-
stopOnce sync.Once
27-
stop chan struct{}
28-
maxInFlight int
29-
addr string
30-
topic string
31-
channel string
32-
runFunc func(context.Context, queue.QueuedMessage) error
33-
logger queue.Logger
34-
stopFlag int32
35-
startFlag int32
36-
metric queue.Metric
37-
disable bool
20+
q *nsq.Consumer
21+
p *nsq.Producer
22+
startOnce sync.Once
23+
stopOnce sync.Once
24+
stop chan struct{}
25+
stopFlag int32
26+
startFlag int32
27+
opts options
3828
}
3929

4030
func (w *Worker) incBusyWorker() {
41-
w.metric.IncBusyWorker()
31+
w.opts.metric.IncBusyWorker()
4232
}
4333

4434
func (w *Worker) decBusyWorker() {
45-
w.metric.DecBusyWorker()
35+
w.opts.metric.DecBusyWorker()
4636
}
4737

4838
// BusyWorkers return count of busy workers currently.
4939
func (w *Worker) BusyWorkers() uint64 {
50-
return w.metric.BusyWorkers()
51-
}
52-
53-
// WithAddr setup the addr of NSQ
54-
func WithAddr(addr string) Option {
55-
return func(w *Worker) {
56-
w.addr = addr
57-
}
58-
}
59-
60-
// WithTopic setup the topic of NSQ
61-
func WithTopic(topic string) Option {
62-
return func(w *Worker) {
63-
w.topic = topic
64-
}
65-
}
66-
67-
// WithChannel setup the channel of NSQ
68-
func WithChannel(channel string) Option {
69-
return func(w *Worker) {
70-
w.channel = channel
71-
}
72-
}
73-
74-
// WithRunFunc setup the run func of queue
75-
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
76-
return func(w *Worker) {
77-
w.runFunc = fn
78-
}
79-
}
80-
81-
// WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob)
82-
func WithMaxInFlight(num int) Option {
83-
return func(w *Worker) {
84-
w.maxInFlight = num
85-
}
86-
}
87-
88-
// WithLogger set custom logger
89-
func WithLogger(l queue.Logger) Option {
90-
return func(w *Worker) {
91-
w.logger = l
92-
}
93-
}
94-
95-
// WithMetric set custom Metric
96-
func WithMetric(m queue.Metric) Option {
97-
return func(w *Worker) {
98-
w.metric = m
99-
}
100-
}
101-
102-
func withDisable() Option {
103-
return func(w *Worker) {
104-
w.disable = true
105-
}
40+
return w.opts.metric.BusyWorkers()
10641
}
10742

10843
// NewWorker for struc
10944
func NewWorker(opts ...Option) *Worker {
110-
w := &Worker{
45+
defaultOpts := options{
11146
addr: "127.0.0.1:4150",
11247
topic: "gorush",
11348
channel: "ch",
11449
maxInFlight: runtime.NumCPU(),
115-
stop: make(chan struct{}),
116-
logger: queue.NewLogger(),
50+
51+
logger: queue.NewLogger(),
11752
runFunc: func(context.Context, queue.QueuedMessage) error {
11853
return nil
11954
},
@@ -123,7 +58,12 @@ func NewWorker(opts ...Option) *Worker {
12358
// Loop through each option
12459
for _, opt := range opts {
12560
// Call the option giving the instantiated
126-
opt(w)
61+
opt(&defaultOpts)
62+
}
63+
64+
w := &Worker{
65+
opts: defaultOpts,
66+
stop: make(chan struct{}),
12767
}
12868

12969
w.startProducerAndConsumer()
@@ -132,19 +72,19 @@ func NewWorker(opts ...Option) *Worker {
13272
}
13373

13474
func (w *Worker) startProducerAndConsumer() {
135-
if w.disable {
75+
if w.opts.disable {
13676
return
13777
}
13878

13979
var err error
14080
cfg := nsq.NewConfig()
141-
cfg.MaxInFlight = w.maxInFlight
142-
w.q, err = nsq.NewConsumer(w.topic, w.channel, cfg)
81+
cfg.MaxInFlight = w.opts.maxInFlight
82+
w.q, err = nsq.NewConsumer(w.opts.topic, w.opts.channel, cfg)
14383
if err != nil {
14484
panic(err)
14585
}
14686

147-
w.p, err = nsq.NewProducer(w.addr, cfg)
87+
w.p, err = nsq.NewProducer(w.opts.addr, cfg)
14888
if err != nil {
14989
panic(err)
15090
}
@@ -159,7 +99,7 @@ func (w *Worker) BeforeRun() error {
15999
func (w *Worker) AfterRun() error {
160100
w.startOnce.Do(func() {
161101
time.Sleep(100 * time.Millisecond)
162-
err := w.q.ConnectToNSQD(w.addr)
102+
err := w.q.ConnectToNSQD(w.opts.addr)
163103
if err != nil {
164104
panic("Could not connect nsq server: " + err.Error())
165105
}
@@ -192,7 +132,7 @@ func (w *Worker) handle(job queue.Job) error {
192132
}()
193133

194134
// run custom process function
195-
done <- w.runFunc(ctx, job)
135+
done <- w.opts.runFunc(ctx, job)
196136
}()
197137

198138
select {
@@ -253,7 +193,7 @@ func (w *Worker) Run() error {
253193
select {
254194
case <-w.stop:
255195
case err := <-panicChan:
256-
w.logger.Error(err)
196+
w.opts.logger.Error(err)
257197
}
258198

259199
// wait job completed
@@ -297,7 +237,7 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
297237
return queue.ErrQueueShutdown
298238
}
299239

300-
err := w.p.Publish(w.topic, job.Bytes())
240+
err := w.p.Publish(w.opts.topic, job.Bytes())
301241
if err != nil {
302242
return err
303243
}

options.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package nsq
2+
3+
import (
4+
"context"
5+
6+
"github.com/golang-queue/queue"
7+
)
8+
9+
// Option for queue system
10+
type Option func(*options)
11+
12+
type options struct {
13+
maxInFlight int
14+
addr string
15+
topic string
16+
channel string
17+
runFunc func(context.Context, queue.QueuedMessage) error
18+
logger queue.Logger
19+
metric queue.Metric
20+
disable bool
21+
}
22+
23+
// WithAddr setup the addr of NSQ
24+
func WithAddr(addr string) Option {
25+
return func(w *options) {
26+
w.addr = addr
27+
}
28+
}
29+
30+
// WithTopic setup the topic of NSQ
31+
func WithTopic(topic string) Option {
32+
return func(w *options) {
33+
w.topic = topic
34+
}
35+
}
36+
37+
// WithChannel setup the channel of NSQ
38+
func WithChannel(channel string) Option {
39+
return func(w *options) {
40+
w.channel = channel
41+
}
42+
}
43+
44+
// WithRunFunc setup the run func of queue
45+
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
46+
return func(w *options) {
47+
w.runFunc = fn
48+
}
49+
}
50+
51+
// WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob)
52+
func WithMaxInFlight(num int) Option {
53+
return func(w *options) {
54+
w.maxInFlight = num
55+
}
56+
}
57+
58+
// WithLogger set custom logger
59+
func WithLogger(l queue.Logger) Option {
60+
return func(w *options) {
61+
w.logger = l
62+
}
63+
}
64+
65+
// WithMetric set custom Metric
66+
func WithMetric(m queue.Metric) Option {
67+
return func(w *options) {
68+
w.metric = m
69+
}
70+
}
71+
72+
func withDisable() Option {
73+
return func(w *options) {
74+
w.disable = true
75+
}
76+
}

0 commit comments

Comments
 (0)