Skip to content

Commit 5cee079

Browse files
authored
feat(NSQ): update maxInFlight default value as 1 (#15)
1 parent 3f98429 commit 5cee079

File tree

10 files changed

+62
-31
lines changed

10 files changed

+62
-31
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
strategy:
2323
matrix:
2424
os: [ubuntu-latest]
25-
go: [1.13, 1.14, 1.15, 1.16, 1.17, 1.18]
25+
go: [1.17, 1.18]
2626
include:
2727
- os: ubuntu-latest
2828
go-build: ~/.cache/go-build

_example/server-client/client/main.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"github.com/golang-queue/nsq"
1010
"github.com/golang-queue/queue"
1111
"github.com/golang-queue/queue/core"
12+
13+
"github.com/appleboy/graceful"
1214
)
1315

1416
type job struct {
@@ -27,36 +29,54 @@ func main() {
2729
taskN := 10000
2830
rets := make(chan string, taskN)
2931

32+
m := graceful.NewManager()
33+
3034
// define the worker
3135
w := nsq.NewWorker(
3236
nsq.WithAddr("127.0.0.1:4150"),
3337
nsq.WithTopic("example"),
3438
nsq.WithChannel("foobar"),
35-
// concurrent job number
36-
nsq.WithMaxInFlight(10),
3739
nsq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
3840
var v *job
3941
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
4042
return err
4143
}
4244
rets <- v.Message
43-
time.Sleep(6 * time.Second)
45+
fmt.Println("got message:", v.Message)
46+
fmt.Println("wait 10 seconds")
47+
time.Sleep(10 * time.Second)
4448
return nil
4549
}),
4650
)
4751

4852
// define the queue
4953
q := queue.NewPool(
50-
5,
54+
1,
5155
queue.WithWorker(w),
5256
)
5357

54-
// wait until all tasks done
55-
for i := 0; i < taskN; i++ {
56-
fmt.Println("message:", <-rets)
57-
time.Sleep(50 * time.Millisecond)
58-
}
58+
m.AddRunningJob(func(ctx context.Context) error {
59+
for {
60+
select {
61+
case <-ctx.Done():
62+
select {
63+
case m := <-rets:
64+
fmt.Println("message:", m)
65+
default:
66+
}
67+
return nil
68+
case m := <-rets:
69+
fmt.Println("message:", m)
70+
time.Sleep(50 * time.Millisecond)
71+
}
72+
}
73+
})
74+
75+
m.AddShutdownJob(func() error {
76+
// shutdown the service and notify all the worker
77+
q.Release()
78+
return nil
79+
})
5980

60-
// shutdown the service and notify all the worker
61-
q.Release()
81+
<-m.Done()
6282
}

_example/server-client/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ module example
33
go 1.18
44

55
require (
6+
github.com/appleboy/graceful v0.0.4
67
github.com/golang-queue/nsq v0.0.0-00010101000000-000000000000
7-
github.com/golang-queue/queue v0.0.13-0.20220408035349-ed24fa14aa00
8+
github.com/golang-queue/queue v0.0.13-0.20220420024737-03d90b78b732
89
)
910

