diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcAttributesGetter.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcAttributesGetter.java index 2e7d0a6e539d..6b0ae2669cc9 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcAttributesGetter.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcAttributesGetter.java @@ -24,4 +24,12 @@ public interface RpcAttributesGetter { @Nullable String getMethod(REQUEST request); + + default Long getRequestSize(REQUEST request) { + return null; + } + + default Long getResponseSize(REQUEST request) { + return null; + } } diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcClientMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcClientMetrics.java index e0e379410f4f..b3b6c211677d 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcClientMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcClientMetrics.java @@ -11,6 +11,8 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.LongHistogramBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextKey; @@ -35,6 +37,8 @@ public final class RpcClientMetrics implements OperationListener { private static final Logger logger = Logger.getLogger(RpcClientMetrics.class.getName()); private final DoubleHistogram clientDurationHistogram; + private final LongHistogram clientRequestSize; + private final LongHistogram clientResponseSize; private RpcClientMetrics(Meter meter) { DoubleHistogramBuilder durationBuilder = @@ -44,6 +48,24 @@ private RpcClientMetrics(Meter meter) { .setUnit("ms"); RpcMetricsAdvice.applyClientDurationAdvice(durationBuilder); clientDurationHistogram = durationBuilder.build(); + + LongHistogramBuilder requestSizeBuilder = + meter + .histogramBuilder("rpc.client.request.size") + .setUnit("By") + .setDescription("Measures the size of RPC request messages (uncompressed).") + .ofLongs(); + RpcMetricsAdvice.applyClientRequestSizeAdvice(requestSizeBuilder); + clientRequestSize = requestSizeBuilder.build(); + + LongHistogramBuilder responseSizeBuilder = + meter + .histogramBuilder("rpc.client.response.size") + .setUnit("By") + .setDescription("Measures the size of RPC response messages (uncompressed).") + .ofLongs(); + RpcMetricsAdvice.applyClientRequestSizeAdvice(responseSizeBuilder); + clientResponseSize = responseSizeBuilder.build(); } /** @@ -72,10 +94,21 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) { context); return; } + Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); clientDurationHistogram.record( - (endNanos - state.startTimeNanos()) / NANOS_PER_MS, - state.startAttributes().toBuilder().putAll(endAttributes).build(), - context); + (endNanos - state.startTimeNanos()) / NANOS_PER_MS, attributes, context); + + Long rpcClientRequestBodySize = + RpcMessageBodySizeUtil.getRpcRequestBodySize(endAttributes, state.startAttributes()); + if (rpcClientRequestBodySize != null) { + clientRequestSize.record(rpcClientRequestBodySize, attributes, context); + } + + Long rpcClientResponseBodySize = + RpcMessageBodySizeUtil.getRpcResponseBodySize(endAttributes, state.startAttributes()); + if (rpcClientResponseBodySize != null) { + clientResponseSize.record(rpcClientResponseBodySize, attributes, context); + } } @AutoValue diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcCommonAttributesExtractor.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcCommonAttributesExtractor.java index f731e703586c..13d4c9af75e6 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcCommonAttributesExtractor.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcCommonAttributesExtractor.java @@ -20,6 +20,10 @@ abstract class RpcCommonAttributesExtractor static final AttributeKey RPC_METHOD = AttributeKey.stringKey("rpc.method"); static final AttributeKey RPC_SERVICE = AttributeKey.stringKey("rpc.service"); static final AttributeKey RPC_SYSTEM = AttributeKey.stringKey("rpc.system"); + static final AttributeKey RPC_REQUEST_BODY_SIZE = + AttributeKey.longKey("rpc.request.body.size"); + static final AttributeKey RPC_RESPONSE_BODY_SIZE = + AttributeKey.longKey("rpc.response.body.size"); private final RpcAttributesGetter getter; @@ -41,6 +45,24 @@ public final void onEnd( REQUEST request, @Nullable RESPONSE response, @Nullable Throwable error) { - // No response attributes + Long requestSize = getter.getRequestSize(request); + Long responseSize = getter.getResponseSize(request); + if (this instanceof RpcClientAttributesExtractor) { + if (requestSize != null) { + internalSet(attributes, RPC_REQUEST_BODY_SIZE, requestSize); + } + if (responseSize != null) { + internalSet(attributes, RPC_RESPONSE_BODY_SIZE, responseSize); + } + } + + if (this instanceof RpcServerAttributesExtractor) { + if (requestSize != null) { + internalSet(attributes, RPC_REQUEST_BODY_SIZE, requestSize); + } + if (responseSize != null) { + internalSet(attributes, RPC_RESPONSE_BODY_SIZE, responseSize); + } + } } } diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcMessageBodySizeUtil.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcMessageBodySizeUtil.java new file mode 100644 index 000000000000..6ebc66e26e75 --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcMessageBodySizeUtil.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.rpc; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import javax.annotation.Nullable; + +final class RpcMessageBodySizeUtil { + + @Nullable + static Long getRpcRequestBodySize(Attributes... attributesList) { + return getAttribute(RpcCommonAttributesExtractor.RPC_REQUEST_BODY_SIZE, attributesList); + } + + @Nullable + static Long getRpcResponseBodySize(Attributes... attributesList) { + return getAttribute(RpcCommonAttributesExtractor.RPC_RESPONSE_BODY_SIZE, attributesList); + } + + @Nullable + private static T getAttribute(AttributeKey key, Attributes... attributesList) { + for (Attributes attributes : attributesList) { + T value = attributes.get(key); + if (value != null) { + return value; + } + } + return null; + } + + private RpcMessageBodySizeUtil() {} +} diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcMetricsAdvice.java index 0a24cca3f39c..af317d876d62 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcMetricsAdvice.java @@ -5,18 +5,32 @@ package io.opentelemetry.instrumentation.api.incubator.semconv.rpc; +import static java.util.Arrays.asList; + import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; +import io.opentelemetry.api.incubator.metrics.ExtendedLongHistogramBuilder; import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongHistogramBuilder; import io.opentelemetry.semconv.NetworkAttributes; import io.opentelemetry.semconv.ServerAttributes; -import java.util.Arrays; +import java.util.List; final class RpcMetricsAdvice { // copied from RpcIncubatingAttributes private static final AttributeKey RPC_GRPC_STATUS_CODE = AttributeKey.longKey("rpc.grpc.status_code"); + private static final List> RPC_METRICS_ATTRIBUTE_KEYS = + asList( + RpcCommonAttributesExtractor.RPC_SYSTEM, + RpcCommonAttributesExtractor.RPC_SERVICE, + RpcCommonAttributesExtractor.RPC_METHOD, + RPC_GRPC_STATUS_CODE, + NetworkAttributes.NETWORK_TYPE, + NetworkAttributes.NETWORK_TRANSPORT, + ServerAttributes.SERVER_ADDRESS, + ServerAttributes.SERVER_PORT); static void applyClientDurationAdvice(DoubleHistogramBuilder builder) { if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { @@ -24,17 +38,7 @@ static void applyClientDurationAdvice(DoubleHistogramBuilder builder) { } // the list of recommended metrics attributes is from // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-metrics.md - ((ExtendedDoubleHistogramBuilder) builder) - .setAttributesAdvice( - Arrays.asList( - RpcCommonAttributesExtractor.RPC_SYSTEM, - RpcCommonAttributesExtractor.RPC_SERVICE, - RpcCommonAttributesExtractor.RPC_METHOD, - RPC_GRPC_STATUS_CODE, - NetworkAttributes.NETWORK_TYPE, - NetworkAttributes.NETWORK_TRANSPORT, - ServerAttributes.SERVER_ADDRESS, - ServerAttributes.SERVER_PORT)); + ((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(RPC_METRICS_ATTRIBUTE_KEYS); } static void applyServerDurationAdvice(DoubleHistogramBuilder builder) { @@ -43,17 +47,25 @@ static void applyServerDurationAdvice(DoubleHistogramBuilder builder) { } // the list of recommended metrics attributes is from // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-metrics.md - ((ExtendedDoubleHistogramBuilder) builder) - .setAttributesAdvice( - Arrays.asList( - RpcCommonAttributesExtractor.RPC_SYSTEM, - RpcCommonAttributesExtractor.RPC_SERVICE, - RpcCommonAttributesExtractor.RPC_METHOD, - RPC_GRPC_STATUS_CODE, - NetworkAttributes.NETWORK_TYPE, - NetworkAttributes.NETWORK_TRANSPORT, - ServerAttributes.SERVER_ADDRESS, - ServerAttributes.SERVER_PORT)); + ((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(RPC_METRICS_ATTRIBUTE_KEYS); + } + + static void applyClientRequestSizeAdvice(LongHistogramBuilder builder) { + if (!(builder instanceof ExtendedLongHistogramBuilder)) { + return; + } + // the list of recommended metrics attributes is from + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-metrics.md + ((ExtendedLongHistogramBuilder) builder).setAttributesAdvice(RPC_METRICS_ATTRIBUTE_KEYS); + } + + static void applyServerRequestSizeAdvice(LongHistogramBuilder builder) { + if (!(builder instanceof ExtendedLongHistogramBuilder)) { + return; + } + // the list of recommended metrics attributes is from + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-metrics.md + ((ExtendedLongHistogramBuilder) builder).setAttributesAdvice(RPC_METRICS_ATTRIBUTE_KEYS); } private RpcMetricsAdvice() {} diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcServerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcServerMetrics.java index 526db167bdfd..0755e315dc85 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcServerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcServerMetrics.java @@ -11,6 +11,8 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.LongHistogramBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextKey; @@ -35,6 +37,8 @@ public final class RpcServerMetrics implements OperationListener { private static final Logger logger = Logger.getLogger(RpcServerMetrics.class.getName()); private final DoubleHistogram serverDurationHistogram; + private final LongHistogram serverRequestSize; + private final LongHistogram serverResponseSize; private RpcServerMetrics(Meter meter) { DoubleHistogramBuilder durationBuilder = @@ -44,6 +48,24 @@ private RpcServerMetrics(Meter meter) { .setUnit("ms"); RpcMetricsAdvice.applyServerDurationAdvice(durationBuilder); serverDurationHistogram = durationBuilder.build(); + + LongHistogramBuilder requestSizeBuilder = + meter + .histogramBuilder("rpc.server.request.size") + .setUnit("By") + .setDescription("Measures the size of RPC request messages (uncompressed).") + .ofLongs(); + RpcMetricsAdvice.applyServerRequestSizeAdvice(requestSizeBuilder); + serverRequestSize = requestSizeBuilder.build(); + + LongHistogramBuilder responseSizeBuilder = + meter + .histogramBuilder("rpc.server.response.size") + .setUnit("By") + .setDescription("Measures the size of RPC response messages (uncompressed).") + .ofLongs(); + RpcMetricsAdvice.applyServerRequestSizeAdvice(responseSizeBuilder); + serverResponseSize = responseSizeBuilder.build(); } /** @@ -72,10 +94,21 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) { context); return; } + Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); serverDurationHistogram.record( - (endNanos - state.startTimeNanos()) / NANOS_PER_MS, - state.startAttributes().toBuilder().putAll(endAttributes).build(), - context); + (endNanos - state.startTimeNanos()) / NANOS_PER_MS, attributes, context); + + Long rpcServerRequestBodySize = + RpcMessageBodySizeUtil.getRpcRequestBodySize(endAttributes, state.startAttributes()); + if (rpcServerRequestBodySize != null) { + serverRequestSize.record(rpcServerRequestBodySize, attributes, context); + } + + Long rpcServerResponseBodySize = + RpcMessageBodySizeUtil.getRpcResponseBodySize(endAttributes, state.startAttributes()); + if (rpcServerResponseBodySize != null) { + serverResponseSize.record(rpcServerResponseBodySize, attributes, context); + } } @AutoValue diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcClientMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcClientMetricsTest.java index 768a67de0bb2..a75c9b55e7aa 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcClientMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcClientMetricsTest.java @@ -46,6 +46,8 @@ void collectsMetrics() { .put(ServerAttributes.SERVER_PORT, 8080) .put(NetworkAttributes.NETWORK_TRANSPORT, "tcp") .put(NetworkAttributes.NETWORK_TYPE, "ipv4") + .put(RpcCommonAttributesExtractor.RPC_REQUEST_BODY_SIZE, 10) + .put(RpcCommonAttributesExtractor.RPC_RESPONSE_BODY_SIZE, 20) .build(); Attributes responseAttributes2 = @@ -76,6 +78,62 @@ void collectsMetrics() { assertThat(metricReader.collectAllMetrics()) .satisfiesExactlyInAnyOrder( + metric -> + assertThat(metric) + .hasName("rpc.client.response.size") + .hasUnit("By") + .hasDescription("Measures the size of RPC response messages (uncompressed).") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(20 /* bytes */) + .hasAttributesSatisfying( + equalTo(RpcIncubatingAttributes.RPC_SYSTEM, "grpc"), + equalTo( + RpcIncubatingAttributes.RPC_SERVICE, + "myservice.EchoService"), + equalTo( + RpcIncubatingAttributes.RPC_METHOD, + "exampleMethod"), + equalTo(ServerAttributes.SERVER_ADDRESS, "example.com"), + equalTo(ServerAttributes.SERVER_PORT, 8080), + equalTo(NetworkAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(NetworkAttributes.NETWORK_TYPE, "ipv4")) + .hasExemplarsSatisfying( + exemplar -> + exemplar + .hasTraceId("ff01020304050600ff0a0b0c0d0e0f00") + .hasSpanId("090a0b0c0d0e0f00")))), + metric -> + assertThat(metric) + .hasName("rpc.client.request.size") + .hasUnit("By") + .hasDescription("Measures the size of RPC request messages (uncompressed).") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(10 /* bytes */) + .hasAttributesSatisfying( + equalTo(RpcIncubatingAttributes.RPC_SYSTEM, "grpc"), + equalTo( + RpcIncubatingAttributes.RPC_SERVICE, + "myservice.EchoService"), + equalTo( + RpcIncubatingAttributes.RPC_METHOD, + "exampleMethod"), + equalTo(ServerAttributes.SERVER_ADDRESS, "example.com"), + equalTo(ServerAttributes.SERVER_PORT, 8080), + equalTo(NetworkAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(NetworkAttributes.NETWORK_TYPE, "ipv4")) + .hasExemplarsSatisfying( + exemplar -> + exemplar + .hasTraceId("ff01020304050600ff0a0b0c0d0e0f00") + .hasSpanId("090a0b0c0d0e0f00")))), metric -> assertThat(metric) .hasName("rpc.client.duration") diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcServerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcServerMetricsTest.java index bb8b59082aa5..3a4a322b5f2e 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcServerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/rpc/RpcServerMetricsTest.java @@ -47,6 +47,8 @@ void collectsMetrics() { .put(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") .put(NetworkAttributes.NETWORK_TRANSPORT, "tcp") .put(NetworkAttributes.NETWORK_TYPE, "ipv4") + .put(RpcCommonAttributesExtractor.RPC_REQUEST_BODY_SIZE, 10) + .put(RpcCommonAttributesExtractor.RPC_RESPONSE_BODY_SIZE, 20) .build(); Attributes responseAttributes2 = @@ -88,6 +90,60 @@ void collectsMetrics() { point -> point .hasSum(150 /* millis */) + .hasAttributesSatisfying( + equalTo(RpcIncubatingAttributes.RPC_SYSTEM, "grpc"), + equalTo( + RpcIncubatingAttributes.RPC_SERVICE, + "myservice.EchoService"), + equalTo( + RpcIncubatingAttributes.RPC_METHOD, + "exampleMethod"), + equalTo(ServerAttributes.SERVER_ADDRESS, "example.com"), + equalTo(NetworkAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(NetworkAttributes.NETWORK_TYPE, "ipv4")) + .hasExemplarsSatisfying( + exemplar -> + exemplar + .hasTraceId("ff01020304050600ff0a0b0c0d0e0f00") + .hasSpanId("090a0b0c0d0e0f00")))), + metric -> + assertThat(metric) + .hasName("rpc.server.response.size") + .hasUnit("By") + .hasDescription("Measures the size of RPC response messages (uncompressed).") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(20 /* bytes */) + .hasAttributesSatisfying( + equalTo(RpcIncubatingAttributes.RPC_SYSTEM, "grpc"), + equalTo( + RpcIncubatingAttributes.RPC_SERVICE, + "myservice.EchoService"), + equalTo( + RpcIncubatingAttributes.RPC_METHOD, + "exampleMethod"), + equalTo(ServerAttributes.SERVER_ADDRESS, "example.com"), + equalTo(NetworkAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(NetworkAttributes.NETWORK_TYPE, "ipv4")) + .hasExemplarsSatisfying( + exemplar -> + exemplar + .hasTraceId("ff01020304050600ff0a0b0c0d0e0f00") + .hasSpanId("090a0b0c0d0e0f00")))), + metric -> + assertThat(metric) + .hasName("rpc.server.request.size") + .hasUnit("By") + .hasDescription("Measures the size of RPC request messages (uncompressed).") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(10 /* bytes */) .hasAttributesSatisfying( equalTo(RpcIncubatingAttributes.RPC_SYSTEM, "grpc"), equalTo( diff --git a/instrumentation/armeria/armeria-grpc-1.14/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/armeria/grpc/v1_14/ArmeriaGrpcTest.java b/instrumentation/armeria/armeria-grpc-1.14/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/armeria/grpc/v1_14/ArmeriaGrpcTest.java index 188dcbcc492a..9dddec381680 100644 --- a/instrumentation/armeria/armeria-grpc-1.14/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/armeria/grpc/v1_14/ArmeriaGrpcTest.java +++ b/instrumentation/armeria/armeria-grpc-1.14/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/armeria/grpc/v1_14/ArmeriaGrpcTest.java @@ -22,6 +22,7 @@ import example.Helloworld; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.semconv.incubating.MessageIncubatingAttributes; @@ -33,6 +34,11 @@ class ArmeriaGrpcTest { @RegisterExtension static final AgentInstrumentationExtension testing = AgentInstrumentationExtension.create(); + private static final AttributeKey RPC_REQUEST_BODY_SIZE = + AttributeKey.longKey("rpc.request.body.size"); + private static final AttributeKey RPC_RESPONSE_BODY_SIZE = + AttributeKey.longKey("rpc.response.body.size"); + @RegisterExtension static final ServerExtension server = new ServerExtension() { @@ -63,12 +69,12 @@ void grpcInstrumentation() { GreeterGrpc.GreeterBlockingStub client = GrpcClients.builder(server.httpUri()).build(GreeterGrpc.GreeterBlockingStub.class); - Helloworld.Response response = - testing.runWithSpan( - "parent", - () -> client.sayHello(Helloworld.Request.newBuilder().setName("test").build())); + Helloworld.Request request = Helloworld.Request.newBuilder().setName("test").build(); + Helloworld.Response response = testing.runWithSpan("parent", () -> client.sayHello(request)); assertThat(response.getMessage()).isEqualTo("Hello test"); + int requestSerializedSize = request.getSerializedSize(); + int responseSerializedSize = response.getSerializedSize(); testing.waitAndAssertTraces( trace -> @@ -83,6 +89,8 @@ void grpcInstrumentation() { equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, responseSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "127.0.0.1"), equalTo(SERVER_PORT, (long) server.httpPort())) .hasEventsSatisfyingExactly( @@ -108,6 +116,8 @@ void grpcInstrumentation() { equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_REQUEST_BODY_SIZE, responseSerializedSize), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "127.0.0.1"), equalTo(SERVER_PORT, server.httpPort())) .hasEventsSatisfyingExactly( diff --git a/instrumentation/grpc-1.6/javaagent/build.gradle.kts b/instrumentation/grpc-1.6/javaagent/build.gradle.kts index 671fdb315e90..11cc03fdf78c 100644 --- a/instrumentation/grpc-1.6/javaagent/build.gradle.kts +++ b/instrumentation/grpc-1.6/javaagent/build.gradle.kts @@ -8,6 +8,7 @@ muzzle { module.set("grpc-core") versions.set("[1.6.0,)") assertInverse.set(true) + extraDependency("io.grpc:grpc-protobuf:1.5.0") } } @@ -21,7 +22,7 @@ dependencies { testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent")) testLibrary("io.grpc:grpc-netty:$grpcVersion") - testLibrary("io.grpc:grpc-protobuf:$grpcVersion") + library("io.grpc:grpc-protobuf:$grpcVersion") testLibrary("io.grpc:grpc-services:$grpcVersion") testLibrary("io.grpc:grpc-stub:$grpcVersion") diff --git a/instrumentation/grpc-1.6/library/build.gradle.kts b/instrumentation/grpc-1.6/library/build.gradle.kts index 00ace4566efb..f35591d4f0b3 100644 --- a/instrumentation/grpc-1.6/library/build.gradle.kts +++ b/instrumentation/grpc-1.6/library/build.gradle.kts @@ -7,9 +7,9 @@ val grpcVersion = "1.6.0" dependencies { library("io.grpc:grpc-core:$grpcVersion") + library("io.grpc:grpc-protobuf:$grpcVersion") testLibrary("io.grpc:grpc-netty:$grpcVersion") - testLibrary("io.grpc:grpc-protobuf:$grpcVersion") testLibrary("io.grpc:grpc-services:$grpcVersion") testLibrary("io.grpc:grpc-stub:$grpcVersion") diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/BodySizeUtil.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/BodySizeUtil.java new file mode 100644 index 000000000000..c878882386e9 --- /dev/null +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/BodySizeUtil.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.grpc.v1_6; + +import com.google.protobuf.MessageLite; + +final class BodySizeUtil { + + static Long getBodySize(T message) { + if (message instanceof MessageLite) { + return (long) ((MessageLite) message).getSerializedSize(); + } else { + // Message is not a protobuf message + return null; + } + } + + private BodySizeUtil() {} +} diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java index 89f6746e1ceb..926799191bb2 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java @@ -20,6 +20,9 @@ public final class GrpcRequest { private volatile int logicalPort = -1; @Nullable private volatile SocketAddress peerSocketAddress; + private Long requestSize; + private Long responseSize; + GrpcRequest( MethodDescriptor method, @Nullable Metadata metadata, @@ -78,4 +81,20 @@ public SocketAddress getPeerSocketAddress() { void setPeerSocketAddress(SocketAddress peerSocketAddress) { this.peerSocketAddress = peerSocketAddress; } + + public Long getRequestSize() { + return requestSize; + } + + public void setRequestSize(Long requestSize) { + this.requestSize = requestSize; + } + + public Long getResponseSize() { + return responseSize; + } + + public void setResponseSize(Long responseSize) { + this.responseSize = responseSize; + } } diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRpcAttributesGetter.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRpcAttributesGetter.java index ee7214bb1a87..d114b0090cf2 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRpcAttributesGetter.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRpcAttributesGetter.java @@ -43,6 +43,16 @@ public String getMethod(GrpcRequest request) { return fullMethodName.substring(slashIndex + 1); } + @Override + public Long getRequestSize(GrpcRequest request) { + return request.getRequestSize(); + } + + @Override + public Long getResponseSize(GrpcRequest request) { + return request.getResponseSize(); + } + List metadataValue(GrpcRequest request, String key) { if (request.getMetadata() == null) { return Collections.emptyList(); diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java index 7e89b193b54a..b2a0deb22c62 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java @@ -126,6 +126,7 @@ public void start(Listener responseListener, Metadata headers) { @Override public void sendMessage(REQUEST message) { + request.setRequestSize(BodySizeUtil.getBodySize(message)); try (Scope ignored = context.makeCurrent()) { super.sendMessage(message); } catch (Throwable e) { @@ -159,6 +160,7 @@ final class TracingClientCallListener @Override public void onMessage(RESPONSE message) { + request.setResponseSize(BodySizeUtil.getBodySize(message)); long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingClientCall.this); if (emitMessageEvents) { Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId); diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java index d52c332ec50a..688099084146 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java @@ -119,6 +119,7 @@ TracingServerCallListener start(Metadata headers, ServerCallHandler RPC_REQUEST_BODY_SIZE = + AttributeKey.longKey("rpc.request.body.size"); + private static final AttributeKey RPC_RESPONSE_BODY_SIZE = + AttributeKey.longKey("rpc.response.body.size"); + protected abstract ServerBuilder configureServer(ServerBuilder server); protected abstract ManagedChannelBuilder configureClient(ManagedChannelBuilder client); @@ -159,6 +165,9 @@ public void onCompleted() { .sorted() .collect(Collectors.toList())); + Helloworld.Response message = Helloworld.Response.newBuilder().setMessage("call " + 1).build(); + int requestSerializedSize = message.getSerializedSize(); + List> clientEvents = new ArrayList<>(); List> serverEvents = new ArrayList<>(); for (long i = 0; i < clientMessageCount; i++) { @@ -227,6 +236,8 @@ public void onCompleted() { equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "Conversation"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .satisfies( @@ -242,6 +253,8 @@ public void onCompleted() { equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "Conversation"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), diff --git a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java index 31db3557c5b8..9a9075f9a067 100644 --- a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java +++ b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java @@ -85,6 +85,11 @@ public abstract class AbstractGrpcTest { protected static final String SERVER_REQUEST_METADATA_KEY = "some-server-key"; + private static final AttributeKey RPC_REQUEST_BODY_SIZE = + AttributeKey.longKey("rpc.request.body.size"); + private static final AttributeKey RPC_RESPONSE_BODY_SIZE = + AttributeKey.longKey("rpc.response.body.size"); + protected abstract ServerBuilder configureServer(ServerBuilder server); protected abstract ManagedChannelBuilder configureClient(ManagedChannelBuilder client); @@ -127,7 +132,9 @@ public void sayHello( "parent", () -> client.sayHello(Helloworld.Request.newBuilder().setName(paramName).build())); - assertThat(response.getMessage()).isEqualTo("Hello " + paramName); + String prefix = "Hello "; + assertThat(response.getMessage()).isEqualTo(prefix + paramName); + int responseSerializedSize = response.getSerializedSize(); testing() .waitAndAssertTraces( @@ -144,6 +151,10 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, responseSerializedSize), + equalTo( + RPC_REQUEST_BODY_SIZE, + responseSerializedSize - prefix.length()), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfyingExactly( @@ -171,6 +182,10 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo( + RPC_RESPONSE_BODY_SIZE, + responseSerializedSize - prefix.length()), + equalTo(RPC_REQUEST_BODY_SIZE, responseSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), @@ -260,13 +275,14 @@ public void sayHello( AtomicReference response = new AtomicReference<>(); AtomicReference error = new AtomicReference<>(); + Helloworld.Request request = Helloworld.Request.newBuilder().setName("test").build(); testing() .runWithSpan( "parent", () -> { ListenableFuture future = Futures.transform( - client.sayHello(Helloworld.Request.newBuilder().setName("test").build()), + client.sayHello(request), resp -> { testing().runWithSpan("child", () -> {}); return resp; @@ -280,7 +296,10 @@ public void sayHello( }); assertThat(error).hasValue(null); - assertThat(response.get().getMessage()).isEqualTo("Hello test"); + Helloworld.Response res = response.get(); + assertThat(res.getMessage()).isEqualTo("Hello test"); + int requestSerializedSize = request.getSerializedSize(); + int responseSerializedSize = res.getSerializedSize(); testing() .waitAndAssertTraces( @@ -297,6 +316,8 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, responseSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfyingExactly( @@ -324,6 +345,8 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_REQUEST_BODY_SIZE, responseSerializedSize), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), @@ -418,12 +441,13 @@ public void sayHello( AtomicReference response = new AtomicReference<>(); AtomicReference error = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); + Helloworld.Request request = Helloworld.Request.newBuilder().setName("test").build(); testing() .runWithSpan( "parent", () -> client.sayHello( - Helloworld.Request.newBuilder().setName("test").build(), + request, new StreamObserver() { @Override public void onNext(Helloworld.Response r) { @@ -445,7 +469,10 @@ public void onCompleted() { latch.await(10, TimeUnit.SECONDS); assertThat(error).hasValue(null); - assertThat(response.get().getMessage()).isEqualTo("Hello test"); + Helloworld.Response res = response.get(); + assertThat(res.getMessage()).isEqualTo("Hello test"); + int requestSerializedSize = request.getSerializedSize(); + int responseSerializedSize = res.getSerializedSize(); testing() .waitAndAssertTraces( @@ -462,6 +489,8 @@ public void onCompleted() { equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, responseSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfyingExactly( @@ -489,6 +518,8 @@ public void onCompleted() { equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_REQUEST_BODY_SIZE, responseSerializedSize), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), @@ -577,8 +608,9 @@ public void sayHello( GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel); - assertThatThrownBy( - () -> client.sayHello(Helloworld.Request.newBuilder().setName("error").build())) + Helloworld.Request request = Helloworld.Request.newBuilder().setName("error").build(); + int requestSerializedSize = request.getSerializedSize(); + assertThatThrownBy(() -> client.sayHello(request)) .isInstanceOfSatisfying( StatusRuntimeException.class, t -> { @@ -602,6 +634,7 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) status.getCode().value()), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfyingExactly( @@ -622,6 +655,7 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) status.getCode().value()), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), @@ -705,9 +739,9 @@ public void sayHello( closer.add(() -> server.shutdownNow().awaitTermination()); GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel); - - assertThatThrownBy( - () -> client.sayHello(Helloworld.Request.newBuilder().setName("error").build())) + Helloworld.Request request = Helloworld.Request.newBuilder().setName("error").build(); + int requestSerializedSize = request.getSerializedSize(); + assertThatThrownBy(() -> client.sayHello(request)) .isInstanceOfSatisfying( StatusRuntimeException.class, t -> { @@ -741,6 +775,7 @@ public void sayHello( equalTo( RPC_GRPC_STATUS_CODE, (long) Status.UNKNOWN.getCode().value()), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfyingExactly( @@ -761,6 +796,7 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.UNKNOWN.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), @@ -918,12 +954,13 @@ public ClientCall interceptCall( AtomicReference response = new AtomicReference<>(); AtomicReference error = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); + Helloworld.Request request = Helloworld.Request.newBuilder().setName("test").build(); testing() .runWithSpan( "parent", () -> client.sayHello( - Helloworld.Request.newBuilder().setName("test").build(), + request, new StreamObserver() { @Override public void onNext(Helloworld.Response r) { @@ -954,7 +991,10 @@ public void onCompleted() { latch.await(10, TimeUnit.SECONDS); assertThat(error).hasValue(null); - assertThat(response.get().getMessage()).isEqualTo("Hello test"); + Helloworld.Response res = response.get(); + assertThat(res.getMessage()).isEqualTo("Hello test"); + int requestSerializedSize = request.getSerializedSize(); + int responseSerializedSize = res.getSerializedSize(); testing() .waitAndAssertTraces( @@ -971,6 +1011,8 @@ public void onCompleted() { equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, responseSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfyingExactly( @@ -998,6 +1040,8 @@ public void onCompleted() { equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, responseSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), @@ -1041,17 +1085,20 @@ public void sayMultipleHello( GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel); IllegalStateException thrown = new IllegalStateException("illegal"); + AtomicReference response = new AtomicReference<>(); AtomicReference error = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); + Helloworld.Request request = Helloworld.Request.newBuilder().setName("test").build(); testing() .runWithSpan( "parent", () -> client.sayMultipleHello( - Helloworld.Request.newBuilder().setName("test").build(), + request, new StreamObserver() { @Override public void onNext(Helloworld.Response r) { + response.set(r); throw thrown; } @@ -1071,6 +1118,9 @@ public void onCompleted() { latch.await(10, TimeUnit.SECONDS); assertThat(error.get()).isNotNull(); + int requestSerializedSize = request.getSerializedSize(); + Helloworld.Response res = response.get(); + int responseSerializedSize = res.getSerializedSize(); testing() .waitAndAssertTraces( @@ -1089,6 +1139,8 @@ public void onCompleted() { equalTo(RPC_METHOD, "SayMultipleHello"), equalTo( RPC_GRPC_STATUS_CODE, (long) Status.Code.CANCELLED.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, responseSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfying( @@ -1120,6 +1172,8 @@ public void onCompleted() { equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), equalTo(NETWORK_PEER_ADDRESS, "127.0.0.1"), + equalTo(RPC_REQUEST_BODY_SIZE, responseSerializedSize), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), satisfies(NETWORK_PEER_PORT, val -> assertThat(val).isNotNull())) .hasEventsSatisfyingExactly( event -> @@ -1175,17 +1229,21 @@ public void onCompleted() { } }); - request.onNext( + ServerReflectionRequest serverReflectionRequest = ServerReflectionRequest.newBuilder() .setListServices("The content will not be checked?") - .build()); + .build(); + request.onNext(serverReflectionRequest); request.onCompleted(); latch.await(10, TimeUnit.SECONDS); assertThat(error).hasValue(null); - assertThat(response.get().getListServicesResponse().getService(0).getName()) + ServerReflectionResponse serverReflectionResponse = response.get(); + assertThat(serverReflectionResponse.getListServicesResponse().getService(0).getName()) .isEqualTo("grpc.reflection.v1alpha.ServerReflection"); + int requestSerializedSize = serverReflectionRequest.getSerializedSize(); + int responseSerializedSize = serverReflectionResponse.getSerializedSize(); testing() .waitAndAssertTraces( @@ -1203,6 +1261,8 @@ public void onCompleted() { RPC_SERVICE, "grpc.reflection.v1alpha.ServerReflection"), equalTo(RPC_METHOD, "ServerReflectionInfo"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, responseSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfyingExactly( @@ -1231,6 +1291,8 @@ public void onCompleted() { equalTo(RPC_SERVICE, "grpc.reflection.v1alpha.ServerReflection"), equalTo(RPC_METHOD, "ServerReflectionInfo"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, responseSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"), @@ -1282,13 +1344,12 @@ public void sayHello( GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel); - Helloworld.Response response = - testing() - .runWithSpan( - "parent", - () -> client.sayHello(Helloworld.Request.newBuilder().setName("test").build())); + Helloworld.Request request = Helloworld.Request.newBuilder().setName("test").build(); + Helloworld.Response response = testing().runWithSpan("parent", () -> client.sayHello(request)); assertThat(response.getMessage()).isEqualTo("Hello test"); + int requestSerializedSize = request.getSerializedSize(); + int responseSerializedSize = response.getSerializedSize(); testing() .waitAndAssertTraces( @@ -1305,6 +1366,8 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_RESPONSE_BODY_SIZE, responseSerializedSize), + equalTo(RPC_REQUEST_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, (long) server.getPort()))) .hasEventsSatisfyingExactly( @@ -1332,6 +1395,8 @@ public void sayHello( equalTo(RPC_SERVICE, "example.Greeter"), equalTo(RPC_METHOD, "SayHello"), equalTo(RPC_GRPC_STATUS_CODE, (long) Status.Code.OK.value()), + equalTo(RPC_REQUEST_BODY_SIZE, responseSerializedSize), + equalTo(RPC_RESPONSE_BODY_SIZE, requestSerializedSize), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, server.getPort()), equalTo(NETWORK_TYPE, "ipv4"),