Skip to content

Commit 7cb6615

Browse files
authored
chore: Goroutine leak detector to help avoid Goroutine leaks. (#6)
1 parent c2899cc commit 7cb6615

File tree

4 files changed

+40
-12
lines changed

4 files changed

+40
-12
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.16
44

55
require (
66
github.com/golang-queue/queue v0.0.11
7-
github.com/nsqio/go-nsq v1.0.8
7+
github.com/nsqio/go-nsq v1.1.0
88
github.com/stretchr/testify v1.7.0
9+
go.uber.org/goleak v1.1.12
910
)

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
99
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
1010
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
1111
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
12-
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
13-
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
12+
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
13+
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
1414
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1515
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1616
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -21,6 +21,7 @@ go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
2121
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
2222
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
2323
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
24+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
2425
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
2526
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
2627
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -40,6 +41,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
4041
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
4142
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
4243
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
44+
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
4345
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
4446
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
4547
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

nsq.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type Worker struct {
3434
stopFlag int32
3535
startFlag int32
3636
metric queue.Metric
37+
disable bool
3738
}
3839

3940
func (w *Worker) incBusyWorker() {
@@ -98,9 +99,14 @@ func WithMetric(m queue.Metric) Option {
9899
}
99100
}
100101

102+
func withDisable() Option {
103+
return func(w *Worker) {
104+
w.disable = true
105+
}
106+
}
107+
101108
// NewWorker for struc
102109
func NewWorker(opts ...Option) *Worker {
103-
var err error
104110
w := &Worker{
105111
addr: "127.0.0.1:4150",
106112
topic: "gorush",
@@ -120,6 +126,17 @@ func NewWorker(opts ...Option) *Worker {
120126
opt(w)
121127
}
122128

129+
w.startProducerAndConsumer()
130+
131+
return w
132+
}
133+
134+
func (w *Worker) startProducerAndConsumer() {
135+
if w.disable {
136+
return
137+
}
138+
139+
var err error
123140
cfg := nsq.NewConfig()
124141
cfg.MaxInFlight = w.maxInFlight
125142
w.q, err = nsq.NewConsumer(w.topic, w.channel, cfg)
@@ -131,8 +148,6 @@ func NewWorker(opts ...Option) *Worker {
131148
if err != nil {
132149
panic(err)
133150
}
134-
135-
return w
136151
}
137152

138153
// BeforeRun run script before start worker

nsq_test.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,15 @@ import (
1212
"github.com/golang-queue/queue"
1313

1414
"github.com/stretchr/testify/assert"
15+
"go.uber.org/goleak"
1516
)
1617

1718
var host = "127.0.0.1"
1819

20+
func TestMain(m *testing.M) {
21+
goleak.VerifyTestMain(m)
22+
}
23+
1924
type mockMessage struct {
2025
Message string
2126
}
@@ -109,7 +114,7 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
109114
)
110115
assert.NoError(t, err)
111116
q.Start()
112-
time.Sleep(50 * time.Millisecond)
117+
time.Sleep(400 * time.Millisecond)
113118
q.Shutdown()
114119
// can't queue task after shutdown
115120
err = q.Queue(m)
@@ -129,7 +134,7 @@ func TestWorkerNumAfterShutdown(t *testing.T) {
129134
assert.NoError(t, err)
130135
q.Start()
131136
q.Start()
132-
time.Sleep(100 * time.Millisecond)
137+
time.Sleep(400 * time.Millisecond)
133138
assert.Equal(t, 4, q.Workers())
134139
q.Shutdown()
135140
q.Wait()
@@ -170,7 +175,7 @@ func TestJobReachTimeout(t *testing.T) {
170175
)
171176
assert.NoError(t, err)
172177
q.Start()
173-
time.Sleep(50 * time.Millisecond)
178+
time.Sleep(400 * time.Millisecond)
174179
assert.NoError(t, q.QueueWithTimeout(20*time.Millisecond, m))
175180
time.Sleep(2 * time.Second)
176181
q.Shutdown()
@@ -208,7 +213,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
208213
)
209214
assert.NoError(t, err)
210215
q.Start()
211-
time.Sleep(50 * time.Millisecond)
216+
time.Sleep(400 * time.Millisecond)
212217
assert.NoError(t, q.QueueWithTimeout(3*time.Second, m))
213218
time.Sleep(2 * time.Second)
214219
q.Shutdown()
@@ -249,7 +254,7 @@ func TestGoroutineLeak(t *testing.T) {
249254
)
250255
assert.NoError(t, err)
251256
q.Start()
252-
time.Sleep(50 * time.Millisecond)
257+
time.Sleep(400 * time.Millisecond)
253258
for i := 0; i < 500; i++ {
254259
m.Message = fmt.Sprintf("foobar: %d", i+1)
255260
assert.NoError(t, q.Queue(m))
@@ -278,7 +283,7 @@ func TestGoroutinePanic(t *testing.T) {
278283
)
279284
assert.NoError(t, err)
280285
q.Start()
281-
time.Sleep(50 * time.Millisecond)
286+
time.Sleep(400 * time.Millisecond)
282287
assert.NoError(t, q.Queue(m))
283288
assert.NoError(t, q.Queue(m))
284289
time.Sleep(2 * time.Second)
@@ -297,6 +302,7 @@ func TestHandleTimeout(t *testing.T) {
297302
time.Sleep(200 * time.Millisecond)
298303
return nil
299304
}),
305+
withDisable(),
300306
)
301307

302308
err := w.handle(job)
@@ -313,6 +319,7 @@ func TestHandleTimeout(t *testing.T) {
313319
time.Sleep(200 * time.Millisecond)
314320
return nil
315321
}),
322+
withDisable(),
316323
)
317324

318325
done := make(chan error)
@@ -336,6 +343,7 @@ func TestJobComplete(t *testing.T) {
336343
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
337344
return errors.New("job completed")
338345
}),
346+
withDisable(),
339347
)
340348

341349
err := w.handle(job)
@@ -352,6 +360,7 @@ func TestJobComplete(t *testing.T) {
352360
time.Sleep(200 * time.Millisecond)
353361
return errors.New("job completed")
354362
}),
363+
withDisable(),
355364
)
356365

357366
done := make(chan error)
@@ -377,6 +386,7 @@ func TestBusyWorkerCount(t *testing.T) {
377386
time.Sleep(200 * time.Millisecond)
378387
return nil
379388
}),
389+
withDisable(),
380390
)
381391

382392
assert.Equal(t, uint64(0), w.BusyWorkers())

0 commit comments

Comments
 (0)