Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 82 additions & 95 deletions stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"fmt"
"io"
"net"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -116,87 +115,84 @@ func getOutgoingStats(ctx context.Context, mdKey string) []byte {
return []byte(tagValues[len(tagValues)-1])
}

type testServer struct {
testgrpc.UnimplementedTestServiceServer
}
func newStatsTestStubServer() *stubserver.StubServer {
return &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
if err := grpc.SendHeader(ctx, testHeaderMetadata); err != nil {
return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testHeaderMetadata, err)
}
if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
}

func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
if err := grpc.SendHeader(ctx, testHeaderMetadata); err != nil {
return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testHeaderMetadata, err)
}
if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
}
if id := payloadToID(in.Payload); id == errorID {
return nil, fmt.Errorf("got error id: %v", id)
}

if id := payloadToID(in.Payload); id == errorID {
return nil, fmt.Errorf("got error id: %v", id)
}
return &testpb.SimpleResponse{Payload: in.Payload}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
if err := stream.SendHeader(testHeaderMetadata); err != nil {
return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
}
stream.SetTrailer(testTrailerMetadata)
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
return nil
}
if err != nil {
return err
}

if id := payloadToID(in.Payload); id == errorID {
return fmt.Errorf("got error id: %v", id)
}

if err := stream.Send(&testpb.StreamingOutputCallResponse{Payload: in.Payload}); err != nil {
return err
}
}
},
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
if err := stream.SendHeader(testHeaderMetadata); err != nil {
return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
}
stream.SetTrailer(testTrailerMetadata)
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
return stream.SendAndClose(&testpb.StreamingInputCallResponse{AggregatedPayloadSize: 0})
}
if err != nil {
return err
}

if id := payloadToID(in.Payload); id == errorID {
return fmt.Errorf("got error id: %v", id)
}
}
},
StreamingOutputCallF: func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
if err := stream.SendHeader(testHeaderMetadata); err != nil {
return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
}
stream.SetTrailer(testTrailerMetadata)

return &testpb.SimpleResponse{Payload: in.Payload}, nil
}
if id := payloadToID(req.Payload); id == errorID {
return fmt.Errorf("got error id: %v", id)
}

func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
if err := stream.SendHeader(testHeaderMetadata); err != nil {
return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
}
stream.SetTrailer(testTrailerMetadata)
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
for i := 0; i < 5; i++ {
if err := stream.Send(&testpb.StreamingOutputCallResponse{Payload: req.Payload}); err != nil {
return err
}
}
return nil
}
if err != nil {
return err
}

if id := payloadToID(in.Payload); id == errorID {
return fmt.Errorf("got error id: %v", id)
}

if err := stream.Send(&testpb.StreamingOutputCallResponse{Payload: in.Payload}); err != nil {
return err
}
}
}

func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
if err := stream.SendHeader(testHeaderMetadata); err != nil {
return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
}
stream.SetTrailer(testTrailerMetadata)
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
return stream.SendAndClose(&testpb.StreamingInputCallResponse{AggregatedPayloadSize: 0})
}
if err != nil {
return err
}

if id := payloadToID(in.Payload); id == errorID {
return fmt.Errorf("got error id: %v", id)
}
}
}

func (s *testServer) StreamingOutputCall(in *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
if err := stream.SendHeader(testHeaderMetadata); err != nil {
return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
}
stream.SetTrailer(testTrailerMetadata)

if id := payloadToID(in.Payload); id == errorID {
return fmt.Errorf("got error id: %v", id)
}

for i := 0; i < 5; i++ {
if err := stream.Send(&testpb.StreamingOutputCallResponse{Payload: in.Payload}); err != nil {
return err
}
},
}
return nil
}

// test is an end-to-end test. It should be created with the newTest
Expand All @@ -208,9 +204,8 @@ type test struct {
clientStatsHandlers []stats.Handler
serverStatsHandlers []stats.Handler

testServer testgrpc.TestServiceServer // nil means none
// srv and srvAddr are set once startServer is called.
srv *grpc.Server
srv stubserver.GRPCServer
srvAddr string

cc *grpc.ClientConn // nil until requested via clientConn
Expand Down Expand Up @@ -243,12 +238,7 @@ func newTest(t *testing.T, tc *testConfig, chs []stats.Handler, shs []stats.Hand

// startServer starts a gRPC server listening. Callers should defer a
// call to te.tearDown to clean up.
func (te *test) startServer(ts testgrpc.TestServiceServer) {
te.testServer = ts
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
te.t.Fatalf("Failed to listen: %v", err)
}
func (te *test) startServer(ss *stubserver.StubServer) {
var opts []grpc.ServerOption
if te.compress == "gzip" {
opts = append(opts,
Expand All @@ -259,14 +249,11 @@ func (te *test) startServer(ts testgrpc.TestServiceServer) {
for _, sh := range te.serverStatsHandlers {
opts = append(opts, grpc.StatsHandler(sh))
}
s := grpc.NewServer(opts...)
te.srv = s
if te.testServer != nil {
testgrpc.RegisterTestServiceServer(s, te.testServer)
if err := ss.StartServer(opts...); err != nil {
te.t.Fatalf("StubServer.StartServer() failed: %v", err)
}

go s.Serve(lis)
te.srvAddr = lis.Addr().String()
te.srv = ss.S
te.srvAddr = ss.Address
}

func (te *test) clientConn(ctx context.Context) *grpc.ClientConn {
Expand Down Expand Up @@ -842,7 +829,7 @@ func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkF
func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
h := &statshandler{}
te := newTest(t, tc, nil, []stats.Handler{h})
te.startServer(&testServer{})
te.startServer(newStatsTestStubServer())
defer te.tearDown()

var (
Expand Down Expand Up @@ -1142,7 +1129,7 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF
func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) {
h := &statshandler{}
te := newTest(t, tc, []stats.Handler{h}, nil)
te.startServer(&testServer{})
te.startServer(newStatsTestStubServer())
defer te.tearDown()

var (
Expand Down Expand Up @@ -1375,7 +1362,7 @@ func (s) TestMultipleClientStatsHandler(t *testing.T) {
h := &statshandler{}
tc := &testConfig{compress: ""}
te := newTest(t, tc, []stats.Handler{h, h}, nil)
te.startServer(&testServer{})
te.startServer(newStatsTestStubServer())
defer te.tearDown()

cc := &rpcConfig{success: false, failfast: false, callType: unaryRPC}
Expand Down Expand Up @@ -1421,7 +1408,7 @@ func (s) TestMultipleServerStatsHandler(t *testing.T) {
h := &statshandler{}
tc := &testConfig{compress: ""}
te := newTest(t, tc, nil, []stats.Handler{h, h})
te.startServer(&testServer{})
te.startServer(newStatsTestStubServer())
defer te.tearDown()

cc := &rpcConfig{success: false, failfast: false, callType: unaryRPC}
Expand Down
Loading