Skip to content

Commit f9cd72a

Browse files
jmacd and dmwear authored
[otelarrowreceiver] New shutdown testing (#34236)
**Description:** Several test flakes were identified in #34179. This improves the tests and adds one new test. Adds one new context-canceled return path, fixing a potential goroutine leak covered by new testing. **Link to tracking Issue:** Fixes open-telemetry/otel-arrow#237 **Testing:** - For the test change in `receiver/otelarrowreceiver/internal/arrow/arrow_test.go`, note that the test now allows out-of-order responses, which is explicitly a feature. Out-of-order responses may have resulted from performance changes in Arrow v17. - The context handling in `receiver/otelarrowreceiver/internal/arrow/arrow.go` is needed to satisfy the HalfOpen test introduced here. A comment in the code explains how gRPC stream resources could previously leak. - The `TestOTelArrowShutdown` test has been rewritten for readability. It had been using a test helper designed originally for the OTLP (non-streaming) test, which made it convoluted here. A sync.Once has been eliminated. - The new test `TestOTelArrowHalfOpenShutdown` exercises the "adversarial" condition where a client never calls `Recv()` on their stream handle. The new context handling is required to pass the test without goroutine leaks. --------- Co-authored-by: Matthew Wear <[email protected]>
1 parent 79c0bf1 commit f9cd72a

File tree

4 files changed

+181
-36
lines changed

4 files changed

+181
-36
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: otelarrowreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix potential goroutine leak during stream shutdown.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34236]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/otelarrowreceiver/internal/arrow/arrow.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -477,14 +477,19 @@ func (id *inFlightData) consumeDone(ctx context.Context, consumeErrPtr *error) {
477477
id.span.SetStatus(otelcodes.Error, retErr.Error())
478478
}
479479

480-
id.replyToCaller(retErr)
480+
id.replyToCaller(ctx, retErr)
481481
id.anyDone(ctx)
482482
}
483483

484-
func (id *inFlightData) replyToCaller(callerErr error) {
485-
id.pendingCh <- batchResp{
484+
func (id *inFlightData) replyToCaller(ctx context.Context, callerErr error) {
485+
select {
486+
case id.pendingCh <- batchResp{
486487
id: id.batchID,
487488
err: callerErr,
489+
}:
490+
// OK: Responded.
491+
case <-ctx.Done():
492+
// OK: Never responded due to cancelation.
488493
}
489494
}
490495

@@ -585,7 +590,7 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
585590
var authErr error
586591
inflightCtx, authErr = r.authServer.Authenticate(inflightCtx, authHdrs)
587592
if authErr != nil {
588-
flight.replyToCaller(status.Error(codes.Unauthenticated, authErr.Error()))
593+
flight.replyToCaller(inflightCtx, status.Error(codes.Unauthenticated, authErr.Error()))
589594
return nil
590595
}
591596
}

receiver/otelarrowreceiver/internal/arrow/arrow_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,10 +1188,11 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) {
11881188
nil,
11891189
}
11901190

1191-
var recvBatches []*arrowpb.BatchStatus
1191+
recvBatches := make([]*arrowpb.BatchStatus, len(expectData))
11921192

11931193
ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).DoAndReturn(func(batch *arrowpb.BatchStatus) error {
1194-
recvBatches = append(recvBatches, batch)
1194+
require.Nil(t, recvBatches[batch.BatchId])
1195+
recvBatches[batch.BatchId] = batch
11951196
return nil
11961197
})
11971198

receiver/otelarrowreceiver/otelarrow_test.go

