diff --git a/reporter/grpc_upload_client.go b/reporter/grpc_upload_client.go index 52eaaaa99a..b2216f5d9c 100644 --- a/reporter/grpc_upload_client.go +++ b/reporter/grpc_upload_client.go @@ -60,6 +60,12 @@ func (c *GrpcUploadClient) grpcUpload(ctx context.Context, uploadInstructions *d return 0, fmt.Errorf("initiate upload: %w", err) } + defer func() { + if stream != nil { + stream.CloseAndRecv() + } + }() + err = stream.Send(&debuginfopb.UploadRequest{ Data: &debuginfopb.UploadRequest_Info{ Info: &debuginfopb.UploadInfo{ diff --git a/reporter/parca_reporter.go b/reporter/parca_reporter.go index 1d3033d1f2..4181941f74 100644 --- a/reporter/parca_reporter.go +++ b/reporter/parca_reporter.go @@ -1001,6 +1001,7 @@ func (r *ParcaReporter) reportDataToBackend(ctx context.Context, buf *bytes.Buff if err != nil { return err } + defer client.CloseSend() if err := client.Send(&profilestorepb.WriteRequest{ Record: buf.Bytes(), @@ -1085,7 +1086,20 @@ func (r *ParcaReporter) reportDataToBackend(ctx context.Context, buf *bytes.Buff } r.stacktraceWriteRequestBytes.Add(float64(buf.Len())) - return client.CloseSend() + // CloseSend() is deferred at the top of this function. + // Drain any remaining responses so the gRPC helper goroutine + // (newClientStreamWithParams.func4) can exit. + for { + _, err := client.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + } + + return nil } func (r *ParcaReporter) writeCommonLabels(w *SampleWriter, rows uint64) {