Skip to content

Commit ecdaa63

Browse files
authored
make reactor indy-ready (#14640)
1 parent 6313b39 commit ecdaa63

File tree

16 files changed

+365
-267
lines changed

16 files changed

+365
-267
lines changed

instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentation.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
2020
import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage;
2121
import net.bytebuddy.asm.Advice;
22+
import net.bytebuddy.asm.Advice.AssignReturned;
2223
import net.bytebuddy.description.type.TypeDescription;
2324
import net.bytebuddy.matcher.ElementMatcher;
2425

@@ -69,14 +70,13 @@ public static boolean methodEnter() {
6970
return false;
7071
}
7172

73+
@AssignReturned.ToReturned
7274
@Advice.OnMethodExit(suppress = Throwable.class)
73-
public static void methodExit(
75+
public static reactor.util.context.Context methodExit(
7476
@Advice.Argument(0) reactor.util.context.Context reactorContext,
75-
@Advice.Argument(1) Context applicationContext,
76-
@Advice.Return(readOnly = false) reactor.util.context.Context updatedReactorContext) {
77-
updatedReactorContext =
78-
ContextPropagationOperator.storeOpenTelemetryContext(
79-
reactorContext, AgentContextStorage.getAgentContext(applicationContext));
77+
@Advice.Argument(1) Context applicationContext) {
78+
return ContextPropagationOperator.storeOpenTelemetryContext(
79+
reactorContext, AgentContextStorage.getAgentContext(applicationContext));
8080
}
8181
}
8282

@@ -87,19 +87,19 @@ public static boolean methodEnter() {
8787
return false;
8888
}
8989

90+
@AssignReturned.ToReturned
9091
@Advice.OnMethodExit(suppress = Throwable.class)
91-
public static void methodExit(
92+
public static Context methodExit(
9293
@Advice.Argument(0) reactor.util.context.Context reactorContext,
93-
@Advice.Argument(1) Context defaultContext,
94-
@Advice.Return(readOnly = false) Context applicationContext) {
94+
@Advice.Argument(1) Context defaultContext) {
9595

9696
io.opentelemetry.context.Context agentContext =
9797
ContextPropagationOperator.getOpenTelemetryContext(reactorContext, null);
9898
if (agentContext == null) {
99-
applicationContext = defaultContext;
100-
} else {
101-
applicationContext = AgentContextStorage.toApplicationContext(agentContext);
99+
return defaultContext;
102100
}
101+
102+
return AgentContextStorage.toApplicationContext(agentContext);
103103
}
104104
}
105105

instrumentation/reactor/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_1/operator/ContextPropagationOperatorInstrumentationModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,9 @@ public String getModuleGroup() {
3838
// This module uses the api context bridge helpers, therefore must be in the same classloader
3939
return "opentelemetry-api-bridge";
4040
}
41+
42+
@Override
43+
public boolean isIndyReady() {
44+
return true;
45+
}
4146
}

instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34Instrumentation.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1919
import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage;
2020
import net.bytebuddy.asm.Advice;
21+
import net.bytebuddy.asm.Advice.AssignReturned;
2122
import net.bytebuddy.description.type.TypeDescription;
2223
import net.bytebuddy.matcher.ElementMatcher;
2324

@@ -48,18 +49,18 @@ public static boolean methodEnter() {
4849
return false;
4950
}
5051

52+
@AssignReturned.ToReturned
5153
@Advice.OnMethodExit(suppress = Throwable.class)
52-
public static void methodExit(
54+
public static Context methodExit(
5355
@Advice.Argument(0) reactor.util.context.ContextView reactorContext,
54-
@Advice.Argument(1) Context defaultContext,
55-
@Advice.Return(readOnly = false) Context applicationContext) {
56+
@Advice.Argument(1) Context defaultContext) {
5657

5758
io.opentelemetry.context.Context agentContext =
5859
ContextPropagationOperator.getOpenTelemetryContextFromContextView(reactorContext, null);
5960
if (agentContext == null) {
60-
applicationContext = defaultContext;
61+
return defaultContext;
6162
} else {
62-
applicationContext = AgentContextStorage.toApplicationContext(agentContext);
63+
return AgentContextStorage.toApplicationContext(agentContext);
6364
}
6465
}
6566
}

