Skip to content

Commit 98deb95

Browse files
committed
TUN-5842: Fix flaky TestConcurrentUpdateAndRead by making sure resources are released
1 parent c0f85ab commit 98deb95

File tree

1 file changed

+41
-22
lines changed

1 file changed

+41
-22
lines changed

orchestration/orchestrator_test.go

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,10 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
249249
}
250250
)
251251

252-
orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, &testLogger)
252+
ctx, cancel := context.WithCancel(context.Background())
253+
defer cancel()
254+
255+
orchestrator, err := NewOrchestrator(ctx, initConfig, testTags, &testLogger)
253256
require.NoError(t, err)
254257

255258
updateWithValidation(t, orchestrator, 1, configJSONV1)
@@ -265,8 +268,9 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
265268
wg.Add(1)
266269
go func(i int, originProxy connection.OriginProxy) {
267270
defer wg.Done()
268-
resp, err := proxyHTTP(t, originProxy, hostname)
269-
require.NoError(t, err)
271+
resp, err := proxyHTTP(originProxy, hostname)
272+
require.NoError(t, err, "proxyHTTP %d failed %v", i, err)
273+
defer resp.Body.Close()
270274

271275
var warpRoutingDisabled bool
272276
// The response can be from initOrigin, http_status:204 or http_status:418
@@ -290,7 +294,7 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
290294
// Once we have originProxy, it won't be changed by configuration updates.
291295
// We can infer the version by the ProxyHTTP response code
292296
pr, pw := io.Pipe()
293-
// concurrentRespWriter makes sure ResponseRecorder is not read/write concurrently, and read waits for the first write
297+
294298
w := newRespReadWriteFlusher()
295299

296300
// Write TCP message and make sure it's echo back. This has to be done in a go routune since ProxyTCP doesn't
@@ -303,7 +307,14 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
303307
tcpEyeball(t, pw, tcpBody, w)
304308
}()
305309
}
306-
proxyTCP(t, originProxy, tcpOrigin.Addr().String(), w, pr, warpRoutingDisabled)
310+
311+
err = proxyTCP(ctx, originProxy, tcpOrigin.Addr().String(), w, pr)
312+
if warpRoutingDisabled {
313+
require.Error(t, err, "expect proxyTCP %d to return error", i)
314+
} else {
315+
require.NoError(t, err, "proxyTCP %d failed %v", i, err)
316+
}
317+
307318
}(i, originProxy)
308319

309320
if i == concurrentRequests/4 {
@@ -319,6 +330,7 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
319330
wg.Add(1)
320331
go func() {
321332
defer wg.Done()
333+
// Makes sure v2 is applied before v3
322334
<-appliedV2
323335
updateWithValidation(t, orchestrator, 3, configJSONV3)
324336
}()
@@ -328,14 +340,18 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
328340
wg.Wait()
329341
}
330342

