Skip to content

Commit 9244a52

Browse files
authored
chore(metric): support busy workers count. (#3)
1 parent 0c45d1f commit 9244a52

File tree

4 files changed

+75
-29
lines changed

4 files changed

+75
-29
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/golang-queue/nsq
33
go 1.16
44

55
require (
6-
github.com/golang-queue/queue v0.0.7
6+
github.com/golang-queue/queue v0.0.8-0.20210905085819-3cd1dfe014e2
77
github.com/nsqio/go-nsq v1.0.8
88
github.com/stretchr/testify v1.7.0
99
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/golang-queue/queue v0.0.7 h1:WENCPyPBcIWYgBFSAZ8USGtwmxeCeMkhjwuyM0MAi84=
4-
github.com/golang-queue/queue v0.0.7/go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms=
3+
github.com/golang-queue/queue v0.0.8-0.20210905085819-3cd1dfe014e2 h1:C2nn3ZHokfvJ787A/IiUDgweK5QZa5RGm/tfvhNfOtg=
4+
github.com/golang-queue/queue v0.0.8-0.20210905085819-3cd1dfe014e2/go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms=
55
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
66
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
77
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=

nsq.go

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,19 @@ type Worker struct {
3333
logger queue.Logger
3434
stopFlag int32
3535
startFlag int32
36+
busyWorkers uint64
37+
}
38+
39+
func (w *Worker) incBusyWorker() {
40+
atomic.AddUint64(&w.busyWorkers, 1)
41+
}
42+
43+
func (w *Worker) decBusyWorker() {
44+
atomic.AddUint64(&w.busyWorkers, ^uint64(0))
45+
}
46+
47+
func (w *Worker) BusyWorkers() uint64 {
48+
return atomic.LoadUint64(&w.busyWorkers)
3649
}
3750

3851
// WithAddr setup the addr of NSQ
@@ -114,32 +127,36 @@ func NewWorker(opts ...Option) *Worker {
114127
}
115128

116129
// BeforeRun run script before start worker
117-
func (s *Worker) BeforeRun() error {
130+
func (w *Worker) BeforeRun() error {
118131
return nil
119132
}
120133

121134
// AfterRun run script after start worker
122-
func (s *Worker) AfterRun() error {
123-
s.startOnce.Do(func() {
135+
func (w *Worker) AfterRun() error {
136+
w.startOnce.Do(func() {
124137
time.Sleep(100 * time.Millisecond)
125-
err := s.q.ConnectToNSQD(s.addr)
138+
err := w.q.ConnectToNSQD(w.addr)
126139
if err != nil {
127140
panic("Could not connect nsq server: " + err.Error())
128141
}
129142

130-
atomic.CompareAndSwapInt32(&s.startFlag, 0, 1)
143+
atomic.CompareAndSwapInt32(&w.startFlag, 0, 1)
131144
})
132145

133146
return nil
134147
}
135148

136-
func (s *Worker) handle(job queue.Job) error {
149+
func (w *Worker) handle(job queue.Job) error {
137150
// create channel with buffer size 1 to avoid goroutine leak
138151
done := make(chan error, 1)
139152
panicChan := make(chan interface{}, 1)
140153
startTime := time.Now()
141154
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
142-
defer cancel()
155+
w.incBusyWorker()
156+
defer func() {
157+
cancel()
158+
w.decBusyWorker()
159+
}()
143160

144161
// run the job
145162
go func() {
@@ -151,15 +168,15 @@ func (s *Worker) handle(job queue.Job) error {
151168
}()
152169

153170
// run custom process function
154-
done <- s.runFunc(ctx, job)
171+
done <- w.runFunc(ctx, job)
155172
}()
156173

157174
select {
158175
case p := <-panicChan:
159176
panic(p)
160177
case <-ctx.Done(): // timeout reached
161178
return ctx.Err()
162-
case <-s.stop: // shutdown service
179+
case <-w.stop: // shutdown service
163180
// cancel job
164181
cancel()
165182

@@ -179,10 +196,10 @@ func (s *Worker) handle(job queue.Job) error {
179196
}
180197

181198
// Run start the worker
182-
func (s *Worker) Run() error {
199+
func (w *Worker) Run() error {
183200
wg := &sync.WaitGroup{}
184201
panicChan := make(chan interface{}, 1)
185-
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
202+
w.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
186203
wg.Add(1)
187204
defer func() {
188205
wg.Done()
@@ -198,14 +215,14 @@ func (s *Worker) Run() error {
198215

199216
var data queue.Job
200217
_ = json.Unmarshal(msg.Body, &data)
201-
return s.handle(data)
218+
return w.handle(data)
202219
}))
203220

204221
// wait close signal
205222
select {
206-
case <-s.stop:
223+
case <-w.stop:
207224
case err := <-panicChan:
208-
s.logger.Error(err)
225+
w.logger.Error(err)
209226
}
210227

211228
// wait job completed
@@ -215,39 +232,39 @@ func (s *Worker) Run() error {
215232
}
216233

217234
// Shutdown worker
218-
func (s *Worker) Shutdown() error {
219-
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
235+
func (w *Worker) Shutdown() error {
236+
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
220237
return queue.ErrQueueShutdown
221238
}
222239

223-
s.stopOnce.Do(func() {
224-
if atomic.LoadInt32(&s.startFlag) == 1 {
225-
s.q.Stop()
226-
s.p.Stop()
240+
w.stopOnce.Do(func() {
241+
if atomic.LoadInt32(&w.startFlag) == 1 {
242+
w.q.Stop()
243+
w.p.Stop()
227244
}
228245

229-
close(s.stop)
246+
close(w.stop)
230247
})
231248
return nil
232249
}
233250

234251
// Capacity for channel
235-
func (s *Worker) Capacity() int {
252+
func (w *Worker) Capacity() int {
236253
return 0
237254
}
238255

239256
// Usage for count of channel usage
240-
func (s *Worker) Usage() int {
257+
func (w *Worker) Usage() int {
241258
return 0
242259
}
243260

244261
// Queue send notification to queue
245-
func (s *Worker) Queue(job queue.QueuedMessage) error {
246-
if atomic.LoadInt32(&s.stopFlag) == 1 {
262+
func (w *Worker) Queue(job queue.QueuedMessage) error {
263+
if atomic.LoadInt32(&w.stopFlag) == 1 {
247264
return queue.ErrQueueShutdown
248265
}
249266

250-
err := s.p.Publish(s.topic, job.Bytes())
267+
err := w.p.Publish(w.topic, job.Bytes())
251268
if err != nil {
252269
return err
253270
}

nsq_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,3 +365,32 @@ func TestJobComplete(t *testing.T) {
365365
assert.Error(t, err)
366366
assert.Equal(t, errors.New("job completed"), err)
367367
}
368+
369+
func TestBusyWorkerCount(t *testing.T) {
370+
job := queue.Job{
371+
Timeout: 500 * time.Millisecond,
372+
Body: []byte("foo"),
373+
}
374+
375+
w := NewWorker(
376+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
377+
time.Sleep(200 * time.Millisecond)
378+
return nil
379+
}),
380+
)
381+
382+
assert.Equal(t, uint64(0), w.BusyWorkers())
383+
go func() {
384+
assert.NoError(t, w.handle(job))
385+
}()
386+
go func() {
387+
assert.NoError(t, w.handle(job))
388+
}()
389+
390+
time.Sleep(50 * time.Millisecond)
391+
assert.Equal(t, uint64(2), w.BusyWorkers())
392+
time.Sleep(200 * time.Millisecond)
393+
assert.Equal(t, uint64(0), w.BusyWorkers())
394+
395+
assert.NoError(t, w.Shutdown())
396+
}

0 commit comments

Comments
 (0)