Skip to content

Commit dd33b55

Browse files
authored
chore(worker): remove metrics and add new method (#7)
1 parent 65da231 commit dd33b55

File tree

5 files changed

+13
-80
lines changed

5 files changed

+13
-80
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/golang-queue/nats
33
go 1.16
44

55
require (
6-
github.com/golang-queue/queue v0.0.11
6+
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747
77
github.com/golang/protobuf v1.5.2 // indirect
88
github.com/nats-io/nats-server/v2 v2.7.1 // indirect
99
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/golang-queue/queue v0.0.11 h1:qaE7rOYUh+11NcO6U9zZ7NazglIwEp0j7Dn0sSoavcI=
44
github.com/golang-queue/queue v0.0.11/go.mod h1:ku8iyjYffqYY6Duts+xl+QYfN3/KDK4MEvXMZUkHyio=
5+
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747 h1:uNTbCoWORAcna89KcKgP22WFGv5fsij05e70DCnLrUU=
6+
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747/go.mod h1:ku8iyjYffqYY6Duts+xl+QYfN3/KDK4MEvXMZUkHyio=
57
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
68
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
79
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=

nats.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,6 @@ type Worker struct {
2323
opts options
2424
}
2525

26-
func (w *Worker) incBusyWorker() {
27-
w.opts.metric.IncBusyWorker()
28-
}
29-
30-
func (w *Worker) decBusyWorker() {
31-
w.opts.metric.DecBusyWorker()
32-
}
33-
34-
// BusyWorkers return count of busy workers currently.
35-
func (w *Worker) BusyWorkers() uint64 {
36-
return w.opts.metric.BusyWorkers()
37-
}
38-
3926
// NewWorker for struc
4027
func NewWorker(opts ...Option) *Worker {
4128
var err error
@@ -68,10 +55,8 @@ func (w *Worker) handle(job queue.Job) error {
6855
panicChan := make(chan interface{}, 1)
6956
startTime := time.Now()
7057
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
71-
w.incBusyWorker()
7258
defer func() {
7359
cancel()
74-
w.decBusyWorker()
7560
}()
7661

7762
// run the job
@@ -112,7 +97,7 @@ func (w *Worker) handle(job queue.Job) error {
11297
}
11398

11499
// Run start the worker
115-
func (w *Worker) Run() error {
100+
func (w *Worker) Run(task queue.QueuedMessage) error {
116101
wg := &sync.WaitGroup{}
117102
panicChan := make(chan interface{}, 1)
118103
_, err := w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(m *nats.Msg) {
@@ -184,3 +169,8 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
184169

185170
return nil
186171
}
172+
173+
// Request a new task
174+
func (w *Worker) Request() (queue.QueuedMessage, error) {
175+
return nil, nil
176+
}

nats_test.go

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -124,27 +124,6 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
124124
q.Wait()
125125
}
126126

127-
func TestWorkerNumAfterShutdown(t *testing.T) {
128-
w := NewWorker(
129-
WithAddr(host + ":4222"),
130-
)
131-
q, err := queue.NewQueue(
132-
queue.WithWorker(w),
133-
queue.WithWorkerCount(2),
134-
)
135-
assert.NoError(t, err)
136-
q.Start()
137-
q.Start()
138-
time.Sleep(100 * time.Millisecond)
139-
assert.Equal(t, 4, q.Workers())
140-
q.Shutdown()
141-
q.Wait()
142-
assert.Equal(t, 0, q.Workers())
143-
q.Start()
144-
q.Start()
145-
assert.Equal(t, 0, q.Workers())
146-
}
147-
148127
func TestJobReachTimeout(t *testing.T) {
149128
m := mockMessage{
150129
Message: "foo",
@@ -297,7 +276,7 @@ func TestGoroutinePanic(t *testing.T) {
297276
func TestHandleTimeout(t *testing.T) {
298277
job := queue.Job{
299278
Timeout: 100 * time.Millisecond,
300-
Body: []byte("foo"),
279+
Payload: []byte("foo"),
301280
}
302281
w := NewWorker(
303282
WithAddr(host+":4222"),
@@ -314,7 +293,7 @@ func TestHandleTimeout(t *testing.T) {
314293

315294
job = queue.Job{
316295
Timeout: 150 * time.Millisecond,
317-
Body: []byte("foo"),
296+
Payload: []byte("foo"),
318297
}
319298

320299
w = NewWorker(
@@ -340,7 +319,7 @@ func TestHandleTimeout(t *testing.T) {
340319
func TestJobComplete(t *testing.T) {
341320
job := queue.Job{
342321
Timeout: 100 * time.Millisecond,
343-
Body: []byte("foo"),
322+
Payload: []byte("foo"),
344323
}
345324
w := NewWorker(
346325
WithAddr(host+":4222"),
@@ -356,7 +335,7 @@ func TestJobComplete(t *testing.T) {
356335

357336
job = queue.Job{
358337
Timeout: 250 * time.Millisecond,
359-
Body: []byte("foo"),
338+
Payload: []byte("foo"),
360339
}
361340

362341
w = NewWorker(
@@ -378,32 +357,3 @@ func TestJobComplete(t *testing.T) {
378357
assert.Error(t, err)
379358
assert.Equal(t, errors.New("job completed"), err)
380359
}
381-
382-
func TestBusyWorkerCount(t *testing.T) {
383-
job := queue.Job{
384-
Timeout: 500 * time.Millisecond,
385-
Body: []byte("foo"),
386-
}
387-
388-
w := NewWorker(
389-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
390-
time.Sleep(200 * time.Millisecond)
391-
return nil
392-
}),
393-
)
394-
395-
assert.Equal(t, uint64(0), w.BusyWorkers())
396-
go func() {
397-
assert.NoError(t, w.handle(job))
398-
}()
399-
go func() {
400-
assert.NoError(t, w.handle(job))
401-
}()
402-
403-
time.Sleep(50 * time.Millisecond)
404-
assert.Equal(t, uint64(2), w.BusyWorkers())
405-
time.Sleep(200 * time.Millisecond)
406-
assert.Equal(t, uint64(0), w.BusyWorkers())
407-
408-
assert.NoError(t, w.Shutdown())
409-
}

options.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ type Option func(*options)
1212
type options struct {
1313
runFunc func(context.Context, queue.QueuedMessage) error
1414
logger queue.Logger
15-
metric queue.Metric
1615
addr string
1716
subj string
1817
queue string
@@ -53,13 +52,6 @@ func WithLogger(l queue.Logger) Option {
5352
}
5453
}
5554

56-
// WithMetric set custom Metric
57-
func WithMetric(m queue.Metric) Option {
58-
return func(w *options) {
59-
w.metric = m
60-
}
61-
}
62-
6355
func newOptions(opts ...Option) options {
6456
defaultOpts := options{
6557
addr: "127.0.0.1:4222",
@@ -68,7 +60,6 @@ func newOptions(opts ...Option) options {
6860
runFunc: func(context.Context, queue.QueuedMessage) error {
6961
return nil
7062
},
71-
metric: queue.NewMetric(),
7263
}
7364

7465
// Loop through each option

0 commit comments

Comments
 (0)