1011
require (

_example/server-client/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
github.com/appleboy/graceful v0.0.4 h1:Q4LCeq4DFy59qiACLtuH+mSqDERtUzwkQbCWpRaWwvQ=
2+
github.com/appleboy/graceful v0.0.4/go.mod h1:Q2mVx0t+N0lCDZc5MJudbcpTm6cgGM/J2gZCZIqD9dc=
13
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
24
github.com/golang-queue/queue v0.0.13-0.20220408035349-ed24fa14aa00 h1:EFiINOvAuGgxiE3MNu7PKY0O2Pvvc7r6YApdqAnhWnQ=
35
github.com/golang-queue/queue v0.0.13-0.20220408035349-ed24fa14aa00/go.mod h1:g1yxxDl8JMo4gUfxt11fjjU3SXU1ah61EvwshmDoSIs=
6+
github.com/golang-queue/queue v0.0.13-0.20220420024737-03d90b78b732 h1:Wqs2Dsl+jUCcGSniLP5o1lmDcGrRvujRCTyqFwHDrvw=
7+
github.com/golang-queue/queue v0.0.13-0.20220420024737-03d90b78b732/go.mod h1:g1yxxDl8JMo4gUfxt11fjjU3SXU1ah61EvwshmDoSIs=
48
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
59
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
610
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=

_example/server-client/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (j *job) Bytes() []byte {
2323
}
2424

2525
func main() {
26-
taskN := 100
26+
taskN := 5
2727

2828
// define the worker
2929
w := nsq.NewWorker(

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/golang-queue/nsq
33
go 1.18
44

55
require (
6-
github.com/golang-queue/queue v0.0.13-0.20220408035349-ed24fa14aa00
6+
github.com/golang-queue/queue v0.0.13-0.20220420024737-03d90b78b732
77
github.com/nsqio/go-nsq v1.1.0
88
github.com/stretchr/testify v1.7.1
99
go.uber.org/goleak v1.1.12

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/golang-queue/queue v0.0.13-0.20220408035349-ed24fa14aa00 h1:EFiINOvAuGgxiE3MNu7PKY0O2Pvvc7r6YApdqAnhWnQ=
4-
github.com/golang-queue/queue v0.0.13-0.20220408035349-ed24fa14aa00/go.mod h1:g1yxxDl8JMo4gUfxt11fjjU3SXU1ah61EvwshmDoSIs=
3+
github.com/golang-queue/queue v0.0.13-0.20220420024737-03d90b78b732 h1:Wqs2Dsl+jUCcGSniLP5o1lmDcGrRvujRCTyqFwHDrvw=
4+
github.com/golang-queue/queue v0.0.13-0.20220420024737-03d90b78b732/go.mod h1:g1yxxDl8JMo4gUfxt11fjjU3SXU1ah61EvwshmDoSIs=
55
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
66
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
77
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=

nsq.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func NewWorker(opts ...Option) *Worker {
3232
w := &Worker{
3333
opts: newOptions(opts...),
3434
stop: make(chan struct{}),
35-
tasks: make(chan *nsq.Message, 1),
35+
tasks: make(chan *nsq.Message),
3636
}
3737

3838
cfg := nsq.NewConfig()
@@ -76,13 +76,19 @@ func (w *Worker) startConsumer(cfg *nsq.Config) error {
7676
return nil
7777
}
7878

79-
select {
80-
case w.tasks <- msg:
81-
case <-w.stop:
82-
if msg != nil {
83-
// re-queue the job if worker has been shutdown.
84-
w.opts.logger.Info("re-queue the old job")
85-
msg.Requeue(-1)
79+
loop:
80+
for {
81+
select {
82+
case w.tasks <- msg:
83+
break loop
84+
case <-w.stop:
85+
if msg != nil {
86+
// re-queue the job if worker has been shutdown.
87+
msg.Requeue(-1)
88+
}
89+
break loop
90+
case <-time.After(2 * time.Second):
91+
msg.Touch()
8692
}
8793
}
8894

nsq_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,13 +391,14 @@ func TestNSQStatsInWorker(t *testing.T) {
391391
assert.NoError(t, w.Queue(m))
392392
assert.NoError(t, w.Queue(m))
393393
assert.Equal(t, int(1), w.Stats().Connections)
394-
assert.Equal(t, int(1), len(w.tasks))
395394

396-
time.Sleep(300 * time.Millisecond)
395+
time.Sleep(50 * time.Millisecond)
397396

398-
assert.Equal(t, uint64(3), w.Stats().MessagesReceived)
399-
assert.Equal(t, uint64(1), w.Stats().MessagesFinished)
397+
assert.Equal(t, uint64(1), w.Stats().MessagesReceived)
398+
assert.Equal(t, uint64(0), w.Stats().MessagesFinished)
400399
assert.Equal(t, uint64(0), w.Stats().MessagesRequeued)
401400

402401
_ = w.Shutdown()
402+
time.Sleep(50 * time.Millisecond)
403+
assert.Equal(t, uint64(1), w.Stats().MessagesRequeued)
403404
}

options.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package nsq
22

33
import (
44
"context"
5-
"runtime"
65

76
"github.com/golang-queue/queue"
87
"github.com/golang-queue/queue/core"
@@ -85,7 +84,7 @@ func newOptions(opts ...Option) Options {
8584
addr: "127.0.0.1:4150",
8685
topic: "gorush",
8786
channel: "ch",
88-
maxInFlight: runtime.NumCPU(),
87+
maxInFlight: 1,
8988

9089
logger: queue.NewLogger(),
9190
runFunc: func(context.Context, core.QueuedMessage) error {

0 commit comments

Comments
 (0)