@@ -247,13 +247,16 @@ type handleStreamTest struct {
247
247
ht * serverHandlerTransport
248
248
}
249
249
250
- type mockStatsHandler struct {}
250
+ type mockStatsHandler struct {
251
+ rpcStatsCh chan stats.RPCStats
252
+ }
251
253
252
254
func (h * mockStatsHandler ) TagRPC (ctx context.Context , _ * stats.RPCTagInfo ) context.Context {
253
255
return ctx
254
256
}
255
257
256
- func (h * mockStatsHandler ) HandleRPC (context.Context , stats.RPCStats ) {
258
+ func (h * mockStatsHandler ) HandleRPC (_ context.Context , s stats.RPCStats ) {
259
+ h .rpcStatsCh <- s
257
260
}
258
261
259
262
func (h * mockStatsHandler ) TagConn (ctx context.Context , _ * stats.ConnTagInfo ) context.Context {
@@ -263,7 +266,7 @@ func (h *mockStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) co
263
266
func (h * mockStatsHandler ) HandleConn (context.Context , stats.ConnStats ) {
264
267
}
265
268
266
- func newHandleStreamTest (t * testing.T ) * handleStreamTest {
269
+ func newHandleStreamTest (t * testing.T , statsHandlers []stats. Handler ) * handleStreamTest {
267
270
bodyr , bodyw := io .Pipe ()
268
271
req := & http.Request {
269
272
ProtoMajor : 2 ,
@@ -277,11 +280,6 @@ func newHandleStreamTest(t *testing.T) *handleStreamTest {
277
280
Body : bodyr ,
278
281
}
279
282
rw := newTestHandlerResponseWriter ().(testHandlerResponseWriter )
280
- // Add mock stats handlers to exercise the stats handler code path.
281
- statsHandlers := make ([]stats.Handler , 0 , 5 )
282
- for range 5 {
283
- statsHandlers = append (statsHandlers , & mockStatsHandler {})
284
- }
285
283
ht , err := NewServerHandlerTransport (rw , req , statsHandlers , mem .DefaultBufferPool ())
286
284
if err != nil {
287
285
t .Fatal (err )
@@ -295,7 +293,7 @@ func newHandleStreamTest(t *testing.T) *handleStreamTest {
295
293
}
296
294
297
295
func (s ) TestHandlerTransport_HandleStreams (t * testing.T ) {
298
- st := newHandleStreamTest (t )
296
+ st := newHandleStreamTest (t , nil )
299
297
handleStream := func (s * ServerStream ) {
300
298
if want := "/service/foo.bar" ; s .method != want {
301
299
t .Errorf ("stream method = %q; want %q" , s .method , want )
@@ -364,7 +362,7 @@ func (s) TestHandlerTransport_HandleStreams_InvalidArgument(t *testing.T) {
364
362
}
365
363
366
364
func handleStreamCloseBodyTest (t * testing.T , statusCode codes.Code , msg string ) {
367
- st := newHandleStreamTest (t )
365
+ st := newHandleStreamTest (t , nil )
368
366
369
367
handleStream := func (s * ServerStream ) {
370
368
s .WriteStatus (status .New (statusCode , msg ))
@@ -473,7 +471,7 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
473
471
}
474
472
475
473
func testHandlerTransportHandleStreams (t * testing.T , handleStream func (st * handleStreamTest , s * ServerStream )) {
476
- st := newHandleStreamTest (t )
474
+ st := newHandleStreamTest (t , nil )
477
475
ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
478
476
t .Cleanup (cancel )
479
477
st .ht .HandleStreams (
@@ -505,7 +503,59 @@ func (s) TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
505
503
t .Fatal (err )
506
504
}
507
505
508
- hst := newHandleStreamTest (t )
506
+ hst := newHandleStreamTest (t , nil )
507
+ handleStream := func (s * ServerStream ) {
508
+ s .WriteStatus (st )
509
+ }
510
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
511
+ defer cancel ()
512
+ hst .ht .HandleStreams (
513
+ ctx , func (s * ServerStream ) { go handleStream (s ) },
514
+ )
515
+ wantHeader := http.Header {
516
+ "Date" : nil ,
517
+ "Content-Type" : {"application/grpc" },
518
+ "Trailer" : {"Grpc-Status" , "Grpc-Message" , "Grpc-Status-Details-Bin" },
519
+ }
520
+ wantTrailer := http.Header {
521
+ "Grpc-Status" : {fmt .Sprint (uint32 (statusCode ))},
522
+ "Grpc-Message" : {encodeGrpcMessage (msg )},
523
+ "Grpc-Status-Details-Bin" : {encodeBinHeader (stBytes )},
524
+ }
525
+
526
+ checkHeaderAndTrailer (t , hst .rw , wantHeader , wantTrailer )
527
+ }
528
+
529
+ // Tests the use of stats handlers and ensures there are no data races while
530
+ // accessing trailers.
531
+ func (s ) TestHandlerTransport_HandleStreams_StatsHandlers (t * testing.T ) {
532
+ errDetails := []protoadapt.MessageV1 {
533
+ & epb.RetryInfo {
534
+ RetryDelay : & durationpb.Duration {Seconds : 60 },
535
+ },
536
+ & epb.ResourceInfo {
537
+ ResourceType : "foo bar" ,
538
+ ResourceName : "service.foo.bar" ,
539
+ Owner : "User" ,
540
+ },
541
+ }
542
+
543
+ statusCode := codes .ResourceExhausted
544
+ msg := "you are being throttled"
545
+ st , err := status .New (statusCode , msg ).WithDetails (errDetails ... )
546
+ if err != nil {
547
+ t .Fatal (err )
548
+ }
549
+
550
+ stBytes , err := proto .Marshal (st .Proto ())
551
+ if err != nil {
552
+ t .Fatal (err )
553
+ }
554
+ // Add mock stats handlers to exercise the stats handler code path.
555
+ statsHandler := & mockStatsHandler {
556
+ rpcStatsCh : make (chan stats.RPCStats , 2 ),
557
+ }
558
+ hst := newHandleStreamTest (t , []stats.Handler {statsHandler })
509
559
handleStream := func (s * ServerStream ) {
510
560
if err := s .SendHeader (metadata .New (map [string ]string {})); err != nil {
511
561
t .Error (err )
@@ -533,13 +583,24 @@ func (s) TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
533
583
}
534
584
535
585
checkHeaderAndTrailer (t , hst .rw , wantHeader , wantTrailer )
586
+ wantStatTypes := []stats.RPCStats {& stats.OutHeader {}, & stats.OutTrailer {}}
587
+ for _ , wantType := range wantStatTypes {
588
+ select {
589
+ case <- ctx .Done ():
590
+ t .Fatal ("Context timed out waiting for statsHandler.HandleRPC() to be called." )
591
+ case s := <- statsHandler .rpcStatsCh :
592
+ if reflect .TypeOf (s ) != reflect .TypeOf (wantType ) {
593
+ t .Fatalf ("Received RPCStats of type %T, want %T" , s , wantType )
594
+ }
595
+ }
596
+ }
536
597
}
537
598
538
599
// TestHandlerTransport_Drain verifies that Drain() is not implemented
539
600
// by `serverHandlerTransport`.
540
601
func (s ) TestHandlerTransport_Drain (t * testing.T ) {
541
602
defer func () { recover () }()
542
- st := newHandleStreamTest (t )
603
+ st := newHandleStreamTest (t , nil )
543
604
st .ht .Drain ("whatever" )
544
605
t .Errorf ("serverHandlerTransport.Drain() should have panicked" )
545
606
}
0 commit comments