Skip to content

Commit 254dafb

Browse files
authored
feat(metrics): add nsq queue stats (#14)
1 parent 191576b commit 254dafb

File tree

4 files changed

+62
-7
lines changed

4 files changed

+62
-7
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/golang-queue/nsq
33
go 1.18
44

55
require (
6-
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747
6+
github.com/golang-queue/queue v0.0.13-0.20220403053215-9d5c5466dc9a
77
github.com/nsqio/go-nsq v1.1.0
88
github.com/stretchr/testify v1.7.0
99
go.uber.org/goleak v1.1.12

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747 h1:uNTbCoWORAcna89KcKgP22WFGv5fsij05e70DCnLrUU=
44
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747/go.mod h1:ku8iyjYffqYY6Duts+xl+QYfN3/KDK4MEvXMZUkHyio=
5+
github.com/golang-queue/queue v0.0.13-0.20220403053215-9d5c5466dc9a h1:XeBmQF8PCff5DwphR2eZPGKzVnT8/34/JK1IUcSoSpc=
6+
github.com/golang-queue/queue v0.0.13-0.20220403053215-9d5c5466dc9a/go.mod h1:ku8iyjYffqYY6Duts+xl+QYfN3/KDK4MEvXMZUkHyio=
57
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
68
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
79
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=

nsq.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,6 @@ func (w *Worker) startConsumer(cfg *nsq.Config) error {
6969
}
7070

7171
w.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
72-
// re-queue the job if worker has been shutdown.
73-
if atomic.LoadInt32(&w.stopFlag) == 1 {
74-
msg.Requeue(-1)
75-
return nil
76-
}
77-
7872
if len(msg.Body) == 0 {
7973
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
8074
// In this case, a message with an empty body is simply ignored/discarded.
@@ -86,6 +80,7 @@ func (w *Worker) startConsumer(cfg *nsq.Config) error {
8680
case <-w.stop:
8781
if msg != nil {
8882
// re-queue the job if worker has been shutdown.
83+
w.opts.logger.Info("re-queue the old job")
8984
msg.Requeue(-1)
9085
}
9186
}
@@ -219,3 +214,12 @@ loop:
219214

220215
return nil, queue.ErrNoTaskInQueue
221216
}
217+
218+
// Stats retrieves the current connection and message statistics for a Consumer
219+
func (w *Worker) Stats() *nsq.ConsumerStats {
220+
if w.q == nil {
221+
return nil
222+
}
223+
224+
return w.q.Stats()
225+
}

nsq_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,3 +347,52 @@ func TestJobComplete(t *testing.T) {
347347
assert.Error(t, err)
348348
assert.Equal(t, errors.New("job completed"), err)
349349
}
350+
351+
func TestNSQStatsinQueue(t *testing.T) {
352+
m := mockMessage{
353+
Message: "foo",
354+
}
355+
w := NewWorker(
356+
WithAddr(host+":4150"),
357+
WithTopic("nsq_stats"),
358+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
359+
log.Println("get message")
360+
return nil
361+
}),
362+
)
363+
q, err := queue.NewQueue(
364+
queue.WithWorker(w),
365+
queue.WithWorkerCount(1),
366+
)
367+
assert.NoError(t, err)
368+
assert.NoError(t, q.Queue(m))
369+
assert.NoError(t, q.Queue(m))
370+
q.Start()
371+
assert.Equal(t, int(1), w.Stats().Connections)
372+
time.Sleep(500 * time.Millisecond)
373+
assert.Equal(t, uint64(2), w.Stats().MessagesReceived)
374+
assert.Equal(t, uint64(2), w.Stats().MessagesFinished)
375+
q.Release()
376+
assert.Equal(t, int(0), w.Stats().Connections)
377+
}
378+
379+
func TestNSQStatsInWorker(t *testing.T) {
380+
m := mockMessage{
381+
Message: "foo",
382+
}
383+
w := NewWorker(
384+
WithAddr(host+":4150"),
385+
WithTopic("nsq_stats_queue"),
386+
)
387+
388+
assert.NoError(t, w.Queue(m))
389+
assert.NoError(t, w.Queue(m))
390+
assert.NoError(t, w.Queue(m))
391+
assert.Equal(t, int(1), w.Stats().Connections)
392+
time.Sleep(300 * time.Millisecond)
393+
assert.Equal(t, uint64(3), w.Stats().MessagesReceived)
394+
assert.Equal(t, uint64(1), w.Stats().MessagesFinished)
395+
assert.Equal(t, uint64(0), w.Stats().MessagesRequeued)
396+
397+
_ = w.Shutdown()
398+
}

0 commit comments

Comments
 (0)