Skip to content

Commit 39400b9

Browse files
client: Roll-forward PR #8278(with changes): Restore the existing behavior to return io.EOF on repeated RecvMsg() calls for client-streaming RPCs (#8523)
Partially addresses: #7286 This reverts commit 20bd1e7 Changes: - Modifies client.RecvMsg() so that successive calls after stream ends return io.EOF. - Adds extra state to track calls to client.recvmsg(required to return Cardinality Violation only in case zero response) RELEASE NOTES: * client: Return status code INTERNAL when a server sends 0 response messages for a unary or client streaming RPC.
1 parent 3c743c9 commit 39400b9

File tree

2 files changed

+175
-7
lines changed

2 files changed

+175
-7
lines changed

stream.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,8 @@ type clientStream struct {
549549

550550
sentLast bool // sent an end stream
551551

552+
receivedFirstMsg bool // set after the first message is received
553+
552554
methodConfig *MethodConfig
553555

554556
ctx context.Context // the application's context, wrapped by stats/tracing
@@ -1144,11 +1146,16 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
11441146
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
11451147
return statusErr
11461148
}
1149+
// Received no msg and status OK for non-server streaming rpcs.
1150+
if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
1151+
return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1152+
}
11471153
return io.EOF // indicates successful end of stream.
11481154
}
11491155

11501156
return toRPCErr(err)
11511157
}
1158+
cs.receivedFirstMsg = true
11521159
if a.trInfo != nil {
11531160
a.mu.Lock()
11541161
if a.trInfo.tr != nil {
@@ -1177,7 +1184,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
11771184
} else if err != nil {
11781185
return toRPCErr(err)
11791186
}
1180-
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1187+
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
11811188
}
11821189

11831190
func (a *csAttempt) finish(err error) {
@@ -1359,6 +1366,7 @@ type addrConnStream struct {
13591366
transport transport.ClientTransport
13601367
ctx context.Context
13611368
sentLast bool
1369+
receivedFirstMsg bool
13621370
desc *StreamDesc
13631371
codec baseCodec
13641372
sendCompressorV0 Compressor
@@ -1484,10 +1492,15 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
14841492
if statusErr := as.transportStream.Status().Err(); statusErr != nil {
14851493
return statusErr
14861494
}
1495+
// Received no msg and status OK for non-server streaming rpcs.
1496+
if !as.desc.ServerStreams && !as.receivedFirstMsg {
1497+
return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1498+
}
14871499
return io.EOF // indicates successful end of stream.
14881500
}
14891501
return toRPCErr(err)
14901502
}
1503+
as.receivedFirstMsg = true
14911504

14921505
if as.desc.ServerStreams {
14931506
// Subsequent messages should be received by subsequent RecvMsg calls.
@@ -1501,7 +1514,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
15011514
} else if err != nil {
15021515
return toRPCErr(err)
15031516
}
1504-
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1517+
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
15051518
}
15061519

