diff --git a/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java b/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java index cde1a4d4d8c5..cfaed961c8e1 100644 --- a/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java +++ b/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java @@ -19,6 +19,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -69,14 +70,13 @@ public static boolean methodEnter() { return false; } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void methodExit( + public static reactor.util.context.Context methodExit( @Advice.Argument(0) reactor.util.context.Context reactorContext, - @Advice.Argument(1) Context applicationContext, - @Advice.Return(readOnly = false) reactor.util.context.Context updatedReactorContext) { - updatedReactorContext = - ContextPropagationOperator.storeOpenTelemetryContext( - reactorContext, AgentContextStorage.getAgentContext(applicationContext)); + @Advice.Argument(1) Context applicationContext) { + return ContextPropagationOperator.storeOpenTelemetryContext( + reactorContext, AgentContextStorage.getAgentContext(applicationContext)); } } @@ -87,19 +87,19 @@ public static boolean methodEnter() { return false; } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void methodExit( + public static Context methodExit( @Advice.Argument(0) reactor.util.context.Context reactorContext, - @Advice.Argument(1) Context defaultContext, - @Advice.Return(readOnly = false) Context applicationContext) { + @Advice.Argument(1) Context defaultContext) { io.opentelemetry.context.Context agentContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, null); if (agentContext == null) { - applicationContext = defaultContext; - } else { - applicationContext = AgentContextStorage.toApplicationContext(agentContext); + return defaultContext; } + + return AgentContextStorage.toApplicationContext(agentContext); } } diff --git a/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java b/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java index 1f40d19a3f1b..9dca1677c555 100644 --- a/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java +++ b/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java @@ -38,4 +38,9 @@ public String getModuleGroup() { // This module uses the api context bridge helpers, therefore must be in the same classloader return "opentelemetry-api-bridge"; } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java b/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java index 4c4ab8e8bb7c..920697528b4c 100644 --- a/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java +++ b/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java @@ -18,6 +18,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -48,18 +49,18 @@ public static boolean methodEnter() { return false; } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void methodExit( + public static Context methodExit( @Advice.Argument(0) reactor.util.context.ContextView reactorContext, - @Advice.Argument(1) Context defaultContext, - @Advice.Return(readOnly = false) Context applicationContext) { + @Advice.Argument(1) Context defaultContext) { io.opentelemetry.context.Context agentContext = ContextPropagationOperator.getOpenTelemetryContextFromContextView(reactorContext, null); if (agentContext == null) { - applicationContext = defaultContext; + return defaultContext; } else { - applicationContext = AgentContextStorage.toApplicationContext(agentContext); + return AgentContextStorage.toApplicationContext(agentContext); } } } diff --git a/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java b/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java index c5fabc4dccbf..ce14821c0e54 100644 --- a/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java +++ b/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java @@ -39,4 +39,9 @@ public String getModuleGroup() { // This module uses the api context bridge helpers, therefore must be in the same classloader return "opentelemetry-api-bridge"; } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java index f2857879ba74..526dce67631a 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java @@ -11,6 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.publisher.Flux; @@ -33,11 +34,13 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ReceiveAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return(readOnly = false) Flux flux) { - if (!(flux instanceof TracingDisablingKafkaFlux)) { - flux = new TracingDisablingKafkaFlux<>(flux); + public static Flux onExit(@Advice.Return Flux flux) { + if (flux instanceof TracingDisablingKafkaFlux) { + return flux; } + return new TracingDisablingKafkaFlux<>(flux); } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java index 81c8bf61d996..97a75252105f 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java @@ -11,6 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.publisher.Flux; @@ -33,11 +34,13 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class CreateConsumerFluxAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return(readOnly = false) Flux flux) { - if (!(flux instanceof TracingDisablingKafkaFlux)) { - flux = new TracingDisablingKafkaFlux<>(flux); + public static Flux onExit(@Advice.Return Flux flux) { + if (flux instanceof TracingDisablingKafkaFlux) { + return flux; } + return new TracingDisablingKafkaFlux<>(flux); } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java index fcae8dc7c455..48f01c7cc35f 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java @@ -12,6 +12,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.kafka.receiver.KafkaReceiver; @@ -33,11 +34,13 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class CreateAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return(readOnly = false) KafkaReceiver receiver) { - if (!(receiver instanceof InstrumentedKafkaReceiver)) { - receiver = new InstrumentedKafkaReceiver<>(receiver); + public static KafkaReceiver onExit(@Advice.Return KafkaReceiver receiver) { + if (receiver instanceof InstrumentedKafkaReceiver) { + return receiver; } + return new InstrumentedKafkaReceiver<>(receiver); } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java index 47524c7fec23..60682bcfd536 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java @@ -10,10 +10,12 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; @AutoService(InstrumentationModule.class) -public class ReactorKafkaInstrumentationModule extends InstrumentationModule { +public class ReactorKafkaInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public ReactorKafkaInstrumentationModule() { super("reactor-kafka", "reactor-kafka-1.0"); @@ -27,4 +29,9 @@ public List typeInstrumentations() { new DefaultKafkaReceiverInstrumentation(), new ConsumerHandlerInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java index 509ddbdf9ed2..13305afa0865 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java @@ -17,6 +17,8 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.function.BiConsumer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.netty.Connection; @@ -74,92 +76,111 @@ public void transform(TypeTransformer transformer) { public static class CreateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Local("otelCallDepth") CallDepth callDepth) { - callDepth = CallDepth.forClass(HttpClient.class); + public static CallDepth onEnter() { + CallDepth callDepth = CallDepth.forClass(HttpClient.class); callDepth.getAndIncrement(); + return callDepth; } + @AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( + public static HttpClient stopSpan( @Advice.Thrown Throwable throwable, - @Advice.Return(readOnly = false) HttpClient client, - @Advice.Local("otelCallDepth") CallDepth callDepth) { + @Advice.Return HttpClient client, + @Advice.Enter CallDepth callDepth) { if (callDepth.decrementAndGet() == 0 && throwable == null) { - client = client.doOnRequest(new OnRequest()).mapConnect(new MapConnect()); + return client.doOnRequest(new OnRequest()).mapConnect(new MapConnect()); } + return client; } } @SuppressWarnings("unused") public static class OnRequestAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) BiConsumer callback) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnRequestDecorator(callback); + return new DecoratorFunctions.OnRequestDecorator(callback); } + return callback; } } @SuppressWarnings("unused") public static class OnRequestErrorAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) BiConsumer callback) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnRequestErrorDecorator(callback); + return new DecoratorFunctions.OnRequestErrorDecorator(callback); } + return callback; } } @SuppressWarnings("unused") public static class OnResponseAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback, + public static BiConsumer onEnter( + @Advice.Argument(0) BiConsumer callback, @Advice.Origin("#m") String methodName) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { boolean forceParentContext = methodName.equals("doAfterResponse"); - callback = new DecoratorFunctions.OnResponseDecorator(callback, forceParentContext); + return new DecoratorFunctions.OnResponseDecorator(callback, forceParentContext); } + return callback; } } @SuppressWarnings("unused") public static class OnResponseErrorAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) BiConsumer callback) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnResponseErrorDecorator(callback); + return new DecoratorFunctions.OnResponseErrorDecorator(callback); } + return callback; } } @SuppressWarnings("unused") public static class OnErrorAdvice { + @AssignReturned.ToArguments({ + @ToArgument(value = 0, index = 0), + @ToArgument(value = 1, index = 1) + }) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer requestCallback, - @Advice.Argument(value = 1, readOnly = false) - BiConsumer responseCallback) { + public static Object[] onEnter( + @Advice.Argument(0) + BiConsumer originalRequestCallback, + @Advice.Argument(1) + BiConsumer originalResponseCallback) { + + // intermediate variables needed for inlined instrumentation + BiConsumer requestCallback = + originalRequestCallback; + BiConsumer responseCallback = + originalResponseCallback; + if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { requestCallback = new DecoratorFunctions.OnRequestErrorDecorator(requestCallback); } if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) { responseCallback = new DecoratorFunctions.OnResponseErrorDecorator(responseCallback); } + return new Object[] {requestCallback, responseCallback}; } } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java index 8287a59c74e9..cbdca1728ecd 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java @@ -11,6 +11,7 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -24,7 +25,8 @@ * HttpClient#doOnRequest(BiConsumer)} to pass context from the caller to Reactor to Netty. */ @AutoService(InstrumentationModule.class) -public class ReactorNettyInstrumentationModule extends InstrumentationModule { +public class ReactorNettyInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public ReactorNettyInstrumentationModule() { super("reactor-netty", "reactor-netty-0.9"); @@ -40,4 +42,9 @@ public ElementMatcher.Junction classLoaderMatcher() { public List typeInstrumentations() { return singletonList(new HttpClientInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java index 089f0bedd12d..5bfc1a37ac0d 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java @@ -11,6 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.publisher.Mono; @@ -35,10 +36,10 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ConnectAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( - @Advice.Return(readOnly = false) Mono connection, - @Advice.This HttpClient httpClient) { + public static Mono onExit( + @Advice.Return Mono connection, @Advice.This HttpClient httpClient) { HttpClientConfig config = httpClient.configuration(); // reactor-netty 1.0.x has a bug: the .mapConnect() function is not applied when deferred @@ -46,8 +47,9 @@ public static void onExit( // we're fixing this bug here, so that our instrumentation can safely add its own // .mapConnect() listener if (HttpClientConfigBuddy.hasDeferredConfig(config)) { - connection = HttpClientConfigBuddy.getConnector(config).apply(connection); + return HttpClientConfigBuddy.getConnector(config).apply(connection); } + return connection; } } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java index 9c1a49585ae8..f77680ab9c6f 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java @@ -16,6 +16,8 @@ import io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.DecoratorFunctions.PropagatedContext; import java.util.function.BiConsumer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.netty.Connection; @@ -79,102 +81,142 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class OnRequestAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallBack) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallBack; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { // perform the callback with the client span active (instead of the parent) since this // callback occurs after the connection is made callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); } + return callback; } } @SuppressWarnings("unused") public static class AfterRequestAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallBack) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallBack; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { // use client context after request is sent callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); } + return callback; } } @SuppressWarnings("unused") public static class OnRequestErrorAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallBack) { + + // using an intermediate variable is required to keep the advice when inlined + BiConsumer callback = originalCallBack; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT); } + return callback; } } @SuppressWarnings("unused") public static class OnResponseAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallBack) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallBack; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { // use client context just when response status & headers are received callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); } + return callback; } } @SuppressWarnings("unused") public static class AfterResponseAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallback) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallback; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.PARENT); } + return callback; } } @SuppressWarnings("unused") public static class OnResponseErrorAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallback) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallback; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT); } + return callback; } } @SuppressWarnings("unused") public static class OnErrorAdvice { + @AssignReturned.ToArguments({ + @ToArgument(value = 0, index = 0), + @ToArgument(value = 1, index = 1) + }) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer requestCallback, - @Advice.Argument(value = 1, readOnly = false) - BiConsumer responseCallback) { + public static Object[] onEnter( + @Advice.Argument(0) + BiConsumer originalRequestCallback, + @Advice.Argument(1) + BiConsumer originalResponseCallback) { + + // intermediate variables needed for inlined instrumentation + BiConsumer requestCallback = + originalRequestCallback; + BiConsumer responseCallback = + originalResponseCallback; if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { requestCallback = @@ -186,6 +228,7 @@ public static void onEnter( new DecoratorFunctions.OnMessageErrorDecorator<>( responseCallback, PropagatedContext.PARENT); } + return new Object[] {requestCallback, responseCallback}; } } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java index 2517d4ccbc69..6e9a34ac0b9f 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java @@ -57,4 +57,9 @@ public List typeInstrumentations() { new ResponseReceiverInstrumentation(), new TransportConnectorInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java index 6752c6cd0bc4..a16f5ff4736d 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java @@ -5,10 +5,12 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.HttpResponse; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.incubator.builder.internal.DefaultHttpClientInstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.netty.common.v4_0.HttpRequestAndChannel; import io.opentelemetry.instrumentation.netty.common.v4_0.internal.client.NettyClientInstrumenterBuilderFactory; import io.opentelemetry.instrumentation.netty.common.v4_0.internal.client.NettyClientInstrumenterFactory; @@ -31,6 +33,10 @@ public final class ReactorNettySingletons { private static final Instrumenter INSTRUMENTER; private static final NettyConnectionInstrumenter CONNECTION_INSTRUMENTER; + public static final VirtualField + CONNECTION_REQUEST_AND_CONTEXT = + VirtualField.find(ChannelPromise.class, ConnectionRequestAndContext.class); + static { INSTRUMENTER = JavaagentHttpClientInstrumenters.create( diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java index 451c613df261..11ed5b54e0ea 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java @@ -16,7 +16,10 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.function.BiFunction; +import java.util.function.Function; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.reactivestreams.Publisher; @@ -70,187 +73,175 @@ public void transform(TypeTransformer transformer) { this.getClass().getName() + "$ResponseSingleAdvice"); } - @SuppressWarnings("unused") - public static class ResponseMonoAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { + public static class AdviceScope { + @Nullable private final HttpClient.ResponseReceiver modifiedReceiver; + private final CallDepth callDepth; + + /** + * Dedicated advice scope subclass that make instrumentation skip original method body using + * {@code skipOn = Runnable.class } which does not require to expose an extra type + */ + public static class SkipMethodBodyAdviceScope extends AdviceScope implements Runnable { + private SkipMethodBodyAdviceScope( + CallDepth callDepth, HttpClient.ResponseReceiver receiver) { + super(callDepth, receiver); + } - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; + @Override + public void run() { + // do nothing, only using Runnable as a marker interface to enable skipping without + // exposing the actual type } + } - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + private AdviceScope(CallDepth callDepth, @Nullable HttpClient.ResponseReceiver receiver) { + this.modifiedReceiver = receiver; + this.callDepth = callDepth; } - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, - @Advice.Return(readOnly = false) Mono returnValue) { + public static AdviceScope start(CallDepth callDepth, HttpClient.ResponseReceiver receiver) { + if (callDepth.getAndIncrement() > 0) { + // original method body executed for nested calls + return new AdviceScope(callDepth, null); + } + // original method body will be skipped due to return type and 'skipOn' value + return new SkipMethodBodyAdviceScope( + callDepth, HttpResponseReceiverInstrumenter.instrument(receiver)); + } + public T end(T returnValue, Function, T> receiverFunction) { try { if (modifiedReceiver != null) { - returnValue = modifiedReceiver.response(); + return receiverFunction.apply(modifiedReceiver); } } finally { // needs to be called after original method to prevent StackOverflowError callDepth.decrementAndGet(); } + return returnValue; + } + + public Mono end(Mono returnValue) { + return end(returnValue, HttpClient.ResponseReceiver::response); + } + + public Flux endResponse( + Flux returnValue, + BiFunction> + receiveFunction) { + return end(returnValue, receiver -> receiver.response(receiveFunction)); + } + + public Flux endResponseConnection( + Flux returnValue, + BiFunction> + receiveFunction) { + return end(returnValue, receiver -> receiver.responseConnection(receiveFunction)); + } + + public ByteBufFlux endResponseContent(ByteBufFlux returnValue) { + return end(returnValue, HttpClient.ResponseReceiver::responseContent); + } + + public > Mono endResponseSingle( + Mono returnValue, + BiFunction> + receiveFunction) { + return end(returnValue, receiver -> receiver.responseSingle(receiveFunction)); } } @SuppressWarnings("unused") - public static class ResponseFluxAdvice { + public static class ResponseMonoAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); + } - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; - } + @AssignReturned.ToReturned + @Advice.OnMethodExit(suppress = Throwable.class) + public static Mono onExit( + @Advice.Return Mono returnValue, + @Advice.Enter AdviceScope adviceScope) { + return adviceScope.end(returnValue); + } + } - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + @SuppressWarnings("unused") + public static class ResponseFluxAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static > void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + public static > Flux onExit( @Advice.Argument(0) BiFunction> receiveFunction, - @Advice.Return(readOnly = false) Flux returnValue) { - - try { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.response(receiveFunction); - } - } finally { - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); - } + @Advice.Return Flux returnValue, + @Advice.Enter AdviceScope adviceScope) { + return adviceScope.endResponse(returnValue, receiveFunction); } } @SuppressWarnings("unused") public static class ResponseConnectionAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { - - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; - } - - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static > void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + public static > Flux onExit( @Advice.Argument(0) BiFunction> receiveFunction, - @Advice.Return(readOnly = false) Flux returnValue) { - - try { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseConnection(receiveFunction); - } - } finally { - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); - } + @Advice.Return Flux returnValue, + @Advice.Enter AdviceScope adviceScope) { + return adviceScope.endResponseConnection(returnValue, receiveFunction); } } @SuppressWarnings("unused") public static class ResponseContentAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { - - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; - } - - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, - @Advice.Return(readOnly = false) ByteBufFlux returnValue) { - - try { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseContent(); - } - } finally { - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); - } + public static ByteBufFlux onExit( + @Advice.Return ByteBufFlux returnValue, @Advice.Enter AdviceScope adviceScope) { + return adviceScope.endResponseContent(returnValue); } } @SuppressWarnings("unused") public static class ResponseSingleAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { - - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; - } - - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static > void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + public static > Mono onExit( @Advice.Argument(0) BiFunction> receiveFunction, - @Advice.Return(readOnly = false) Mono returnValue) { + @Advice.Return Mono returnValue, + @Advice.Enter AdviceScope adviceScope) { - try { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseSingle(receiveFunction); - } - } finally { - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); - } + return adviceScope.endResponseSingle(returnValue, receiveFunction); } } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java index 4b165de020ee..24adce582942 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.CONNECTION_REQUEST_AND_CONTEXT; import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.connectionInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; @@ -16,15 +17,16 @@ import io.netty.resolver.AddressResolverGroup; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.netty.common.internal.NettyConnectionRequest; -import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.InstrumentedAddressResolverGroup; import java.net.SocketAddress; import java.util.List; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.publisher.Mono; @@ -68,56 +70,51 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ResolveAndConnectAdvice { + @AssignReturned.ToArguments(@ToArgument(3)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 3, readOnly = false) AddressResolverGroup resolver) { - resolver = InstrumentedAddressResolverGroup.wrap(connectionInstrumenter(), resolver); + public static AddressResolverGroup onEnter( + @Advice.Argument(3) AddressResolverGroup resolver) { + return InstrumentedAddressResolverGroup.wrap(connectionInstrumenter(), resolver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void onExit(@Advice.Return(readOnly = false) Mono mono) { + public static Mono onExit(@Advice.Return Mono mono) { + // end the CONNECT span that was started in doConnect() instrumentation - mono = ConnectionWrapper.wrap(mono); + return ConnectionWrapper.wrap(mono); } } - @SuppressWarnings("unused") - public static class ConnectAdvice { + public static class AdviceScope { + private final NettyConnectionRequest request; + private final Context context; + private final Scope scope; - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(0) SocketAddress remoteAddress, - @Advice.Argument(2) ChannelPromise channelPromise, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelRequest") NettyConnectionRequest request, - @Advice.Local("otelScope") Scope scope) { + private AdviceScope(NettyConnectionRequest request, Context context, Scope scope) { + this.request = request; + this.context = context; + this.scope = scope; + } + + @Nullable + public static AdviceScope start(SocketAddress remoteAddress, ChannelPromise channelPromise) { - Context parentContext = Java8BytecodeBridge.currentContext(); - request = NettyConnectionRequest.connect(remoteAddress); + Context parentContext = Context.current(); + NettyConnectionRequest request = NettyConnectionRequest.connect(remoteAddress); if (!connectionInstrumenter().shouldStart(parentContext, request)) { - return; + return null; } - context = connectionInstrumenter().start(parentContext, request); - scope = context.makeCurrent(); - + Context context = connectionInstrumenter().start(parentContext, request); // the span is finished in the mono decorated by the ConnectionWrapper - VirtualField.find(ChannelPromise.class, ConnectionRequestAndContext.class) - .set(channelPromise, ConnectionRequestAndContext.create(request, context)); + CONNECTION_REQUEST_AND_CONTEXT.set( + channelPromise, ConnectionRequestAndContext.create(request, context)); + return new AdviceScope(request, context, context.makeCurrent()); } - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void endConnect( - @Advice.Thrown Throwable throwable, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelRequest") NettyConnectionRequest request, - @Advice.Local("otelScope") Scope scope) { - - if (scope == null) { - return; - } + public void end(@Nullable Throwable throwable) { scope.close(); - if (throwable != null) { connectionInstrumenter().end(context, request, null, throwable); } @@ -125,45 +122,44 @@ public static void endConnect( } @SuppressWarnings("unused") - public static class ConnectNewAdvice { + public static class ConnectAdvice { + @Nullable @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(0) List remoteAddresses, - @Advice.Argument(2) ChannelPromise channelPromise, - @Advice.Argument(3) int index, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelRequest") NettyConnectionRequest request, - @Advice.Local("otelScope") Scope scope) { + public static AdviceScope onEnter( + @Advice.Argument(0) SocketAddress remoteAddress, + @Advice.Argument(2) ChannelPromise channelPromise) { + return AdviceScope.start(remoteAddress, channelPromise); + } - Context parentContext = Java8BytecodeBridge.currentContext(); - request = NettyConnectionRequest.connect(remoteAddresses.get(index)); - if (!connectionInstrumenter().shouldStart(parentContext, request)) { - return; + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void endConnect( + @Advice.Thrown @Nullable Throwable throwable, + @Advice.Enter @Nullable AdviceScope adviceScope) { + if (adviceScope != null) { + adviceScope.end(throwable); } + } + } - context = connectionInstrumenter().start(parentContext, request); - scope = context.makeCurrent(); + @SuppressWarnings("unused") + public static class ConnectNewAdvice { - // the span is finished in the mono decorated by the ConnectionWrapper - VirtualField.find(ChannelPromise.class, ConnectionRequestAndContext.class) - .set(channelPromise, ConnectionRequestAndContext.create(request, context)); + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AdviceScope onEnter( + @Advice.Argument(0) List remoteAddresses, + @Advice.Argument(2) ChannelPromise channelPromise, + @Advice.Argument(3) int index) { + SocketAddress remoteAddress = remoteAddresses.get(index); + return AdviceScope.start(remoteAddress, channelPromise); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void endConnect( - @Advice.Thrown Throwable throwable, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelRequest") NettyConnectionRequest request, - @Advice.Local("otelScope") Scope scope) { - - if (scope == null) { - return; - } - scope.close(); - - if (throwable != null) { - connectionInstrumenter().end(context, request, null, throwable); + @Advice.Thrown Throwable throwable, @Advice.Enter @Nullable AdviceScope adviceScope) { + if (adviceScope != null) { + adviceScope.end(throwable); } } }