Skip to content

Commit f27e7be

Browse files
authored
chore(metric): support busy workers count. (#3)
1 parent 2c17d14 commit f27e7be

File tree

4 files changed

+80
-34
lines changed

4 files changed

+80
-34
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/golang-queue/nats
33
go 1.16
44

55
require (
6-
github.com/golang-queue/queue v0.0.7
6+
github.com/golang-queue/queue v0.0.8-0.20210905085819-3cd1dfe014e2
77
github.com/golang/protobuf v1.5.2 // indirect
88
github.com/nats-io/nats-server/v2 v2.3.4 // indirect
99
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/golang-queue/queue v0.0.7 h1:WENCPyPBcIWYgBFSAZ8USGtwmxeCeMkhjwuyM0MAi84=
4-
github.com/golang-queue/queue v0.0.7/go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms=
3+
github.com/golang-queue/queue v0.0.8-0.20210905085819-3cd1dfe014e2 h1:C2nn3ZHokfvJ787A/IiUDgweK5QZa5RGm/tfvhNfOtg=
4+
github.com/golang-queue/queue v0.0.8-0.20210905085819-3cd1dfe014e2/go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms=
55
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
66
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
77
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=

nats.go

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,28 @@ type Option func(*Worker)
1919

2020
// Worker for NSQ
2121
type Worker struct {
22-
addr string
23-
subj string
24-
queue string
25-
client *nats.Conn
26-
stop chan struct{}
27-
stopOnce sync.Once
28-
runFunc func(context.Context, queue.QueuedMessage) error
29-
logger queue.Logger
30-
stopFlag int32
22+
addr string
23+
subj string
24+
queue string
25+
client *nats.Conn
26+
stop chan struct{}
27+
stopOnce sync.Once
28+
runFunc func(context.Context, queue.QueuedMessage) error
29+
logger queue.Logger
30+
stopFlag int32
31+
busyWorkers uint64
32+
}
33+
34+
func (w *Worker) incBusyWorker() {
35+
atomic.AddUint64(&w.busyWorkers, 1)
36+
}
37+
38+
func (w *Worker) decBusyWorker() {
39+
atomic.AddUint64(&w.busyWorkers, ^uint64(0))
40+
}
41+
42+
func (w *Worker) BusyWorkers() uint64 {
43+
return atomic.LoadUint64(&w.busyWorkers)
3144
}
3245

3346
// WithAddr setup the addr of NATS
@@ -93,22 +106,26 @@ func NewWorker(opts ...Option) *Worker {
93106
}
94107

95108
// BeforeRun run script before start worker
96-
func (s *Worker) BeforeRun() error {
109+
func (w *Worker) BeforeRun() error {
97110
return nil
98111
}
99112

100113
// AfterRun run script after start worker
101-
func (s *Worker) AfterRun() error {
114+
func (w *Worker) AfterRun() error {
102115
return nil
103116
}
104117

105-
func (s *Worker) handle(job queue.Job) error {
118+
func (w *Worker) handle(job queue.Job) error {
106119
// create channel with buffer size 1 to avoid goroutine leak
107120
done := make(chan error, 1)
108121
panicChan := make(chan interface{}, 1)
109122
startTime := time.Now()
110123
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
111-
defer cancel()
124+
w.incBusyWorker()
125+
defer func() {
126+
cancel()
127+
w.decBusyWorker()
128+
}()
112129

113130
// run the job
114131
go func() {
@@ -120,15 +137,15 @@ func (s *Worker) handle(job queue.Job) error {
120137
}()
121138

122139
// run custom process function
123-
done <- s.runFunc(ctx, job)
140+
done <- w.runFunc(ctx, job)
124141
}()
125142

126143
select {
127144
case p := <-panicChan:
128145
panic(p)
129146
case <-ctx.Done(): // timeout reached
130147
return ctx.Err()
131-
case <-s.stop: // shutdown service
148+
case <-w.stop: // shutdown service
132149
// cancel job
133150
cancel()
134151

@@ -148,10 +165,10 @@ func (s *Worker) handle(job queue.Job) error {
148165
}
149166

150167
// Run start the worker
151-
func (s *Worker) Run() error {
168+
func (w *Worker) Run() error {
152169
wg := &sync.WaitGroup{}
153170
panicChan := make(chan interface{}, 1)
154-
_, err := s.client.QueueSubscribe(s.subj, s.queue, func(m *nats.Msg) {
171+
_, err := w.client.QueueSubscribe(w.subj, w.queue, func(m *nats.Msg) {
155172
wg.Add(1)
156173
defer func() {
157174
wg.Done()
@@ -163,8 +180,8 @@ func (s *Worker) Run() error {
163180
var data queue.Job
164181
_ = json.Unmarshal(m.Data, &data)
165182

166-
if err := s.handle(data); err != nil {
167-
s.logger.Error(err)
183+
if err := w.handle(data); err != nil {
184+
w.logger.Error(err)
168185
}
169186
})
170187
if err != nil {
@@ -173,9 +190,9 @@ func (s *Worker) Run() error {
173190

174191
// wait close signal
175192
select {
176-
case <-s.stop:
193+
case <-w.stop:
177194
case err := <-panicChan:
178-
s.logger.Error(err)
195+
w.logger.Error(err)
179196
}
180197

181198
// wait job completed
@@ -185,35 +202,35 @@ func (s *Worker) Run() error {
185202
}
186203

187204
// Shutdown worker
188-
func (s *Worker) Shutdown() error {
189-
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
205+
func (w *Worker) Shutdown() error {
206+
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
190207
return queue.ErrQueueShutdown
191208
}
192209

193-
s.stopOnce.Do(func() {
194-
s.client.Close()
195-
close(s.stop)
210+
w.stopOnce.Do(func() {
211+
w.client.Close()
212+
close(w.stop)
196213
})
197214
return nil
198215
}
199216

200217
// Capacity for channel
201-
func (s *Worker) Capacity() int {
218+
func (w *Worker) Capacity() int {
202219
return 0
203220
}
204221

205222
// Usage for count of channel usage
206-
func (s *Worker) Usage() int {
223+
func (w *Worker) Usage() int {
207224
return 0
208225
}
209226

210227
// Queue send notification to queue
211-
func (s *Worker) Queue(job queue.QueuedMessage) error {
212-
if atomic.LoadInt32(&s.stopFlag) == 1 {
228+
func (w *Worker) Queue(job queue.QueuedMessage) error {
229+
if atomic.LoadInt32(&w.stopFlag) == 1 {
213230
return queue.ErrQueueShutdown
214231
}
215232

216-
err := s.client.Publish(s.subj, job.Bytes())
233+
err := w.client.Publish(w.subj, job.Bytes())
217234
if err != nil {
218235
return err
219236
}

nats_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,3 +371,32 @@ func TestJobComplete(t *testing.T) {
371371
assert.Error(t, err)
372372
assert.Equal(t, errors.New("job completed"), err)
373373
}
374+
375+
func TestBusyWorkerCount(t *testing.T) {
376+
job := queue.Job{
377+
Timeout: 500 * time.Millisecond,
378+
Body: []byte("foo"),
379+
}
380+
381+
w := NewWorker(
382+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
383+
time.Sleep(200 * time.Millisecond)
384+
return nil
385+
}),
386+
)
387+
388+
assert.Equal(t, uint64(0), w.BusyWorkers())
389+
go func() {
390+
assert.NoError(t, w.handle(job))
391+
}()
392+
go func() {
393+
assert.NoError(t, w.handle(job))
394+
}()
395+
396+
time.Sleep(50 * time.Millisecond)
397+
assert.Equal(t, uint64(2), w.BusyWorkers())
398+
time.Sleep(200 * time.Millisecond)
399+
assert.Equal(t, uint64(0), w.BusyWorkers())
400+
401+
assert.NoError(t, w.Shutdown())
402+
}

0 commit comments

Comments
 (0)