From fd5b57a0fabfee0ebbc412a86897e1d440540529 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Wed, 10 Sep 2025 10:29:33 +0200 Subject: [PATCH 01/15] reactor-kafka-1.0 --- .../kafka/v1_0/ConsumerHandlerInstrumentation.java | 6 +++++- .../kafka/v1_0/DefaultKafkaReceiverInstrumentation.java | 6 +++++- .../reactor/kafka/v1_0/KafkaReceiverInstrumentation.java | 7 +++++-- .../kafka/v1_0/ReactorKafkaInstrumentationModule.java | 9 ++++++++- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java index f2857879ba74..159ef15b4770 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java @@ -11,6 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.publisher.Flux; @@ -33,11 +34,14 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ReceiveAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return(readOnly = false) Flux flux) { + public static Flux onExit(@Advice.Return Flux originalFlux) { + Flux flux = originalFlux; if (!(flux instanceof TracingDisablingKafkaFlux)) { flux = new TracingDisablingKafkaFlux<>(flux); } + return flux; } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java index 81c8bf61d996..56254fa9abab 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java @@ -11,6 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.publisher.Flux; @@ -33,11 +34,14 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class CreateConsumerFluxAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return(readOnly = false) Flux flux) { + public static Flux onExit(@Advice.Return Flux originalFlux) { + Flux flux = originalFlux; if (!(flux instanceof TracingDisablingKafkaFlux)) { flux = new TracingDisablingKafkaFlux<>(flux); } + return flux; } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java index fcae8dc7c455..c39b868c2772 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java @@ -12,6 +12,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.kafka.receiver.KafkaReceiver; @@ -33,11 +34,13 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class CreateAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return(readOnly = false) KafkaReceiver receiver) { + public static KafkaReceiver onExit(@Advice.Return KafkaReceiver receiver) { if (!(receiver instanceof InstrumentedKafkaReceiver)) { - receiver = new InstrumentedKafkaReceiver<>(receiver); + return new InstrumentedKafkaReceiver<>(receiver); } + return receiver; } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java index 47524c7fec23..60682bcfd536 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java @@ -10,10 +10,12 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; @AutoService(InstrumentationModule.class) -public class ReactorKafkaInstrumentationModule extends InstrumentationModule { +public class ReactorKafkaInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public ReactorKafkaInstrumentationModule() { super("reactor-kafka", "reactor-kafka-1.0"); @@ -27,4 +29,9 @@ public List typeInstrumentations() { new DefaultKafkaReceiverInstrumentation(), new ConsumerHandlerInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } From 9af07a0519e58407fa7fcc792016cdb4c505bda3 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Wed, 10 Sep 2025 10:49:50 +0200 Subject: [PATCH 02/15] reactor-3.1 --- ...extPropagationOperatorInstrumentation.java | 24 +++++++++---------- ...pagationOperatorInstrumentationModule.java | 5 ++++ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java b/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java index cde1a4d4d8c5..cfaed961c8e1 100644 --- a/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java +++ b/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java @@ -19,6 +19,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -69,14 +70,13 @@ public static boolean methodEnter() { return false; } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void methodExit( + public static reactor.util.context.Context methodExit( @Advice.Argument(0) reactor.util.context.Context reactorContext, - @Advice.Argument(1) Context applicationContext, - @Advice.Return(readOnly = false) reactor.util.context.Context updatedReactorContext) { - updatedReactorContext = - ContextPropagationOperator.storeOpenTelemetryContext( - reactorContext, AgentContextStorage.getAgentContext(applicationContext)); + @Advice.Argument(1) Context applicationContext) { + return ContextPropagationOperator.storeOpenTelemetryContext( + reactorContext, AgentContextStorage.getAgentContext(applicationContext)); } } @@ -87,19 +87,19 @@ public static boolean methodEnter() { return false; } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void methodExit( + public static Context methodExit( @Advice.Argument(0) reactor.util.context.Context reactorContext, - @Advice.Argument(1) Context defaultContext, - @Advice.Return(readOnly = false) Context applicationContext) { + @Advice.Argument(1) Context defaultContext) { io.opentelemetry.context.Context agentContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, null); if (agentContext == null) { - applicationContext = defaultContext; - } else { - applicationContext = AgentContextStorage.toApplicationContext(agentContext); + return defaultContext; } + + return AgentContextStorage.toApplicationContext(agentContext); } } diff --git a/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java b/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java index 1f40d19a3f1b..9dca1677c555 100644 --- a/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java +++ b/instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java @@ -38,4 +38,9 @@ public String getModuleGroup() { // This module uses the api context bridge helpers, therefore must be in the same classloader return "opentelemetry-api-bridge"; } + + @Override + public boolean isIndyReady() { + return true; + } } From 7909a4d34b524341f01b7c8457d797df60e834a7 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Wed, 10 Sep 2025 10:50:32 +0200 Subject: [PATCH 03/15] reactor-3.4 --- .../ContextPropagationOperator34Instrumentation.java | 11 ++++++----- ...extPropagationOperator34InstrumentationModule.java | 5 +++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java b/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java index 4c4ab8e8bb7c..920697528b4c 100644 --- a/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java +++ b/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java @@ -18,6 +18,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -48,18 +49,18 @@ public static boolean methodEnter() { return false; } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void methodExit( + public static Context methodExit( @Advice.Argument(0) reactor.util.context.ContextView reactorContext, - @Advice.Argument(1) Context defaultContext, - @Advice.Return(readOnly = false) Context applicationContext) { + @Advice.Argument(1) Context defaultContext) { io.opentelemetry.context.Context agentContext = ContextPropagationOperator.getOpenTelemetryContextFromContextView(reactorContext, null); if (agentContext == null) { - applicationContext = defaultContext; + return defaultContext; } else { - applicationContext = AgentContextStorage.toApplicationContext(agentContext); + return AgentContextStorage.toApplicationContext(agentContext); } } } diff --git a/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java b/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java index c5fabc4dccbf..ce14821c0e54 100644 --- a/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java +++ b/instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java @@ -39,4 +39,9 @@ public String getModuleGroup() { // This module uses the api context bridge helpers, therefore must be in the same classloader return "opentelemetry-api-bridge"; } + + @Override + public boolean isIndyReady() { + return true; + } } From 3864cf98cdce7dc70c0957fb7ac590f448c6d696 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Wed, 10 Sep 2025 10:56:31 +0200 Subject: [PATCH 04/15] reactor-netty-0.9 --- .../v0_9/HttpClientInstrumentation.java | 64 ++++++++++++------- .../ReactorNettyInstrumentationModule.java | 9 ++- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java index 509ddbdf9ed2..b58653dd3c0a 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java @@ -17,6 +17,8 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.function.BiConsumer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.netty.Connection; @@ -74,85 +76,98 @@ public void transform(TypeTransformer transformer) { public static class CreateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Local("otelCallDepth") CallDepth callDepth) { - callDepth = CallDepth.forClass(HttpClient.class); + public static CallDepth onEnter() { + CallDepth callDepth = CallDepth.forClass(HttpClient.class); callDepth.getAndIncrement(); + return callDepth; } + @AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( + public static HttpClient stopSpan( @Advice.Thrown Throwable throwable, - @Advice.Return(readOnly = false) HttpClient client, - @Advice.Local("otelCallDepth") CallDepth callDepth) { + @Advice.Return HttpClient client, + @Advice.Enter CallDepth callDepth) { if (callDepth.decrementAndGet() == 0 && throwable == null) { - client = client.doOnRequest(new OnRequest()).mapConnect(new MapConnect()); + return client.doOnRequest(new OnRequest()).mapConnect(new MapConnect()); } + return client; } } @SuppressWarnings("unused") public static class OnRequestAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallback) { + BiConsumer callback = originalCallback; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { callback = new DecoratorFunctions.OnRequestDecorator(callback); } + return callback; } } @SuppressWarnings("unused") public static class OnRequestErrorAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) BiConsumer callback) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnRequestErrorDecorator(callback); + return new DecoratorFunctions.OnRequestErrorDecorator(callback); } + return callback; } } @SuppressWarnings("unused") public static class OnResponseAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback, + public static BiConsumer onEnter( + @Advice.Argument(0) BiConsumer callback, @Advice.Origin("#m") String methodName) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { boolean forceParentContext = methodName.equals("doAfterResponse"); - callback = new DecoratorFunctions.OnResponseDecorator(callback, forceParentContext); + return new DecoratorFunctions.OnResponseDecorator(callback, forceParentContext); } + return callback; } } @SuppressWarnings("unused") public static class OnResponseErrorAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) BiConsumer callback) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnResponseErrorDecorator(callback); + return new DecoratorFunctions.OnResponseErrorDecorator(callback); } + return callback; } } @SuppressWarnings("unused") public static class OnErrorAdvice { + @AssignReturned.ToArguments({ + @ToArgument(value = 0, index = 0), + @ToArgument(value = 1, index = 1) + }) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) + public static Object[] onEnter( + @Advice.Argument(0) BiConsumer requestCallback, - @Advice.Argument(value = 1, readOnly = false) + @Advice.Argument(1) BiConsumer responseCallback) { if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { requestCallback = new DecoratorFunctions.OnRequestErrorDecorator(requestCallback); @@ -160,6 +175,7 @@ public static void onEnter( if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) { responseCallback = new DecoratorFunctions.OnResponseErrorDecorator(responseCallback); } + return new Object[] {requestCallback, responseCallback}; } } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java index 8287a59c74e9..cbdca1728ecd 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java @@ -11,6 +11,7 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -24,7 +25,8 @@ * HttpClient#doOnRequest(BiConsumer)} to pass context from the caller to Reactor to Netty. */ @AutoService(InstrumentationModule.class) -public class ReactorNettyInstrumentationModule extends InstrumentationModule { +public class ReactorNettyInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public ReactorNettyInstrumentationModule() { super("reactor-netty", "reactor-netty-0.9"); @@ -40,4 +42,9 @@ public ElementMatcher.Junction classLoaderMatcher() { public List typeInstrumentations() { return singletonList(new HttpClientInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } From 23c60ef03c10cca436df1e0e157c0bd0f5abe19c Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Thu, 11 Sep 2025 15:02:36 +0200 Subject: [PATCH 05/15] return to a working state This reverts commit 3864cf98cdce7dc70c0957fb7ac590f448c6d696. --- .../v0_9/HttpClientInstrumentation.java | 28 +++++++------------ .../ReactorNettyInstrumentationModule.java | 9 +----- 2 files changed, 11 insertions(+), 26 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java index b58653dd3c0a..d594f9d54e43 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java @@ -17,7 +17,6 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.function.BiConsumer; import net.bytebuddy.asm.Advice; -import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -82,7 +81,7 @@ public static CallDepth onEnter() { return callDepth; } - @AssignReturned.ToReturned + @Advice.AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static HttpClient stopSpan( @Advice.Thrown Throwable throwable, @@ -99,14 +98,12 @@ public static HttpClient stopSpan( @SuppressWarnings("unused") public static class OnRequestAdvice { - @AssignReturned.ToArguments(@ToArgument(0)) + @Advice.AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) public static BiConsumer onEnter( - @Advice.Argument(0) - BiConsumer originalCallback) { - BiConsumer callback = originalCallback; + @Advice.Argument(0) BiConsumer callback) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnRequestDecorator(callback); + return new DecoratorFunctions.OnRequestDecorator(callback); } return callback; } @@ -115,7 +112,7 @@ public static class OnRequestAdvice { @SuppressWarnings("unused") public static class OnRequestErrorAdvice { - @AssignReturned.ToArguments(@ToArgument(0)) + @Advice.AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) public static BiConsumer onEnter( @Advice.Argument(0) BiConsumer callback) { @@ -129,7 +126,7 @@ public static class OnRequestErrorAdvice { @SuppressWarnings("unused") public static class OnResponseAdvice { - @AssignReturned.ToArguments(@ToArgument(0)) + @Advice.AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) public static BiConsumer onEnter( @Advice.Argument(0) BiConsumer callback, @@ -145,7 +142,7 @@ public static class OnResponseAdvice { @SuppressWarnings("unused") public static class OnResponseErrorAdvice { - @AssignReturned.ToArguments(@ToArgument(0)) + @Advice.AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) public static BiConsumer onEnter( @Advice.Argument(0) BiConsumer callback) { @@ -159,15 +156,11 @@ public static class OnResponseErrorAdvice { @SuppressWarnings("unused") public static class OnErrorAdvice { - @AssignReturned.ToArguments({ - @ToArgument(value = 0, index = 0), - @ToArgument(value = 1, index = 1) - }) @Advice.OnMethodEnter(suppress = Throwable.class) - public static Object[] onEnter( - @Advice.Argument(0) + public static void onEnter( + @Advice.Argument(value = 0, readOnly = false) BiConsumer requestCallback, - @Advice.Argument(1) + @Advice.Argument(value = 1, readOnly = false) BiConsumer responseCallback) { if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { requestCallback = new DecoratorFunctions.OnRequestErrorDecorator(requestCallback); @@ -175,7 +168,6 @@ public static Object[] onEnter( if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) { responseCallback = new DecoratorFunctions.OnResponseErrorDecorator(responseCallback); } - return new Object[] {requestCallback, responseCallback}; } } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java index cbdca1728ecd..8287a59c74e9 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java @@ -11,7 +11,6 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; -import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -25,8 +24,7 @@ * HttpClient#doOnRequest(BiConsumer)} to pass context from the caller to Reactor to Netty. */ @AutoService(InstrumentationModule.class) -public class ReactorNettyInstrumentationModule extends InstrumentationModule - implements ExperimentalInstrumentationModule { +public class ReactorNettyInstrumentationModule extends InstrumentationModule { public ReactorNettyInstrumentationModule() { super("reactor-netty", "reactor-netty-0.9"); @@ -42,9 +40,4 @@ public ElementMatcher.Junction classLoaderMatcher() { public List typeInstrumentations() { return singletonList(new HttpClientInstrumentation()); } - - @Override - public boolean isIndyReady() { - return true; - } } From 2adabdc206bed12fe3044ca7fd3e0d946174322e Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Thu, 11 Sep 2025 15:18:54 +0200 Subject: [PATCH 06/15] make it work again --- .../v0_9/HttpClientInstrumentation.java | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java index d594f9d54e43..0a58d9c7ac11 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java @@ -17,6 +17,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.function.BiConsumer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -81,7 +82,7 @@ public static CallDepth onEnter() { return callDepth; } - @Advice.AssignReturned.ToReturned + @AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static HttpClient stopSpan( @Advice.Thrown Throwable throwable, @@ -98,7 +99,7 @@ public static HttpClient stopSpan( @SuppressWarnings("unused") public static class OnRequestAdvice { - @Advice.AssignReturned.ToArguments(@ToArgument(0)) + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) public static BiConsumer onEnter( @Advice.Argument(0) BiConsumer callback) { @@ -112,7 +113,7 @@ public static class OnRequestAdvice { @SuppressWarnings("unused") public static class OnRequestErrorAdvice { - @Advice.AssignReturned.ToArguments(@ToArgument(0)) + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) public static BiConsumer onEnter( @Advice.Argument(0) BiConsumer callback) { @@ -126,7 +127,7 @@ public static class OnRequestErrorAdvice { @SuppressWarnings("unused") public static class OnResponseAdvice { - @Advice.AssignReturned.ToArguments(@ToArgument(0)) + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) public static BiConsumer onEnter( @Advice.Argument(0) BiConsumer callback, @@ -142,7 +143,7 @@ public static class OnResponseAdvice { @SuppressWarnings("unused") public static class OnResponseErrorAdvice { - @Advice.AssignReturned.ToArguments(@ToArgument(0)) + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) public static BiConsumer onEnter( @Advice.Argument(0) BiConsumer callback) { @@ -156,18 +157,26 @@ public static class OnResponseErrorAdvice { @SuppressWarnings("unused") public static class OnErrorAdvice { + @AssignReturned.ToArguments({ + @ToArgument(value = 0, index = 0), + @ToArgument(value = 1, index = 1) + }) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer requestCallback, - @Advice.Argument(value = 1, readOnly = false) - BiConsumer responseCallback) { + public static Object[] onEnter( + @Advice.Argument(0) BiConsumer originalRequestCallback, + @Advice.Argument(1) BiConsumer originalResponseCallback) { + + // intermediate variables needed for inlined instrumentation + BiConsumer requestCallback = originalRequestCallback; + BiConsumer responseCallback = originalResponseCallback; + if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { requestCallback = new DecoratorFunctions.OnRequestErrorDecorator(requestCallback); } if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) { responseCallback = new DecoratorFunctions.OnResponseErrorDecorator(responseCallback); } + return new Object[] {requestCallback, responseCallback}; } } } From c77fbaabe5669583d9b9ff2c911bf06fb64aa7dc Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Thu, 11 Sep 2025 15:32:21 +0200 Subject: [PATCH 07/15] reformat --- .../v0_9/HttpClientInstrumentation.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java index 0a58d9c7ac11..13305afa0865 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java @@ -158,17 +158,21 @@ public static class OnResponseErrorAdvice { public static class OnErrorAdvice { @AssignReturned.ToArguments({ - @ToArgument(value = 0, index = 0), - @ToArgument(value = 1, index = 1) + @ToArgument(value = 0, index = 0), + @ToArgument(value = 1, index = 1) }) @Advice.OnMethodEnter(suppress = Throwable.class) public static Object[] onEnter( - @Advice.Argument(0) BiConsumer originalRequestCallback, - @Advice.Argument(1) BiConsumer originalResponseCallback) { + @Advice.Argument(0) + BiConsumer originalRequestCallback, + @Advice.Argument(1) + BiConsumer originalResponseCallback) { // intermediate variables needed for inlined instrumentation - BiConsumer requestCallback = originalRequestCallback; - BiConsumer responseCallback = originalResponseCallback; + BiConsumer requestCallback = + originalRequestCallback; + BiConsumer responseCallback = + originalResponseCallback; if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { requestCallback = new DecoratorFunctions.OnRequestErrorDecorator(requestCallback); From 8bf87993bf4229455a29e008f5d105f9bae2ad1c Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Thu, 11 Sep 2025 16:05:15 +0200 Subject: [PATCH 08/15] wip: ReactorNettySingletons --- .../reactornetty/v1_0/ReactorNettySingletons.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java index 6752c6cd0bc4..a16f5ff4736d 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java @@ -5,10 +5,12 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.HttpResponse; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.incubator.builder.internal.DefaultHttpClientInstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.netty.common.v4_0.HttpRequestAndChannel; import io.opentelemetry.instrumentation.netty.common.v4_0.internal.client.NettyClientInstrumenterBuilderFactory; import io.opentelemetry.instrumentation.netty.common.v4_0.internal.client.NettyClientInstrumenterFactory; @@ -31,6 +33,10 @@ public final class ReactorNettySingletons { private static final Instrumenter INSTRUMENTER; private static final NettyConnectionInstrumenter CONNECTION_INSTRUMENTER; + public static final VirtualField + CONNECTION_REQUEST_AND_CONTEXT = + VirtualField.find(ChannelPromise.class, ConnectionRequestAndContext.class); + static { INSTRUMENTER = JavaagentHttpClientInstrumenters.create( From 68d6de6149aa37970dbd9fc2de55b87124ddaed6 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Thu, 11 Sep 2025 16:37:20 +0200 Subject: [PATCH 09/15] wip: HttpClientConnectInstrumentation --- .../v1_0/HttpClientConnectInstrumentation.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java index 089f0bedd12d..5bfc1a37ac0d 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java @@ -11,6 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.publisher.Mono; @@ -35,10 +36,10 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ConnectAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( - @Advice.Return(readOnly = false) Mono connection, - @Advice.This HttpClient httpClient) { + public static Mono onExit( + @Advice.Return Mono connection, @Advice.This HttpClient httpClient) { HttpClientConfig config = httpClient.configuration(); // reactor-netty 1.0.x has a bug: the .mapConnect() function is not applied when deferred @@ -46,8 +47,9 @@ public static void onExit( // we're fixing this bug here, so that our instrumentation can safely add its own // .mapConnect() listener if (HttpClientConfigBuddy.hasDeferredConfig(config)) { - connection = HttpClientConfigBuddy.getConnector(config).apply(connection); + return HttpClientConfigBuddy.getConnector(config).apply(connection); } + return connection; } } } From e8c19d37cbbffae0652dcc61fc501bb1be85c4ed Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Thu, 11 Sep 2025 17:27:38 +0200 Subject: [PATCH 10/15] wip: HttpClientInstrumentation --- .../v1_0/HttpClientInstrumentation.java | 89 ++++++++++++++----- 1 file changed, 66 insertions(+), 23 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java index 9c1a49585ae8..f77680ab9c6f 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java @@ -16,6 +16,8 @@ import io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.DecoratorFunctions.PropagatedContext; import java.util.function.BiConsumer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.netty.Connection; @@ -79,102 +81,142 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class OnRequestAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallBack) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallBack; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { // perform the callback with the client span active (instead of the parent) since this // callback occurs after the connection is made callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); } + return callback; } } @SuppressWarnings("unused") public static class AfterRequestAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallBack) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallBack; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { // use client context after request is sent callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); } + return callback; } } @SuppressWarnings("unused") public static class OnRequestErrorAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallBack) { + + // using an intermediate variable is required to keep the advice when inlined + BiConsumer callback = originalCallBack; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT); } + return callback; } } @SuppressWarnings("unused") public static class OnResponseAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallBack) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallBack; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { // use client context just when response status & headers are received callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); } + return callback; } } @SuppressWarnings("unused") public static class AfterResponseAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallback) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallback; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.PARENT); } + return callback; } } @SuppressWarnings("unused") public static class OnResponseErrorAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + public static BiConsumer onEnter( + @Advice.Argument(0) + BiConsumer originalCallback) { + + // intermediate variable needed for inlined instrumentation + BiConsumer callback = originalCallback; if (DecoratorFunctions.shouldDecorate(callback.getClass())) { callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT); } + return callback; } } @SuppressWarnings("unused") public static class OnErrorAdvice { + @AssignReturned.ToArguments({ + @ToArgument(value = 0, index = 0), + @ToArgument(value = 1, index = 1) + }) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) - BiConsumer requestCallback, - @Advice.Argument(value = 1, readOnly = false) - BiConsumer responseCallback) { + public static Object[] onEnter( + @Advice.Argument(0) + BiConsumer originalRequestCallback, + @Advice.Argument(1) + BiConsumer originalResponseCallback) { + + // intermediate variables needed for inlined instrumentation + BiConsumer requestCallback = + originalRequestCallback; + BiConsumer responseCallback = + originalResponseCallback; if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { requestCallback = @@ -186,6 +228,7 @@ public static void onEnter( new DecoratorFunctions.OnMessageErrorDecorator<>( responseCallback, PropagatedContext.PARENT); } + return new Object[] {requestCallback, responseCallback}; } } } From 19b4d178761d8ff7d4d08a84ff47035f908c4236 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Thu, 11 Sep 2025 17:49:34 +0200 Subject: [PATCH 11/15] wip: TransportConnectorInstrumentation --- .../TransportConnectorInstrumentation.java | 124 +++++++++--------- 1 file changed, 60 insertions(+), 64 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java index 4b165de020ee..24adce582942 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.CONNECTION_REQUEST_AND_CONTEXT; import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.connectionInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; @@ -16,15 +17,16 @@ import io.netty.resolver.AddressResolverGroup; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.netty.common.internal.NettyConnectionRequest; -import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.InstrumentedAddressResolverGroup; import java.net.SocketAddress; import java.util.List; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.publisher.Mono; @@ -68,56 +70,51 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ResolveAndConnectAdvice { + @AssignReturned.ToArguments(@ToArgument(3)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 3, readOnly = false) AddressResolverGroup resolver) { - resolver = InstrumentedAddressResolverGroup.wrap(connectionInstrumenter(), resolver); + public static AddressResolverGroup onEnter( + @Advice.Argument(3) AddressResolverGroup resolver) { + return InstrumentedAddressResolverGroup.wrap(connectionInstrumenter(), resolver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void onExit(@Advice.Return(readOnly = false) Mono mono) { + public static Mono onExit(@Advice.Return Mono mono) { + // end the CONNECT span that was started in doConnect() instrumentation - mono = ConnectionWrapper.wrap(mono); + return ConnectionWrapper.wrap(mono); } } - @SuppressWarnings("unused") - public static class ConnectAdvice { + public static class AdviceScope { + private final NettyConnectionRequest request; + private final Context context; + private final Scope scope; - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(0) SocketAddress remoteAddress, - @Advice.Argument(2) ChannelPromise channelPromise, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelRequest") NettyConnectionRequest request, - @Advice.Local("otelScope") Scope scope) { + private AdviceScope(NettyConnectionRequest request, Context context, Scope scope) { + this.request = request; + this.context = context; + this.scope = scope; + } + + @Nullable + public static AdviceScope start(SocketAddress remoteAddress, ChannelPromise channelPromise) { - Context parentContext = Java8BytecodeBridge.currentContext(); - request = NettyConnectionRequest.connect(remoteAddress); + Context parentContext = Context.current(); + NettyConnectionRequest request = NettyConnectionRequest.connect(remoteAddress); if (!connectionInstrumenter().shouldStart(parentContext, request)) { - return; + return null; } - context = connectionInstrumenter().start(parentContext, request); - scope = context.makeCurrent(); - + Context context = connectionInstrumenter().start(parentContext, request); // the span is finished in the mono decorated by the ConnectionWrapper - VirtualField.find(ChannelPromise.class, ConnectionRequestAndContext.class) - .set(channelPromise, ConnectionRequestAndContext.create(request, context)); + CONNECTION_REQUEST_AND_CONTEXT.set( + channelPromise, ConnectionRequestAndContext.create(request, context)); + return new AdviceScope(request, context, context.makeCurrent()); } - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void endConnect( - @Advice.Thrown Throwable throwable, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelRequest") NettyConnectionRequest request, - @Advice.Local("otelScope") Scope scope) { - - if (scope == null) { - return; - } + public void end(@Nullable Throwable throwable) { scope.close(); - if (throwable != null) { connectionInstrumenter().end(context, request, null, throwable); } @@ -125,45 +122,44 @@ public static void endConnect( } @SuppressWarnings("unused") - public static class ConnectNewAdvice { + public static class ConnectAdvice { + @Nullable @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(0) List remoteAddresses, - @Advice.Argument(2) ChannelPromise channelPromise, - @Advice.Argument(3) int index, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelRequest") NettyConnectionRequest request, - @Advice.Local("otelScope") Scope scope) { + public static AdviceScope onEnter( + @Advice.Argument(0) SocketAddress remoteAddress, + @Advice.Argument(2) ChannelPromise channelPromise) { + return AdviceScope.start(remoteAddress, channelPromise); + } - Context parentContext = Java8BytecodeBridge.currentContext(); - request = NettyConnectionRequest.connect(remoteAddresses.get(index)); - if (!connectionInstrumenter().shouldStart(parentContext, request)) { - return; + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void endConnect( + @Advice.Thrown @Nullable Throwable throwable, + @Advice.Enter @Nullable AdviceScope adviceScope) { + if (adviceScope != null) { + adviceScope.end(throwable); } + } + } - context = connectionInstrumenter().start(parentContext, request); - scope = context.makeCurrent(); + @SuppressWarnings("unused") + public static class ConnectNewAdvice { - // the span is finished in the mono decorated by the ConnectionWrapper - VirtualField.find(ChannelPromise.class, ConnectionRequestAndContext.class) - .set(channelPromise, ConnectionRequestAndContext.create(request, context)); + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AdviceScope onEnter( + @Advice.Argument(0) List remoteAddresses, + @Advice.Argument(2) ChannelPromise channelPromise, + @Advice.Argument(3) int index) { + SocketAddress remoteAddress = remoteAddresses.get(index); + return AdviceScope.start(remoteAddress, channelPromise); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void endConnect( - @Advice.Thrown Throwable throwable, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelRequest") NettyConnectionRequest request, - @Advice.Local("otelScope") Scope scope) { - - if (scope == null) { - return; - } - scope.close(); - - if (throwable != null) { - connectionInstrumenter().end(context, request, null, throwable); + @Advice.Thrown Throwable throwable, @Advice.Enter @Nullable AdviceScope adviceScope) { + if (adviceScope != null) { + adviceScope.end(throwable); } } } From c3c7bc60e57e210066e4b6c429b71958c5041862 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Thu, 11 Sep 2025 18:21:22 +0200 Subject: [PATCH 12/15] working for inlined ! --- .../ReactorNettyInstrumentationModule.java | 5 + .../v1_0/ResponseReceiverInstrumentation.java | 222 ++++++++---------- 2 files changed, 106 insertions(+), 121 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java index 2517d4ccbc69..6e9a34ac0b9f 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java @@ -57,4 +57,9 @@ public List typeInstrumentations() { new ResponseReceiverInstrumentation(), new TransportConnectorInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java index 451c613df261..5ce4bbbbb421 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java @@ -16,7 +16,10 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.function.BiFunction; +import java.util.function.Function; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.reactivestreams.Publisher; @@ -70,187 +73,164 @@ public void transform(TypeTransformer transformer) { this.getClass().getName() + "$ResponseSingleAdvice"); } - @SuppressWarnings("unused") - public static class ResponseMonoAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { + public static class AdviceScope { + @Nullable private final HttpClient.ResponseReceiver modifiedReceiver; + private final CallDepth callDepth; - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; + /** Dedicated advice scope subclass that make instrumentation skip original method body. */ + public static class SkipMethodBody extends AdviceScope { + private SkipMethodBody(CallDepth callDepth, HttpClient.ResponseReceiver receiver) { + super(callDepth, receiver); } + } - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + private AdviceScope(CallDepth callDepth, @Nullable HttpClient.ResponseReceiver receiver) { + this.modifiedReceiver = receiver; + this.callDepth = callDepth; } - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, - @Advice.Return(readOnly = false) Mono returnValue) { + public static AdviceScope start(CallDepth callDepth, HttpClient.ResponseReceiver receiver) { + if (callDepth.getAndIncrement() > 0) { + // original method body executed for nested calls + return new AdviceScope(callDepth, null); + } + // original method body will be skipped due to return type and 'skipOn' value + return new SkipMethodBody(callDepth, HttpResponseReceiverInstrumenter.instrument(receiver)); + } + public T end(T returnValue, Function, T> receiverFunction) { try { if (modifiedReceiver != null) { - returnValue = modifiedReceiver.response(); + return receiverFunction.apply(modifiedReceiver); } } finally { // needs to be called after original method to prevent StackOverflowError callDepth.decrementAndGet(); } + return returnValue; + } + + public Mono end(Mono returnValue) { + return end(returnValue, HttpClient.ResponseReceiver::response); + } + + public Flux endResponse( + Flux returnValue, + BiFunction> + receiveFunction) { + return end(returnValue, receiver -> receiver.response(receiveFunction)); + } + + public Flux endResponseConnection( + Flux returnValue, + BiFunction> + receiveFunction) { + return end(returnValue, receiver -> receiver.responseConnection(receiveFunction)); + } + + public ByteBufFlux endResponseContent(ByteBufFlux returnValue) { + return end(returnValue, HttpClient.ResponseReceiver::responseContent); + } + + public > Mono endResponseSingle( + Mono returnValue, + BiFunction> + receiveFunction) { + return end(returnValue, receiver -> receiver.responseSingle(receiveFunction)); } } @SuppressWarnings("unused") - public static class ResponseFluxAdvice { + public static class ResponseMonoAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); + } - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; - } + @AssignReturned.ToReturned + @Advice.OnMethodExit(suppress = Throwable.class) + public static Mono onExit( + @Advice.Return Mono returnValue, + @Advice.Enter AdviceScope adviceScope) { + return adviceScope.end(returnValue); + } + } + + @SuppressWarnings("unused") + public static class ResponseFluxAdvice { - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static > void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + public static > Flux onExit( @Advice.Argument(0) BiFunction> receiveFunction, - @Advice.Return(readOnly = false) Flux returnValue) { - - try { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.response(receiveFunction); - } - } finally { - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); - } + @Advice.Return Flux returnValue, + @Advice.Enter AdviceScope adviceScope) { + return adviceScope.endResponse(returnValue, receiveFunction); } } @SuppressWarnings("unused") public static class ResponseConnectionAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { - - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; - } - - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static > void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + public static > Flux onExit( @Advice.Argument(0) BiFunction> receiveFunction, - @Advice.Return(readOnly = false) Flux returnValue) { - - try { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseConnection(receiveFunction); - } - } finally { - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); - } + @Advice.Return Flux returnValue, + @Advice.Enter AdviceScope adviceScope) { + return adviceScope.endResponseConnection(returnValue, receiveFunction); } } @SuppressWarnings("unused") public static class ResponseContentAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { - - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; - } - - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, - @Advice.Return(readOnly = false) ByteBufFlux returnValue) { - - try { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseContent(); - } - } finally { - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); - } + public static ByteBufFlux onExit( + @Advice.Return ByteBufFlux returnValue, @Advice.Enter AdviceScope adviceScope) { + return adviceScope.endResponseContent(returnValue); } } @SuppressWarnings("unused") public static class ResponseSingleAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) - public static HttpClient.ResponseReceiver onEnter( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.This HttpClient.ResponseReceiver receiver) { - - callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); - if (callDepth.getAndIncrement() > 0) { - // execute the original method on nested calls - return null; - } - - // non-null value will skip the original method invocation - return HttpResponseReceiverInstrumenter.instrument(receiver); + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { + return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static > void onExit( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + public static > Mono onExit( @Advice.Argument(0) BiFunction> receiveFunction, - @Advice.Return(readOnly = false) Mono returnValue) { + @Advice.Return Mono returnValue, + @Advice.Enter AdviceScope adviceScope) { - try { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseSingle(receiveFunction); - } - } finally { - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); - } + return adviceScope.endResponseSingle(returnValue, receiveFunction); } } } From 6d33fbac04a8941c6e4081efbac25830dd77c01c Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Fri, 12 Sep 2025 12:17:49 +0200 Subject: [PATCH 13/15] hack skipOn with Runnable interface --- .../v1_0/ResponseReceiverInstrumentation.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java index 5ce4bbbbb421..11ed5b54e0ea 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java @@ -77,11 +77,21 @@ public static class AdviceScope { @Nullable private final HttpClient.ResponseReceiver modifiedReceiver; private final CallDepth callDepth; - /** Dedicated advice scope subclass that make instrumentation skip original method body. */ - public static class SkipMethodBody extends AdviceScope { - private SkipMethodBody(CallDepth callDepth, HttpClient.ResponseReceiver receiver) { + /** + * Dedicated advice scope subclass that make instrumentation skip original method body using + * {@code skipOn = Runnable.class } which does not require to expose an extra type + */ + public static class SkipMethodBodyAdviceScope extends AdviceScope implements Runnable { + private SkipMethodBodyAdviceScope( + CallDepth callDepth, HttpClient.ResponseReceiver receiver) { super(callDepth, receiver); } + + @Override + public void run() { + // do nothing, only using Runnable as a marker interface to enable skipping without + // exposing the actual type + } } private AdviceScope(CallDepth callDepth, @Nullable HttpClient.ResponseReceiver receiver) { @@ -95,7 +105,8 @@ public static AdviceScope start(CallDepth callDepth, HttpClient.ResponseReceiver return new AdviceScope(callDepth, null); } // original method body will be skipped due to return type and 'skipOn' value - return new SkipMethodBody(callDepth, HttpResponseReceiverInstrumenter.instrument(receiver)); + return new SkipMethodBodyAdviceScope( + callDepth, HttpResponseReceiverInstrumenter.instrument(receiver)); } public T end(T returnValue, Function, T> receiverFunction) { @@ -143,7 +154,7 @@ public > Mono endResponseSingle( @SuppressWarnings("unused") public static class ResponseMonoAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } @@ -160,7 +171,7 @@ public static Mono onExit( @SuppressWarnings("unused") public static class ResponseFluxAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } @@ -180,7 +191,7 @@ public static > Flux onExit( @SuppressWarnings("unused") public static class ResponseConnectionAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } @@ -200,7 +211,7 @@ public static > Flux onExit( @SuppressWarnings("unused") public static class ResponseContentAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } @@ -216,7 +227,7 @@ public static ByteBufFlux onExit( @SuppressWarnings("unused") public static class ResponseSingleAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = AdviceScope.SkipMethodBody.class) + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Runnable.class) public static AdviceScope onEnter(@Advice.This HttpClient.ResponseReceiver receiver) { return AdviceScope.start(CallDepth.forClass(HttpClient.ResponseReceiver.class), receiver); } From 97251a4efcac69863a0ab9a446985f07f9e89369 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Mon, 15 Sep 2025 09:22:48 +0200 Subject: [PATCH 14/15] add missing isIndyReady --- .../v0_9/ReactorNettyInstrumentationModule.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java index 8287a59c74e9..cbdca1728ecd 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java @@ -11,6 +11,7 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -24,7 +25,8 @@ * HttpClient#doOnRequest(BiConsumer)} to pass context from the caller to Reactor to Netty. */ @AutoService(InstrumentationModule.class) -public class ReactorNettyInstrumentationModule extends InstrumentationModule { +public class ReactorNettyInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public ReactorNettyInstrumentationModule() { super("reactor-netty", "reactor-netty-0.9"); @@ -40,4 +42,9 @@ public ElementMatcher.Junction classLoaderMatcher() { public List typeInstrumentations() { return singletonList(new HttpClientInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } From dc304625317877cfad29f625b96c9396221978e5 Mon Sep 17 00:00:00 2001 From: Sylvain Juge <763082+SylvainJuge@users.noreply.github.com> Date: Mon, 15 Sep 2025 17:23:04 +0200 Subject: [PATCH 15/15] post-review changes --- .../kafka/v1_0/ConsumerHandlerInstrumentation.java | 9 ++++----- .../kafka/v1_0/DefaultKafkaReceiverInstrumentation.java | 9 ++++----- .../reactor/kafka/v1_0/KafkaReceiverInstrumentation.java | 6 +++--- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java index 159ef15b4770..526dce67631a 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java @@ -36,12 +36,11 @@ public static class ReceiveAdvice { @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static Flux onExit(@Advice.Return Flux originalFlux) { - Flux flux = originalFlux; - if (!(flux instanceof TracingDisablingKafkaFlux)) { - flux = new TracingDisablingKafkaFlux<>(flux); + public static Flux onExit(@Advice.Return Flux flux) { + if (flux instanceof TracingDisablingKafkaFlux) { + return flux; } - return flux; + return new TracingDisablingKafkaFlux<>(flux); } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java index 56254fa9abab..97a75252105f 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java @@ -36,12 +36,11 @@ public static class CreateConsumerFluxAdvice { @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static Flux onExit(@Advice.Return Flux originalFlux) { - Flux flux = originalFlux; - if (!(flux instanceof TracingDisablingKafkaFlux)) { - flux = new TracingDisablingKafkaFlux<>(flux); + public static Flux onExit(@Advice.Return Flux flux) { + if (flux instanceof TracingDisablingKafkaFlux) { + return flux; } - return flux; + return new TracingDisablingKafkaFlux<>(flux); } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java index c39b868c2772..48f01c7cc35f 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java @@ -37,10 +37,10 @@ public static class CreateAdvice { @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) public static KafkaReceiver onExit(@Advice.Return KafkaReceiver receiver) { - if (!(receiver instanceof InstrumentedKafkaReceiver)) { - return new InstrumentedKafkaReceiver<>(receiver); + if (receiver instanceof InstrumentedKafkaReceiver) { + return receiver; } - return receiver; + return new InstrumentedKafkaReceiver<>(receiver); } } }