Lines changed: 142 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"errors"
1010
"fmt"
11+
"io"
1112
"net"
1213
"strconv"
1314
"sync"
@@ -316,7 +317,6 @@ func TestOTelArrowShutdown(t *testing.T) {
316317
cfg.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{
317318
ServerParameters: &configgrpc.KeepaliveServerParameters{},
318319
}
319-
// Note that keepalive parameters are set very high
320320
if !cooperative {
321321
cfg.GRPC.Keepalive.ServerParameters.MaxConnectionAge = time.Second
322322
cfg.GRPC.Keepalive.ServerParameters.MaxConnectionAgeGrace = 5 * time.Second
@@ -338,36 +338,48 @@ func TestOTelArrowShutdown(t *testing.T) {
338338

339339
conn, err := grpc.NewClient(endpointGrpc, grpc.WithTransportCredentials(insecure.NewCredentials()))
340340
require.NoError(t, err)
341-
defer conn.Close()
342-
343-
doneSignalGrpc := make(chan bool)
341+
defer func() {
342+
require.NoError(t, conn.Close())
343+
}()
344344

345345
client := arrowpb.NewArrowTracesServiceClient(conn)
346346
stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true))
347347
require.NoError(t, err)
348348
producer := arrowRecord.NewProducer()
349-
defer func() {
350-
require.NoError(t, conn.Close())
351-
}()
352349

353350
start := time.Now()
354-
var once sync.Once
355-
356-
// Send traces to the receiver until we signal via done channel, and then
357-
// send one more trace after that.
358-
go generateTraces(func(td ptrace.Traces) {
359-
if time.Since(start) > 5*time.Second {
360-
once.Do(func() {
361-
if cooperative {
362-
require.NoError(t, stream.CloseSend())
363-
}
364-
})
365-
return
351+
352+
// Send traces to the receiver until we signal.
353+
go func() {
354+
for time.Since(start) < 5*time.Second {
355+
td := testdata.GenerateTraces(1)
356+
batch, batchErr := producer.BatchArrowRecordsFromTraces(td)
357+
require.NoError(t, batchErr)
358+
require.NoError(t, stream.Send(batch))
366359
}
367-
batch, batchErr := producer.BatchArrowRecordsFromTraces(td)
368-
require.NoError(t, batchErr)
369-
require.NoError(t, stream.Send(batch))
370-
}, doneSignalGrpc)
360+
361+
if cooperative {
362+
require.NoError(t, stream.CloseSend())
363+
}
364+
}()
365+
366+
var recvWG sync.WaitGroup
367+
recvWG.Add(1)
368+
369+
// Receive batch responses. See the comment on
370+
// https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
371+
// to explain why this must be done. We do not use the
372+
// return value, this just avoids leaking the stream context,
373+
// which can otherwise hang this test.
374+
go func() {
375+
defer recvWG.Done()
376+
for {
377+
if _, recvErr := stream.Recv(); recvErr == nil {
378+
continue
379+
}
380+
break
381+
}
382+
}()
371383

372384
// Wait until the receiver outputs anything to the sink.
373385
assert.Eventually(t, func() bool {
@@ -380,18 +392,14 @@ func TestOTelArrowShutdown(t *testing.T) {
380392
err = r.Shutdown(context.Background())
381393
assert.NoError(t, err)
382394

395+
// recvWG ensures the stream has been read before the test exits.
396+
recvWG.Wait()
397+
383398
// Remember how many spans the sink received. This number should not change after this
384399
// point because after Shutdown() returns the component is not allowed to produce
385400
// any more data.
386401
sinkSpanCountAfterShutdown := nextSink.SpanCount()
387402

388-
// Now signal to generateTraces to exit the main generation loop, then send
389-
// one more trace and stop.
390-
doneSignalGrpc <- true
391-
392-
// Wait until all follow up traces are sent.
393-
<-doneSignalGrpc
394-
395403
// Nothing further should be received by the sink after Shutdown, so the number of spans in
396404
// the sink should not change.
397405
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpanCount())
@@ -413,6 +421,7 @@ func TestOTelArrowShutdown(t *testing.T) {
413421
}
414422
}
415423

424+
// generateTraces originates from the OTLP receiver "standard" shutdown test.
416425
func generateTraces(senderFn senderFunc, doneSignal chan bool) {
417426
// Continuously generate spans until signaled to stop.
418427
loop:
@@ -792,3 +801,106 @@ func TestConcurrentArrowReceiver(t *testing.T) {
792801
require.Equal(t, numStreams, counts[i])
793802
}
794803
}
804+
805+
// TestOTelArrowHalfOpenShutdown exercises a known condition in which Shutdown
806+
// can't succeed until the stream is canceled by an external signal.
807+
func TestOTelArrowHalfOpenShutdown(t *testing.T) {
808+
ctx, testCancel := context.WithCancel(context.Background())
809+
defer testCancel()
810+
811+
endpointGrpc := testutil.GetAvailableLocalAddress(t)
812+
813+
nextSink := new(consumertest.TracesSink)
814+
815+
factory := NewFactory()
816+
cfg := factory.CreateDefaultConfig().(*Config)
817+
cfg.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{
818+
ServerParameters: &configgrpc.KeepaliveServerParameters{},
819+
}
820+
// No keepalive parameters are set
821+
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
822+
set := receivertest.NewNopSettings()
823+
824+
set.ID = testReceiverID
825+
r, err := NewFactory().CreateTracesReceiver(
826+
ctx,
827+
set,
828+
cfg,
829+
nextSink)
830+
require.NoError(t, err)
831+
require.NotNil(t, r)
832+
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
833+
834+
conn, err := grpc.NewClient(endpointGrpc, grpc.WithTransportCredentials(insecure.NewCredentials()))
835+
require.NoError(t, err)
836+
defer func() {
837+
require.NoError(t, conn.Close())
838+
}()
839+
840+
client := arrowpb.NewArrowTracesServiceClient(conn)
841+
stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true))
842+
require.NoError(t, err)
843+
producer := arrowRecord.NewProducer()
844+
845+
start := time.Now()
846+
847+
// Send traces to the receiver until we signal.
848+
go func() {
849+
for time.Since(start) < 5*time.Second {
850+
select {
851+
case <-ctx.Done():
852+
return
853+
default:
854+
}
855+
td := testdata.GenerateTraces(1)
856+
batch, batchErr := producer.BatchArrowRecordsFromTraces(td)
857+
require.NoError(t, batchErr)
858+
859+
sendErr := stream.Send(batch)
860+
select {
861+
case <-ctx.Done():
862+
if sendErr != nil {
863+
require.ErrorIs(t, sendErr, io.EOF)
864+
}
865+
return
866+
default:
867+
require.NoError(t, sendErr)
868+
}
869+
}
870+
}()
871+
872+
// Do not receive batch responses.
873+
874+
// Wait until the receiver outputs anything to the sink.
875+
assert.Eventually(t, func() bool {
876+
return nextSink.SpanCount() > 0
877+
}, time.Second, 10*time.Millisecond)
878+
879+
// Let more load pile up.
880+
time.Sleep(time.Second)
881+
882+
// The receiver has wedged itself in a call to Send() that is blocked
883+
// and there is not a graceful way to recover. Schedule an operation
884+
// that will unblock it un-gracefully.
885+
go func() {
886+
// Without this cancel, the test hangs.
887+
time.Sleep(3 * time.Second)
888+
testCancel()
889+
}()
890+
891+
// Now shutdown the receiver, while continuing sending traces to it.
892+
err = r.Shutdown(context.Background())
893+
assert.NoError(t, err)
894+
895+
// Ensure that calls to Recv() get canceled
896+
for {
897+
_, err := stream.Recv()
898+
if err == nil {
899+
continue
900+
}
901+
status, ok := status.FromError(err)
902+
require.True(t, ok, "is a status error")
903+
require.Equal(t, codes.Canceled, status.Code())
904+
break
905+
}
906+
}

0 commit comments

Comments
 (0)