Skip to content

Commit 0e49665

Browse files
authored
trickle: Tighten up goroutine cleanup (#3808)
* trickle: Use goleak in unit tests as necessary. * trickle: Use base context when setting the seq in subscribers. This way subscriptions will still be properly terminated when the passed-in context is cancelled. * trickle: Close pending writers in the publisher as needed. This prevents them from hanging for a while which triggers goleak.
1 parent 586f1d6 commit 0e49665

File tree

4 files changed

+39
-8
lines changed

4 files changed

+39
-8
lines changed

trickle/local_subscriber_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ func TestLocalSubscriber_OverrunSeq(t *testing.T) {
1414

1515
pub, err := NewTricklePublisher(url)
1616
require.Nil(err)
17+
defer pub.Close()
1718

1819
sub := NewLocalSubscriber(server, "testest")
1920

trickle/trickle_publisher.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,15 @@ func (c *TricklePublisher) preconnect() (*pendingPost, error) {
126126
}
127127

128128
func (c *TricklePublisher) Close() error {
129+
130+
// Close any pending writers
131+
c.writeLock.Lock()
132+
pp := c.pendingPost
133+
if pp != nil {
134+
pp.writer.Close()
135+
}
136+
c.writeLock.Unlock()
137+
129138
req, err := http.NewRequest("DELETE", c.baseURL, nil)
130139
if err != nil {
131140
return err
@@ -207,6 +216,7 @@ func (p *pendingPost) Write(data io.Reader) (int64, error) {
207216
// before writing, check for error from preconnects
208217
select {
209218
case err := <-errCh:
219+
writer.Close()
210220
return 0, err
211221
default:
212222
// no error, continue
@@ -229,6 +239,7 @@ func (p *pendingPost) Write(data io.Reader) (int64, error) {
229239
// also prioritize errors over this channel compared to io errors
230240
// such as "read/write on closed pipe"
231241
if err := <-errCh; err != nil {
242+
writer.Close()
232243
return n, err
233244
}
234245

trickle/trickle_subscriber.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type TrickleSubscriber struct {
6060
url string
6161
mu sync.Mutex // Mutex to manage concurrent access
6262
pendingGet *http.Response // Pre-initialized GET request
63+
baseCtx context.Context // base context to use for the next context
6364
ctx context.Context // Parent context to use for pending GETs. This is bad
6465
cancelCtx func() // cancel the pending GET
6566
idx int // Segment index to request
@@ -91,6 +92,7 @@ func NewTrickleSubscriber(config TrickleSubscriberConfig) (*TrickleSubscriber, e
9192
return &TrickleSubscriber{
9293
client: httpClient(),
9394
url: config.URL,
95+
baseCtx: baseCtx,
9496
ctx: ctx,
9597
cancelCtx: cancel,
9698
idx: idx,
@@ -134,7 +136,7 @@ func (c *TrickleSubscriber) SetSeq(seq int) {
134136
c.mu.Lock()
135137
defer c.mu.Unlock()
136138
c.idx = seq
137-
c.ctx, c.cancelCtx = context.WithCancel(context.Background())
139+
c.ctx, c.cancelCtx = context.WithCancel(c.baseCtx)
138140
c.pendingGet = nil
139141
c.preconnectErrorCount = 0
140142
}

trickle/trickle_test.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/stretchr/testify/require"
16+
"go.uber.org/goleak"
1617
)
1718

1819
func TestTrickle_Close(t *testing.T) {
@@ -23,15 +24,17 @@ func TestTrickle_Close(t *testing.T) {
2324
})
2425
stop := server.Start()
2526
ts := httptest.NewServer(mux)
27+
defer goleak.VerifyNone(t)
2628
defer ts.Close()
2729
defer stop()
2830

2931
channelURL := ts.URL + "/testest"
3032
pub, err := NewTricklePublisher(channelURL)
3133
require.Nil(err)
34+
defer pub.Close()
3235
require.Error(StreamNotFoundErr, pub.Write(bytes.NewReader([]byte("first post"))))
3336

34-
sub, err := NewTrickleSubscriber(subConfig(channelURL))
37+
sub, err := NewTrickleSubscriber(subConfig(t, channelURL))
3538
require.Nil(err)
3639
sub.SetSeq(0)
3740

@@ -50,6 +53,7 @@ func TestTrickle_Close(t *testing.T) {
5053
// now recreate pub, should be ok
5154
pub, err = NewTricklePublisher(channelURL)
5255
require.Nil(err)
56+
defer pub.Close()
5357

5458
// write two segments
5559
segs := []string{"first", "second"}
@@ -82,13 +86,15 @@ func TestTrickle_Close(t *testing.T) {
8286
require.Error(EOS, pub.Write(bytes.NewReader([]byte("invalid"))))
8387

8488
// Spinning up a second subscriber should return 404
85-
sub2, err := NewTrickleSubscriber(subConfig(channelURL))
89+
sub2, err := NewTrickleSubscriber(subConfig(t, channelURL))
8690
require.Nil(err)
8791
_, err = sub2.Read()
8892
require.Error(StreamNotFoundErr, err)
8993

9094
// Spinning up a second publisher should return 404
9195
pub2, err := NewTricklePublisher(channelURL)
96+
require.Nil(err)
97+
defer pub2.Close()
9298
require.Error(StreamNotFoundErr, pub2.Write(bytes.NewReader([]byte("bad post"))))
9399
}
94100

@@ -97,7 +103,8 @@ func TestTrickle_SetSeq(t *testing.T) {
97103

98104
pub, err := NewTricklePublisher(channelURL)
99105
require.Nil(err)
100-
sub, err := NewTrickleSubscriber(subConfig(channelURL))
106+
defer pub.Close()
107+
sub, err := NewTrickleSubscriber(subConfig(t, channelURL))
101108
require.Nil(err)
102109

103110
// give sub preconnect time to latch on
@@ -144,15 +151,17 @@ func TestTrickle_Reset(t *testing.T) {
144151
})
145152
stop := server.Start()
146153
ts := httptest.NewServer(mux)
154+
defer goleak.VerifyNone(t)
147155
defer ts.Close()
148156
defer stop()
149157

150158
channelURL := ts.URL + "/testest"
151159

152160
pub, err := NewTricklePublisher(channelURL)
153161
require.Nil(err)
162+
defer pub.Close()
154163

155-
sub, err := NewTrickleSubscriber(subConfig(channelURL))
164+
sub, err := NewTrickleSubscriber(subConfig(t, channelURL))
156165
require.Nil(err)
157166
wg := &sync.WaitGroup{}
158167

@@ -241,14 +250,15 @@ func TestTrickle_IdleSweep(t *testing.T) {
241250
})
242251
stop := server.Start()
243252
ts := httptest.NewServer(mux)
253+
defer goleak.VerifyNone(t)
244254
defer ts.Close()
245255
defer stop()
246256

247257
channelURL := ts.URL + "/testest"
248258
lp := NewLocalPublisher(server, channelURL, "text/plain")
249259
lp.CreateChannel()
250260

251-
sub, err := NewTrickleSubscriber(subConfig(channelURL))
261+
sub, err := NewTrickleSubscriber(subConfig(t, channelURL))
252262
require.Nil(err)
253263
_, err = sub.Read()
254264
require.ErrorIs(err, StreamNotFoundErr)
@@ -282,6 +292,7 @@ func TestTrickle_SetSubStart(t *testing.T) {
282292
// 1. Subscribe from the beginning
283293
subBeginning, err := NewTrickleSubscriber(TrickleSubscriberConfig{
284294
URL: url,
295+
Ctx: t.Context(),
285296
})
286297
require.Nil(err)
287298
wg.Add(1)
@@ -302,12 +313,15 @@ func TestTrickle_SetSubStart(t *testing.T) {
302313

303314
pub, err := NewTricklePublisher(url)
304315
require.Nil(err)
316+
defer pub.Close()
317+
305318
require.Nil(pub.Write(bytes.NewReader([]byte("zeroth"))))
306319
require.Nil(pub.Write(bytes.NewReader([]byte("first"))))
307320

308321
// 2. Subscribe from the current seq
309322
seq := Current
310323
subCurrent, err := NewTrickleSubscriber(TrickleSubscriberConfig{
324+
Ctx: t.Context(),
311325
URL: url,
312326
Start: &seq,
313327
})
@@ -329,6 +343,7 @@ func TestTrickle_SetSubStart(t *testing.T) {
329343
// 3. Subscribe from the next seq
330344
seq = Next
331345
subNext, err := NewTrickleSubscriber(TrickleSubscriberConfig{
346+
Ctx: t.Context(),
332347
URL: url,
333348
Start: &seq,
334349
})
@@ -356,6 +371,7 @@ func TestTrickle_SetSubStart(t *testing.T) {
356371
// 4. Subscribe from a specific seq
357372
seq = 1
358373
subSeq, err := NewTrickleSubscriber(TrickleSubscriberConfig{
374+
Ctx: t.Context(),
359375
URL: url,
360376
Start: &seq,
361377
})
@@ -395,6 +411,7 @@ func makeServerWithServer(t *testing.T) (*require.Assertions, string, *Server) {
395411
t.Cleanup(func() {
396412
stop()
397413
ts.Close()
414+
goleak.VerifyNone(t)
398415
})
399416

400417
// create the channel locally on the server
@@ -405,6 +422,6 @@ func makeServerWithServer(t *testing.T) (*require.Assertions, string, *Server) {
405422
return require, ts.URL + "/" + chanName, server
406423
}
407424

408-
func subConfig(url string) TrickleSubscriberConfig {
409-
return TrickleSubscriberConfig{URL: url}
425+
func subConfig(t *testing.T, url string) TrickleSubscriberConfig {
426+
return TrickleSubscriberConfig{URL: url, Ctx: t.Context()}
410427
}

0 commit comments

Comments
 (0)