331-
func proxyHTTP(t *testing.T, originProxy connection.OriginProxy, hostname string) (*http.Response, error) {
343+
func proxyHTTP(originProxy connection.OriginProxy, hostname string) (*http.Response, error) {
332344
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", hostname), nil)
333-
require.NoError(t, err)
345+
if err != nil {
346+
return nil, err
347+
}
334348

335349
w := httptest.NewRecorder()
336350
log := zerolog.Nop()
337351
respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeHTTP, &log)
338-
require.NoError(t, err)
352+
if err != nil {
353+
return nil, err
354+
}
339355

340356
err = originProxy.ProxyHTTP(respWriter, req, false)
341357
if err != nil {
@@ -356,26 +372,26 @@ func tcpEyeball(t *testing.T, reqWriter io.WriteCloser, body string, respReadWri
356372
require.Equal(t, writeN, n)
357373
}
358374

359-
func proxyTCP(t *testing.T, originProxy connection.OriginProxy, originAddr string, w http.ResponseWriter, reqBody io.ReadCloser, expectErr bool) {
375+
func proxyTCP(ctx context.Context, originProxy connection.OriginProxy, originAddr string, w http.ResponseWriter, reqBody io.ReadCloser) error {
360376
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", originAddr), reqBody)
361-
require.NoError(t, err)
377+
if err != nil {
378+
return err
379+
}
362380

363381
log := zerolog.Nop()
364382
respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeTCP, &log)
365-
require.NoError(t, err)
383+
if err != nil {
384+
return err
385+
}
366386

367387
tcpReq := &connection.TCPRequest{
368388
Dest: originAddr,
369389
CFRay: "123",
370390
LBProbe: false,
371391
}
372392
rws := connection.NewHTTPResponseReadWriterAcker(respWriter, req)
373-
if expectErr {
374-
require.Error(t, originProxy.ProxyTCP(context.Background(), rws, tcpReq))
375-
return
376-
}
377393

378-
require.NoError(t, originProxy.ProxyTCP(context.Background(), rws, tcpReq))
394+
return originProxy.ProxyTCP(ctx, rws, tcpReq)
379395
}
380396

381397
func serveTCPOrigin(t *testing.T, tcpOrigin net.Listener, wg *sync.WaitGroup) {
@@ -471,20 +487,20 @@ func TestClosePreviousProxies(t *testing.T) {
471487

472488
originProxyV1, err := orchestrator.GetOriginProxy()
473489
require.NoError(t, err)
474-
resp, err := proxyHTTP(t, originProxyV1, hostname)
490+
resp, err := proxyHTTP(originProxyV1, hostname)
475491
require.NoError(t, err)
476492
require.Equal(t, http.StatusOK, resp.StatusCode)
477493

478494
updateWithValidation(t, orchestrator, 2, configTeapot)
479495

480496
originProxyV2, err := orchestrator.GetOriginProxy()
481497
require.NoError(t, err)
482-
resp, err = proxyHTTP(t, originProxyV2, hostname)
498+
resp, err = proxyHTTP(originProxyV2, hostname)
483499
require.NoError(t, err)
484500
require.Equal(t, http.StatusTeapot, resp.StatusCode)
485501

486502
// The hello-world server in config v1 should have been stopped
487-
resp, err = proxyHTTP(t, originProxyV1, hostname)
503+
resp, err = proxyHTTP(originProxyV1, hostname)
488504
require.Error(t, err)
489505
require.Nil(t, resp)
490506

@@ -495,7 +511,7 @@ func TestClosePreviousProxies(t *testing.T) {
495511
require.NoError(t, err)
496512
require.NotEqual(t, originProxyV1, originProxyV3)
497513

498-
resp, err = proxyHTTP(t, originProxyV3, hostname)
514+
resp, err = proxyHTTP(originProxyV3, hostname)
499515
require.NoError(t, err)
500516
require.Equal(t, http.StatusOK, resp.StatusCode)
501517

@@ -504,7 +520,7 @@ func TestClosePreviousProxies(t *testing.T) {
504520
// Wait for proxies to shutdown
505521
time.Sleep(time.Millisecond * 10)
506522

507-
resp, err = proxyHTTP(t, originProxyV3, hostname)
523+
resp, err = proxyHTTP(originProxyV3, hostname)
508524
require.Error(t, err)
509525
require.Nil(t, resp)
510526
}
@@ -553,6 +569,9 @@ func TestPersistentConnection(t *testing.T) {
553569
tcpReqReader, tcpReqWriter := io.Pipe()
554570
tcpRespReadWriter := newRespReadWriteFlusher()
555571

572+
ctx, cancel := context.WithCancel(context.Background())
573+
defer cancel()
574+
556575
var wg sync.WaitGroup
557576
wg.Add(3)
558577
// Start TCP origin
@@ -570,7 +589,7 @@ func TestPersistentConnection(t *testing.T) {
570589
// Simulate cloudflared recieving a TCP connection
571590
go func() {
572591
defer wg.Done()
573-
proxyTCP(t, originProxy, tcpOrigin.Addr().String(), tcpRespReadWriter, tcpReqReader, false)
592+
require.NoError(t, proxyTCP(ctx, originProxy, tcpOrigin.Addr().String(), tcpRespReadWriter, tcpReqReader))
574593
}()
575594
// Simulate cloudflared recieving a WS connection
576595
go func() {

0 commit comments

Comments
 (0)