instrumentation/reactor/reactor-3.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/v3_4/operator/ContextPropagationOperator34InstrumentationModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,9 @@ public String getModuleGroup() {
3939
// This module uses the api context bridge helpers, therefore must be in the same classloader
4040
return "opentelemetry-api-bridge";
4141
}
42+
43+
@Override
44+
public boolean isIndyReady() {
45+
return true;
46+
}
4247
}

instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1212
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1313
import net.bytebuddy.asm.Advice;
14+
import net.bytebuddy.asm.Advice.AssignReturned;
1415
import net.bytebuddy.description.type.TypeDescription;
1516
import net.bytebuddy.matcher.ElementMatcher;
1617
import reactor.core.publisher.Flux;
@@ -33,11 +34,13 @@ public void transform(TypeTransformer transformer) {
3334
@SuppressWarnings("unused")
3435
public static class ReceiveAdvice {
3536

37+
@AssignReturned.ToReturned
3638
@Advice.OnMethodExit(suppress = Throwable.class)
37-
public static void onExit(@Advice.Return(readOnly = false) Flux<?> flux) {
38-
if (!(flux instanceof TracingDisablingKafkaFlux)) {
39-
flux = new TracingDisablingKafkaFlux<>(flux);
39+
public static Flux<?> onExit(@Advice.Return Flux<?> flux) {
40+
if (flux instanceof TracingDisablingKafkaFlux) {
41+
return flux;
4042
}
43+
return new TracingDisablingKafkaFlux<>(flux);
4144
}
4245
}
4346
}

instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1212
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1313
import net.bytebuddy.asm.Advice;
14+
import net.bytebuddy.asm.Advice.AssignReturned;
1415
import net.bytebuddy.description.type.TypeDescription;
1516
import net.bytebuddy.matcher.ElementMatcher;
1617
import reactor.core.publisher.Flux;
@@ -33,11 +34,13 @@ public void transform(TypeTransformer transformer) {
3334
@SuppressWarnings("unused")
3435
public static class CreateConsumerFluxAdvice {
3536

37+
@AssignReturned.ToReturned
3638
@Advice.OnMethodExit(suppress = Throwable.class)
37-
public static void onExit(@Advice.Return(readOnly = false) Flux<?> flux) {
38-
if (!(flux instanceof TracingDisablingKafkaFlux)) {
39-
flux = new TracingDisablingKafkaFlux<>(flux);
39+
public static Flux<?> onExit(@Advice.Return Flux<?> flux) {
40+
if (flux instanceof TracingDisablingKafkaFlux) {
41+
return flux;
4042
}
43+
return new TracingDisablingKafkaFlux<>(flux);
4144
}
4245
}
4346
}

instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1313
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1414
import net.bytebuddy.asm.Advice;
15+
import net.bytebuddy.asm.Advice.AssignReturned;
1516
import net.bytebuddy.description.type.TypeDescription;
1617
import net.bytebuddy.matcher.ElementMatcher;
1718
import reactor.kafka.receiver.KafkaReceiver;
@@ -33,11 +34,13 @@ public void transform(TypeTransformer transformer) {
3334
@SuppressWarnings("unused")
3435
public static class CreateAdvice {
3536

37+
@AssignReturned.ToReturned
3638
@Advice.OnMethodExit(suppress = Throwable.class)
37-
public static void onExit(@Advice.Return(readOnly = false) KafkaReceiver<?, ?> receiver) {
38-
if (!(receiver instanceof InstrumentedKafkaReceiver)) {
39-
receiver = new InstrumentedKafkaReceiver<>(receiver);
39+
public static KafkaReceiver<?, ?> onExit(@Advice.Return KafkaReceiver<?, ?> receiver) {
40+
if (receiver instanceof InstrumentedKafkaReceiver) {
41+
return receiver;
4042
}
43+
return new InstrumentedKafkaReceiver<>(receiver);
4144
}
4245
}
4346
}

instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
import com.google.auto.service.AutoService;
1111
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1212
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
1314
import java.util.List;
1415

1516
@AutoService(InstrumentationModule.class)
16-
public class ReactorKafkaInstrumentationModule extends InstrumentationModule {
17+
public class ReactorKafkaInstrumentationModule extends InstrumentationModule
18+
implements ExperimentalInstrumentationModule {
1719

1820
public ReactorKafkaInstrumentationModule() {
1921
super("reactor-kafka", "reactor-kafka-1.0");
@@ -27,4 +29,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
2729
new DefaultKafkaReceiverInstrumentation(),
2830
new ConsumerHandlerInstrumentation());
2931
}
32+
33+
@Override
34+
public boolean isIndyReady() {
35+
return true;
36+
}
3037
}

instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/HttpClientInstrumentation.java

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1818
import java.util.function.BiConsumer;
1919
import net.bytebuddy.asm.Advice;
20+
import net.bytebuddy.asm.Advice.AssignReturned;
21+
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
2022
import net.bytebuddy.description.type.TypeDescription;
2123
import net.bytebuddy.matcher.ElementMatcher;
2224
import reactor.netty.Connection;
@@ -74,92 +76,111 @@ public void transform(TypeTransformer transformer) {
7476
public static class CreateAdvice {
7577

7678
@Advice.OnMethodEnter(suppress = Throwable.class)
77-
public static void onEnter(@Advice.Local("otelCallDepth") CallDepth callDepth) {
78-
callDepth = CallDepth.forClass(HttpClient.class);
79+
public static CallDepth onEnter() {
80+
CallDepth callDepth = CallDepth.forClass(HttpClient.class);
7981
callDepth.getAndIncrement();
82+
return callDepth;
8083
}
8184

85+
@AssignReturned.ToReturned
8286
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
83-
public static void stopSpan(
87+
public static HttpClient stopSpan(
8488
@Advice.Thrown Throwable throwable,
85-
@Advice.Return(readOnly = false) HttpClient client,
86-
@Advice.Local("otelCallDepth") CallDepth callDepth) {
89+
@Advice.Return HttpClient client,
90+
@Advice.Enter CallDepth callDepth) {
8791

8892
if (callDepth.decrementAndGet() == 0 && throwable == null) {
89-
client = client.doOnRequest(new OnRequest()).mapConnect(new MapConnect());
93+
return client.doOnRequest(new OnRequest()).mapConnect(new MapConnect());
9094
}
95+
return client;
9196
}
9297
}
9398

9499
@SuppressWarnings("unused")
95100
public static class OnRequestAdvice {
96101

102+
@AssignReturned.ToArguments(@ToArgument(0))
97103
@Advice.OnMethodEnter(suppress = Throwable.class)
98-
public static void onEnter(
99-
@Advice.Argument(value = 0, readOnly = false)
100-
BiConsumer<? super HttpClientRequest, ? super Connection> callback) {
104+
public static BiConsumer<? super HttpClientRequest, ? super Connection> onEnter(
105+
@Advice.Argument(0) BiConsumer<? super HttpClientRequest, ? super Connection> callback) {
101106
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
102-
callback = new DecoratorFunctions.OnRequestDecorator(callback);
107+
return new DecoratorFunctions.OnRequestDecorator(callback);
103108
}
109+
return callback;
104110
}
105111
}
106112

107113
@SuppressWarnings("unused")
108114
public static class OnRequestErrorAdvice {
109115

116+
@AssignReturned.ToArguments(@ToArgument(0))
110117
@Advice.OnMethodEnter(suppress = Throwable.class)
111-
public static void onEnter(
112-
@Advice.Argument(value = 0, readOnly = false)
113-
BiConsumer<? super HttpClientRequest, ? super Throwable> callback) {
118+
public static BiConsumer<? super HttpClientRequest, ? super Throwable> onEnter(
119+
@Advice.Argument(0) BiConsumer<? super HttpClientRequest, ? super Throwable> callback) {
114120
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
115-
callback = new DecoratorFunctions.OnRequestErrorDecorator(callback);
121+
return new DecoratorFunctions.OnRequestErrorDecorator(callback);
116122
}
123+
return callback;
117124
}
118125
}
119126

120127
@SuppressWarnings("unused")
121128
public static class OnResponseAdvice {
122129

130+
@AssignReturned.ToArguments(@ToArgument(0))
123131
@Advice.OnMethodEnter(suppress = Throwable.class)
124-
public static void onEnter(
125-
@Advice.Argument(value = 0, readOnly = false)
126-
BiConsumer<? super HttpClientResponse, ? super Connection> callback,
132+
public static BiConsumer<? super HttpClientResponse, ? super Connection> onEnter(
133+
@Advice.Argument(0) BiConsumer<? super HttpClientResponse, ? super Connection> callback,
127134
@Advice.Origin("#m") String methodName) {
128135
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
129136
boolean forceParentContext = methodName.equals("doAfterResponse");
130-
callback = new DecoratorFunctions.OnResponseDecorator(callback, forceParentContext);
137+
return new DecoratorFunctions.OnResponseDecorator(callback, forceParentContext);
131138
}
139+
return callback;
132140
}
133141
}
134142

135143
@SuppressWarnings("unused")
136144
public static class OnResponseErrorAdvice {
137145

146+
@AssignReturned.ToArguments(@ToArgument(0))
138147
@Advice.OnMethodEnter(suppress = Throwable.class)
139-
public static void onEnter(
140-
@Advice.Argument(value = 0, readOnly = false)
141-
BiConsumer<? super HttpClientResponse, ? super Throwable> callback) {
148+
public static BiConsumer<? super HttpClientResponse, ? super Throwable> onEnter(
149+
@Advice.Argument(0) BiConsumer<? super HttpClientResponse, ? super Throwable> callback) {
142150
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
143-
callback = new DecoratorFunctions.OnResponseErrorDecorator(callback);
151+
return new DecoratorFunctions.OnResponseErrorDecorator(callback);
144152
}
153+
return callback;
145154
}
146155
}
147156

148157
@SuppressWarnings("unused")
149158
public static class OnErrorAdvice {
150159

160+
@AssignReturned.ToArguments({
161+
@ToArgument(value = 0, index = 0),
162+
@ToArgument(value = 1, index = 1)
163+
})
151164
@Advice.OnMethodEnter(suppress = Throwable.class)
152-
public static void onEnter(
153-
@Advice.Argument(value = 0, readOnly = false)
154-
BiConsumer<? super HttpClientRequest, ? super Throwable> requestCallback,
155-
@Advice.Argument(value = 1, readOnly = false)
156-
BiConsumer<? super HttpClientResponse, ? super Throwable> responseCallback) {
165+
public static Object[] onEnter(
166+
@Advice.Argument(0)
167+
BiConsumer<? super HttpClientRequest, ? super Throwable> originalRequestCallback,
168+
@Advice.Argument(1)
169+
BiConsumer<? super HttpClientResponse, ? super Throwable> originalResponseCallback) {
170+
171+
// intermediate variables needed for inlined instrumentation
172+
BiConsumer<? super HttpClientRequest, ? super Throwable> requestCallback =
173+
originalRequestCallback;
174+
BiConsumer<? super HttpClientResponse, ? super Throwable> responseCallback =
175+
originalResponseCallback;
176+
157177
if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) {
158178
requestCallback = new DecoratorFunctions.OnRequestErrorDecorator(requestCallback);
159179
}
160180
if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) {
161181
responseCallback = new DecoratorFunctions.OnResponseErrorDecorator(responseCallback);
162182
}
183+
return new Object[] {requestCallback, responseCallback};
163184
}
164185
}
165186
}

instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.google.auto.service.AutoService;
1212
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1313
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
1415
import java.util.List;
1516
import java.util.function.BiConsumer;
1617
import java.util.function.BiFunction;
@@ -24,7 +25,8 @@
2425
* HttpClient#doOnRequest(BiConsumer)} to pass context from the caller to Reactor to Netty.
2526
*/
2627
@AutoService(InstrumentationModule.class)
27-
public class ReactorNettyInstrumentationModule extends InstrumentationModule {
28+
public class ReactorNettyInstrumentationModule extends InstrumentationModule
29+
implements ExperimentalInstrumentationModule {
2830

2931
public ReactorNettyInstrumentationModule() {
3032
super("reactor-netty", "reactor-netty-0.9");
@@ -40,4 +42,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4042
public List<TypeInstrumentation> typeInstrumentations() {
4143
return singletonList(new HttpClientInstrumentation());
4244
}
45+
46+
@Override
47+
public boolean isIndyReady() {
48+
return true;
49+
}
4350
}

0 commit comments

Comments
 (0)