Skip to content

Commit 62ec29f

Browse files
grpc: Fix cardinality violations in non-client streaming RPCs. (#8385)
1 parent 85240a5 commit 62ec29f

File tree

3 files changed

+262
-1
lines changed

3 files changed

+262
-1
lines changed

server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1598,6 +1598,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.Serv
15981598
s: stream,
15991599
p: &parser{r: stream, bufferPool: s.opts.bufferPool},
16001600
codec: s.getCodec(stream.ContentSubtype()),
1601+
desc: sd,
16011602
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
16021603
maxSendMessageSize: s.opts.maxSendMessageSize,
16031604
trInfo: trInfo,

stream.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,7 @@ type serverStream struct {
15861586
s *transport.ServerStream
15871587
p *parser
15881588
codec baseCodec
1589+
desc *StreamDesc
15891590

15901591
compressorV0 Compressor
15911592
compressorV1 encoding.Compressor
@@ -1594,6 +1595,8 @@ type serverStream struct {
15941595

15951596
sendCompressorName string
15961597

1598+
recvFirstMsg bool // set after the first message is received
1599+
15971600
maxReceiveMessageSize int
15981601
maxSendMessageSize int
15991602
trInfo *traceInfo
@@ -1780,13 +1783,18 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
17801783
binlog.Log(ss.ctx, chc)
17811784
}
17821785
}
1786+
// Received no request msg for non-client streaming rpcs.
1787+
if !ss.desc.ClientStreams && !ss.recvFirstMsg {
1788+
return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
1789+
}
17831790
return err
17841791
}
17851792
if err == io.ErrUnexpectedEOF {
17861793
err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
17871794
}
17881795
return toRPCErr(err)
17891796
}
1797+
ss.recvFirstMsg = true
17901798
if len(ss.statsHandler) != 0 {
17911799
for _, sh := range ss.statsHandler {
17921800
sh.HandleRPC(ss.s.Context(), &stats.InPayload{
@@ -1806,7 +1814,19 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
18061814
binlog.Log(ss.ctx, cm)
18071815
}
18081816
}
1809-
return nil
1817+
1818+
if ss.desc.ClientStreams {
1819+
// Subsequent messages should be received by subsequent RecvMsg calls.
1820+
return nil
1821+
}
1822+
// Special handling for non-client-stream rpcs.
1823+
// This recv expects EOF or errors, so we don't collect inPayload.
1824+
if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF {
1825+
return nil
1826+
} else if err != nil {
1827+
return err
1828+
}
1829+
return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
18101830
}
18111831

18121832
// MethodFromServerStream returns the method string for the input stream.

test/end2end_test.go

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
"google.golang.org/grpc/connectivity"
5252
"google.golang.org/grpc/credentials"
5353
"google.golang.org/grpc/credentials/insecure"
54+
"google.golang.org/grpc/credentials/local"
5455
"google.golang.org/grpc/health"
5556
"google.golang.org/grpc/internal"
5657
"google.golang.org/grpc/internal/binarylog"
@@ -3740,6 +3741,245 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
37403741
}
37413742
}
37423743

