8282@ ThreadSafe
8383final class InProcessTransport implements ServerTransport , ConnectionClientTransport {
8484 private static final Logger log = Logger .getLogger (InProcessTransport .class .getName ());
85+ static boolean isEnabledSupportTracingMessageSizes =
86+ GrpcUtil .getFlag ("GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES" , false );
8587
8688 private final InternalLogId logId ;
8789 private final SocketAddress address ;
@@ -485,22 +487,25 @@ private void clientCancelled(Status status) {
485487
486488 @ Override
487489 public void writeMessage (InputStream message ) {
488- long messageLength ;
489- try {
490- if (assumedMessageSize != -1 ) {
491- messageLength = assumedMessageSize ;
492- } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream ) {
493- messageLength = message .available ();
494- } else {
495- InputStream oldMessage = message ;
496- byte [] payload = ByteStreams .toByteArray (message );
497- messageLength = payload .length ;
498- message = new ByteArrayInputStream (payload );
499- oldMessage .close ();
490+ long messageLength = 0 ;
491+ if (isEnabledSupportTracingMessageSizes ) {
492+ try {
493+ if (assumedMessageSize != -1 ) {
494+ messageLength = assumedMessageSize ;
495+ } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream ) {
496+ messageLength = message .available ();
497+ } else {
498+ InputStream oldMessage = message ;
499+ byte [] payload = ByteStreams .toByteArray (message );
500+ messageLength = payload .length ;
501+ message = new ByteArrayInputStream (payload );
502+ oldMessage .close ();
503+ }
504+ } catch (Exception e ) {
505+ throw new RuntimeException ("Error processing the message length" , e );
500506 }
501- } catch (Exception e ) {
502- throw new RuntimeException ("Error processing the message length" , e );
503507 }
508+
504509 synchronized (this ) {
505510 if (closed ) {
506511 return ;
@@ -509,11 +514,13 @@ public void writeMessage(InputStream message) {
509514 statsTraceCtx .outboundMessageSent (outboundSeqNo , -1 , -1 );
510515 clientStream .statsTraceCtx .inboundMessage (outboundSeqNo );
511516 clientStream .statsTraceCtx .inboundMessageRead (outboundSeqNo , -1 , -1 );
512- statsTraceCtx .outboundUncompressedSize (messageLength );
513- statsTraceCtx .outboundWireSize (messageLength );
514- // messageLength should be same at receiver's end as no actual wire is involved.
515- clientStream .statsTraceCtx .inboundUncompressedSize (messageLength );
516- clientStream .statsTraceCtx .inboundWireSize (messageLength );
517+ if (isEnabledSupportTracingMessageSizes ) {
518+ statsTraceCtx .outboundUncompressedSize (messageLength );
519+ statsTraceCtx .outboundWireSize (messageLength );
520+ // messageLength should be same at receiver's end as no actual wire is involved.
521+ clientStream .statsTraceCtx .inboundUncompressedSize (messageLength );
522+ clientStream .statsTraceCtx .inboundWireSize (messageLength );
523+ }
517524 outboundSeqNo ++;
518525 StreamListener .MessageProducer producer = new SingleMessageProducer (message );
519526 if (clientRequested > 0 ) {
@@ -523,7 +530,6 @@ public void writeMessage(InputStream message) {
523530 clientReceiveQueue .add (producer );
524531 }
525532 }
526-
527533 syncContext .drain ();
528534 }
529535
@@ -777,21 +783,23 @@ private void serverClosed(Status serverListenerStatus, Status serverTracerStatus
777783
778784 @ Override
779785 public void writeMessage (InputStream message ) {
780- long messageLength ;
781- try {
782- if (assumedMessageSize != -1 ) {
783- messageLength = assumedMessageSize ;
784- } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream ) {
785- messageLength = message .available ();
786- } else {
787- InputStream oldMessage = message ;
788- byte [] payload = ByteStreams .toByteArray (message );
789- messageLength = payload .length ;
790- message = new ByteArrayInputStream (payload );
791- oldMessage .close ();
786+ long messageLength = 0 ;
787+ if (isEnabledSupportTracingMessageSizes ) {
788+ try {
789+ if (assumedMessageSize != -1 ) {
790+ messageLength = assumedMessageSize ;
791+ } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream ) {
792+ messageLength = message .available ();
793+ } else {
794+ InputStream oldMessage = message ;
795+ byte [] payload = ByteStreams .toByteArray (message );
796+ messageLength = payload .length ;
797+ message = new ByteArrayInputStream (payload );
798+ oldMessage .close ();
799+ }
800+ } catch (Exception e ) {
801+ throw new RuntimeException ("Error processing the message length" , e );
792802 }
793- } catch (Exception e ) {
794- throw new RuntimeException ("Error processing the message length" , e );
795803 }
796804 synchronized (this ) {
797805 if (closed ) {
@@ -801,11 +809,13 @@ public void writeMessage(InputStream message) {
801809 statsTraceCtx .outboundMessageSent (outboundSeqNo , -1 , -1 );
802810 serverStream .statsTraceCtx .inboundMessage (outboundSeqNo );
803811 serverStream .statsTraceCtx .inboundMessageRead (outboundSeqNo , -1 , -1 );
804- statsTraceCtx .outboundUncompressedSize (messageLength );
805- statsTraceCtx .outboundWireSize (messageLength );
806- // messageLength should be same at receiver's end as no actual wire is involved.
807- serverStream .statsTraceCtx .inboundUncompressedSize (messageLength );
808- serverStream .statsTraceCtx .inboundWireSize (messageLength );
812+ if (isEnabledSupportTracingMessageSizes ) {
813+ statsTraceCtx .outboundUncompressedSize (messageLength );
814+ statsTraceCtx .outboundWireSize (messageLength );
815+ // messageLength should be same at receiver's end as no actual wire is involved.
816+ serverStream .statsTraceCtx .inboundUncompressedSize (messageLength );
817+ serverStream .statsTraceCtx .inboundWireSize (messageLength );
818+ }
809819 outboundSeqNo ++;
810820 StreamListener .MessageProducer producer = new SingleMessageProducer (message );
811821 if (serverRequested > 0 ) {
0 commit comments