15071520
func (as *addrConnStream) finish(err error) {

test/end2end_test.go

Lines changed: 160 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3590,9 +3590,6 @@ func testClientStreamingError(t *testing.T, e env) {
35903590
// Tests that a client receives a cardinality violation error for client-streaming
35913591
// RPCs if the server doesn't send a message before returning status OK.
35923592
func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) {
3593-
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()`
3594-
// after this is fixed.
3595-
t.Skip()
35963593
ss := &stubserver.StubServer{
35973594
StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error {
35983595
// Returning status OK without sending a response message.This is a
@@ -3741,6 +3738,165 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
37413738
}
37423739
}
37433740

3741+
// Tests that client receives a cardinality violation error for unary
3742+
// RPCs if the server doesn't send a message before returning status OK.
3743+
func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) {
3744+
lis, err := testutils.LocalTCPListener()
3745+
if err != nil {
3746+
t.Fatal(err)
3747+
}
3748+
defer lis.Close()
3749+
3750+
ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error {
3751+
return nil
3752+
})
3753+
3754+
s := grpc.NewServer(ss)
3755+
go s.Serve(lis)
3756+
defer s.Stop()
3757+
3758+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3759+
defer cancel()
3760+
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
3761+
if err != nil {
3762+
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err)
3763+
}
3764+
defer cc.Close()
3765+
3766+
client := testgrpc.NewTestServiceClient(cc)
3767+
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal {
3768+
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3769+
}
3770+
}
3771+
3772+
// Tests the behavior for unary RPC when client calls RecvMsg() twice.
3773+
// Second call to RecvMsg should fail with io.EOF.
3774+
func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) {
3775+
e := tcpTLSEnv
3776+
te := newTest(t, e)
3777+
defer te.tearDown()
3778+
3779+
te.startServer(&testServer{security: e.security})
3780+
3781+
cc := te.clientConn()
3782+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3783+
defer cancel()
3784+
3785+
desc := &grpc.StreamDesc{
3786+
StreamName: "UnaryCall",
3787+
ServerStreams: false,
3788+
ClientStreams: false,
3789+
}
3790+
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall")
3791+
if err != nil {
3792+
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
3793+
}
3794+
3795+
if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil {
3796+
t.Fatalf("stream.SendMsg(_) = %v, want <nil>", err)
3797+
}
3798+
3799+
resp := &testpb.SimpleResponse{}
3800+
if err := stream.RecvMsg(resp); err != nil {
3801+
t.Fatalf("stream.RecvMsg() = %v , want <nil>", err)
3802+
}
3803+
3804+
if err = stream.RecvMsg(resp); err != io.EOF {
3805+
t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF)
3806+
}
3807+
}
3808+
3809+
// Tests the behavior for unary RPC when server calls SendMsg() twice.
3810+
// Client should fail with cardinality violation error.
3811+
func (s) TestUnaryRPC_ServerCallSendMsgTwice(t *testing.T) {
3812+
lis, err := testutils.LocalTCPListener()
3813+
if err != nil {
3814+
t.Fatal(err)
3815+
}
3816+
defer lis.Close()
3817+
3818+
s := grpc.NewServer()
3819+
serviceDesc := grpc.ServiceDesc{
3820+
ServiceName: "grpc.testing.TestService",
3821+
HandlerType: (*any)(nil),
3822+
Methods: []grpc.MethodDesc{},
3823+
Streams: []grpc.StreamDesc{
3824+
{
3825+
StreamName: "UnaryCall",
3826+
Handler: func(_ any, stream grpc.ServerStream) error {
3827+
if err := stream.RecvMsg(&testpb.Empty{}); err != nil {
3828+
t.Errorf("stream.RecvMsg() = %v, want <nil>", err)
3829+
}
3830+
3831+
if err = stream.SendMsg(&testpb.Empty{}); err != nil {
3832+
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
3833+
}
3834+
3835+
if err = stream.SendMsg(&testpb.Empty{}); err != nil {
3836+
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
3837+
}
3838+
return nil
3839+
},
3840+
ClientStreams: false,
3841+
ServerStreams: false,
3842+
},
3843+
},
3844+
}
3845+
s.RegisterService(&serviceDesc, &testServer{})
3846+
go s.Serve(lis)
3847+
defer s.Stop()
3848+
3849+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3850+
defer cancel()
3851+
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
3852+
if err != nil {
3853+
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err)
3854+
}
3855+
defer cc.Close()
3856+
3857+
client := testgrpc.NewTestServiceClient(cc)
3858+
if _, err = client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Internal {
3859+
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3860+
}
3861+
}
3862+
3863+
// Tests the behavior for client-streaming RPC when client calls RecvMsg() twice.
3864+
// Second call to RecvMsg should fail with io.EOF.
3865+
func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) {
3866+
ss := stubserver.StubServer{
3867+
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
3868+
if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil {
3869+
t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err)
3870+
}
3871+
return nil
3872+
},
3873+
}
3874+
if err := ss.Start(nil); err != nil {
3875+
t.Fatal("Error starting server:", err)
3876+
}
3877+
defer ss.Stop()
3878+
3879+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3880+
defer cancel()
3881+
stream, err := ss.Client.StreamingInputCall(ctx)
3882+
if err != nil {
3883+
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
3884+
}
3885+
if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil {
3886+
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
3887+
}
3888+
if err := stream.CloseSend(); err != nil {
3889+
t.Fatalf("stream.CloseSend() = %v, want <nil>", err)
3890+
}
3891+
resp := new(testpb.StreamingInputCallResponse)
3892+
if err := stream.RecvMsg(resp); err != nil {
3893+
t.Fatalf("stream.RecvMsg() = %v , want <nil>", err)
3894+
}
3895+
if err = stream.RecvMsg(resp); err != io.EOF {
3896+
t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF)
3897+
}
3898+
}
3899+
37443900
// Tests the behavior for server-side streaming when client calls SendMsg twice.
37453901
// Second call to SendMsg should fail with Internal error and result in closing
37463902
// the connection with a RST_STREAM.
@@ -3802,7 +3958,6 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) {
38023958
<-handlerDone
38033959
}
38043960

3805-
// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC.
38063961
// Tests the behavior for unary RPC when client calls SendMsg twice. Second call
38073962
// to SendMsg should fail with Internal error.
38083963
func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) {
@@ -3981,7 +4136,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) {
39814136
}
39824137

39834138
// Tests that a client receives a cardinality violation error for client-streaming
3984-
// RPCs if the server call SendMsg multiple times.
4139+
// RPCs if the server calls SendMsg() multiple times.
39854140
func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) {
39864141
ss := stubserver.StubServer{
39874142
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {

0 commit comments

Comments
 (0)