3744+
// Tests the behavior for server-side streaming when client calls SendMsg twice.
3745+
// Second call to SendMsg should fail with Internal error and result in closing
3746+
// the connection with a RST_STREAM.
3747+
func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) {
3748+
// To ensure initial call to server.recvMsg() made by the generated code is successfully
3749+
// completed. Otherwise, if the client attempts to send a second request message, that
3750+
// will trigger a RST_STREAM from the client due to the application violating the RPC's
3751+
// protocol. The RST_STREAM could cause the server’s first RecvMsg to fail and will prevent
3752+
// the method handler from being called.
3753+
recvDoneOnServer := make(chan struct{})
3754+
// To ensure goroutine for test does not end before RPC handler performs error
3755+
// checking.
3756+
handlerDone := make(chan struct{})
3757+
ss := stubserver.StubServer{
3758+
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
3759+
close(recvDoneOnServer)
3760+
// Block until the stream’s context is done. Second call to client.SendMsg
3761+
// triggers a RST_STREAM which cancels the stream context on the server.
3762+
<-stream.Context().Done()
3763+
if err := stream.SendMsg(&testpb.StreamingOutputCallRequest{}); status.Code(err) != codes.Canceled {
3764+
t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Canceled)
3765+
}
3766+
close(handlerDone)
3767+
return nil
3768+
},
3769+
}
3770+
if err := ss.Start(nil); err != nil {
3771+
t.Fatal("Error starting server:", err)
3772+
}
3773+
defer ss.Stop()
3774+
3775+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3776+
defer cancel()
3777+
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
3778+
if err != nil {
3779+
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
3780+
}
3781+
defer cc.Close()
3782+
3783+
desc := &grpc.StreamDesc{
3784+
StreamName: "StreamingOutputCall",
3785+
ServerStreams: true,
3786+
ClientStreams: false,
3787+
}
3788+
3789+
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall")
3790+
if err != nil {
3791+
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
3792+
}
3793+
3794+
if err := stream.SendMsg(&testpb.Empty{}); err != nil {
3795+
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
3796+
}
3797+
3798+
<-recvDoneOnServer
3799+
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
3800+
t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Internal)
3801+
}
3802+
<-handlerDone
3803+
}
3804+
3805+
// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC.
3806+
// Tests the behavior for unary RPC when client calls SendMsg twice. Second call
3807+
// to SendMsg should fail with Internal error.
3808+
func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) {
3809+
ss := stubserver.StubServer{
3810+
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
3811+
return &testpb.SimpleResponse{}, nil
3812+
},
3813+
}
3814+
if err := ss.Start(nil); err != nil {
3815+
t.Fatal("Error starting server:", err)
3816+
}
3817+
defer ss.Stop()
3818+
3819+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3820+
defer cancel()
3821+
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
3822+
if err != nil {
3823+
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
3824+
}
3825+
defer cc.Close()
3826+
3827+
desc := &grpc.StreamDesc{
3828+
StreamName: "UnaryCall",
3829+
ServerStreams: false,
3830+
ClientStreams: false,
3831+
}
3832+
3833+
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall")
3834+
if err != nil {
3835+
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
3836+
}
3837+
3838+
if err := stream.SendMsg(&testpb.Empty{}); err != nil {
3839+
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
3840+
}
3841+
3842+
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
3843+
t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal)
3844+
}
3845+
}
3846+
3847+
// Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming
3848+
// and sends multiple messages.
3849+
func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) {
3850+
// The initial call to recvMsg made by the generated code, will return the error.
3851+
ss := stubserver.StubServer{}
3852+
if err := ss.Start(nil); err != nil {
3853+
t.Fatal("Error starting server:", err)
3854+
}
3855+
defer ss.Stop()
3856+
3857+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3858+
defer cancel()
3859+
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
3860+
if err != nil {
3861+
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
3862+
}
3863+
defer cc.Close()
3864+
3865+
// Making the client bi-di to bypass the client side checks that stop a non-streaming client
3866+
// from sending multiple messages.
3867+
desc := &grpc.StreamDesc{
3868+
StreamName: "StreamingOutputCall",
3869+
ServerStreams: true,
3870+
ClientStreams: true,
3871+
}
3872+
3873+
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall")
3874+
if err != nil {
3875+
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
3876+
}
3877+
3878+
if err := stream.SendMsg(&testpb.Empty{}); err != nil {
3879+
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
3880+
}
3881+
3882+
if err := stream.SendMsg(&testpb.Empty{}); err != nil {
3883+
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
3884+
}
3885+
3886+
if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
3887+
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3888+
}
3889+
}
3890+
3891+
// Tests the behavior of server for server-side streaming RPC when client sends zero request messages.
3892+
func (s) TestServerStreaming_ServerRecvZeroRequests(t *testing.T) {
3893+
testCases := []struct {
3894+
name string
3895+
desc *grpc.StreamDesc
3896+
wantCode codes.Code
3897+
}{
3898+
{
3899+
name: "BidiStreaming",
3900+
desc: &grpc.StreamDesc{
3901+
StreamName: "StreamingOutputCall",
3902+
ServerStreams: true,
3903+
ClientStreams: true,
3904+
},
3905+
wantCode: codes.Internal,
3906+
},
3907+
{
3908+
name: "ClientStreaming",
3909+
desc: &grpc.StreamDesc{
3910+
StreamName: "StreamingOutputCall",
3911+
ServerStreams: false,
3912+
ClientStreams: true,
3913+
},
3914+
wantCode: codes.Internal,
3915+
},
3916+
}
3917+
3918+
for _, tc := range testCases {
3919+
// The initial call to recvMsg made by the generated code, will return the error.
3920+
ss := stubserver.StubServer{}
3921+
if err := ss.Start(nil); err != nil {
3922+
t.Fatal("Error starting server:", err)
3923+
}
3924+
defer ss.Stop()
3925+
3926+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3927+
defer cancel()
3928+
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
3929+
if err != nil {
3930+
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
3931+
}
3932+
defer cc.Close()
3933+
3934+
stream, err := cc.NewStream(ctx, tc.desc, "/grpc.testing.TestService/StreamingOutputCall")
3935+
if err != nil {
3936+
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
3937+
}
3938+
3939+
if err := stream.CloseSend(); err != nil {
3940+
t.Errorf("stream.CloseSend() = %v, want <nil>", err)
3941+
}
3942+
3943+
if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != tc.wantCode {
3944+
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), tc.wantCode)
3945+
}
3946+
}
3947+
}
3948+
3949+
// Tests the behavior of client for server-side streaming RPC when client sends zero request messages.
3950+
func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) {
3951+
t.Skip("blocked on i/7286")
3952+
// The initial call to recvMsg made by the generated code, will return the error.
3953+
ss := stubserver.StubServer{}
3954+
if err := ss.Start(nil); err != nil {
3955+
t.Fatal("Error starting server:", err)
3956+
}
3957+
defer ss.Stop()
3958+
3959+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3960+
defer cancel()
3961+
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
3962+
if err != nil {
3963+
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
3964+
}
3965+
defer cc.Close()
3966+
3967+
desc := &grpc.StreamDesc{
3968+
StreamName: "StreamingOutputCall",
3969+
ServerStreams: true,
3970+
ClientStreams: false,
3971+
}
3972+
3973+
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall")
3974+
if err != nil {
3975+
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
3976+
}
3977+
3978+
if err := stream.CloseSend(); status.Code(err) != codes.Internal {
3979+
t.Errorf("stream.CloseSend() = %v, want error %v", status.Code(err), codes.Internal)
3980+
}
3981+
}
3982+
37433983
// Tests that a client receives a cardinality violation error for client-streaming
37443984
// RPCs if the server call SendMsg multiple times.
37453985
func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) {

0 commit comments

Comments
 (0)