Skip to content

Commit dbc19b8

Browse files
committed
make pulsar indy-ready
1 parent c0db6ba commit dbc19b8

File tree

5 files changed

+65
-40
lines changed

5 files changed

+65
-40
lines changed

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
2323
import java.util.concurrent.CompletableFuture;
2424
import net.bytebuddy.asm.Advice;
25+
import net.bytebuddy.asm.Advice.AssignReturned;
2526
import net.bytebuddy.description.type.TypeDescription;
2627
import net.bytebuddy.matcher.ElementMatcher;
2728
import org.apache.pulsar.client.api.Consumer;
@@ -137,12 +138,13 @@ public static Timer before() {
137138
return Timer.start();
138139
}
139140

141+
@AssignReturned.ToReturned
140142
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
141-
public static void after(
142-
@Advice.Enter Timer timer,
143+
public static CompletableFuture<Message<?>> after(
143144
@Advice.This Consumer<?> consumer,
144-
@Advice.Return(readOnly = false) CompletableFuture<Message<?>> future) {
145-
future = wrap(future, timer, consumer);
145+
@Advice.Return CompletableFuture<Message<?>> future,
146+
@Advice.Enter Timer timer) {
147+
return wrap(future, timer, consumer);
146148
}
147149
}
148150

@@ -154,12 +156,13 @@ public static Timer before() {
154156
return Timer.start();
155157
}
156158

159+
@AssignReturned.ToReturned
157160
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
158-
public static void after(
159-
@Advice.Enter Timer timer,
161+
public static CompletableFuture<Messages<?>> after(
160162
@Advice.This Consumer<?> consumer,
161-
@Advice.Return(readOnly = false) CompletableFuture<Messages<?>> future) {
162-
future = wrapBatch(future, timer, consumer);
163+
@Advice.Return CompletableFuture<Messages<?>> future,
164+
@Advice.Enter Timer timer) {
165+
return wrapBatch(future, timer, consumer);
163166
}
164167
}
165168
}

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1818
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
1919
import net.bytebuddy.asm.Advice;
20+
import net.bytebuddy.asm.Advice.AssignReturned;
2021
import net.bytebuddy.description.type.TypeDescription;
2122
import net.bytebuddy.implementation.bytecode.assign.Assigner;
2223
import net.bytebuddy.matcher.ElementMatcher;
@@ -44,16 +45,12 @@ public void transform(TypeTransformer transformer) {
4445
@SuppressWarnings("unused")
4546
public static class ConsumerConfigurationDataMethodAdvice {
4647

48+
@AssignReturned.ToReturned
4749
@Advice.OnMethodExit(suppress = Throwable.class)
48-
public static void after(
50+
public static MessageListener<?> after(
4951
@Advice.This ConsumerConfigurationData<?> data,
50-
@Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC)
51-
MessageListener<?> listener) {
52-
if (listener == null) {
53-
return;
54-
}
55-
56-
listener = new MessageListenerWrapper<>(listener);
52+
@Advice.Return(typing = Assigner.Typing.DYNAMIC) MessageListener<?> listener) {
53+
return listener == null ? null : new MessageListenerWrapper<>(listener);
5754
}
5855
}
5956

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
import com.google.auto.service.AutoService;
99
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1010
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
11+
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
1112
import java.util.Arrays;
1213
import java.util.List;
1314

1415
@AutoService(InstrumentationModule.class)
15-
public class PulsarInstrumentationModule extends InstrumentationModule {
16+
public class PulsarInstrumentationModule extends InstrumentationModule
17+
implements ExperimentalInstrumentationModule {
1618
public PulsarInstrumentationModule() {
1719
super("pulsar", "pulsar-2.8");
1820
}
@@ -28,4 +30,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
2830
new SendCallbackInstrumentation(),
2931
new TransactionImplInstrumentation());
3032
}
33+
34+
@Override
35+
public boolean isIndyReady() {
36+
return true;
37+
}
3138
}

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1717
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1818
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
19+
import javax.annotation.Nullable;
1920
import net.bytebuddy.asm.Advice;
2021
import net.bytebuddy.description.type.TypeDescription;
2122
import net.bytebuddy.matcher.ElementMatcher;
@@ -43,32 +44,47 @@ public void transform(TypeTransformer transformer) {
4344
@SuppressWarnings("unused")
4445
public static class SendCallbackSendCompleteAdvice {
4546

46-
@Advice.OnMethodEnter(suppress = Throwable.class)
47-
public static void onEnter(
48-
@Advice.This SendCallback callback,
49-
@Advice.Local("otelContext") Context otelContext,
50-
@Advice.Local("otelScope") Scope otelScope,
51-
@Advice.Local("otelRequest") PulsarRequest request) {
52-
// Extract the Context and PulsarRequest from the SendCallback instance.
53-
SendCallbackData callBackData = VirtualFieldStore.extract(callback);
54-
if (callBackData != null) {
55-
// If the extraction was successful, store the Context and PulsarRequest in local variables.
56-
otelContext = callBackData.context;
57-
request = callBackData.request;
58-
otelScope = otelContext.makeCurrent();
47+
public static class AdviceScope {
48+
private final PulsarRequest request;
49+
private final Context context;
50+
private final Scope scope;
51+
52+
private AdviceScope(PulsarRequest request, Context context, Scope scope) {
53+
this.request = request;
54+
this.context = context;
55+
this.scope = scope;
56+
}
57+
58+
@Nullable
59+
public static AdviceScope start(SendCallback callback) {
60+
// Extract the Context and PulsarRequest from the SendCallback instance.
61+
SendCallbackData callBackData = VirtualFieldStore.extract(callback);
62+
if (callBackData == null) {
63+
return null;
64+
}
65+
66+
Context context = callBackData.context;
67+
return new AdviceScope(callBackData.request, context, context.makeCurrent());
68+
}
69+
70+
public void end(@Nullable Throwable t) {
71+
// Close the Scope and end the span.
72+
scope.close();
73+
producerInstrumenter().end(context, request, null, t);
5974
}
6075
}
6176

77+
@Nullable
78+
@Advice.OnMethodEnter(suppress = Throwable.class)
79+
public static AdviceScope onEnter(@Advice.This SendCallback callback) {
80+
return AdviceScope.start(callback);
81+
}
82+
6283
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
6384
public static void onExit(
64-
@Advice.Argument(0) Throwable t,
65-
@Advice.Local("otelContext") Context otelContext,
66-
@Advice.Local("otelScope") Scope otelScope,
67-
@Advice.Local("otelRequest") PulsarRequest request) {
68-
if (otelScope != null) {
69-
// Close the Scope and end the span.
70-
otelScope.close();
71-
producerInstrumenter().end(otelContext, request, null, t);
85+
@Advice.Argument(0) Throwable t, @Advice.Enter @Nullable AdviceScope adviceScope) {
86+
if (adviceScope != null) {
87+
adviceScope.end(t);
7288
}
7389
}
7490
}

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons;
1616
import java.util.concurrent.CompletableFuture;
1717
import net.bytebuddy.asm.Advice;
18+
import net.bytebuddy.asm.Advice.AssignReturned;
1819
import net.bytebuddy.description.type.TypeDescription;
1920
import net.bytebuddy.matcher.ElementMatcher;
2021

@@ -37,9 +38,10 @@ public void transform(TypeTransformer transformer) {
3738
@SuppressWarnings("unused")
3839
public static class RegisterProducedTopicAdvice {
3940

41+
@AssignReturned.ToReturned
4042
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
41-
public static void after(@Advice.Return(readOnly = false) CompletableFuture<Void> future) {
42-
future = PulsarSingletons.wrap(future);
43+
public static CompletableFuture<Void> after(@Advice.Return CompletableFuture<Void> future) {
44+
return PulsarSingletons.wrap(future);
4345
}
4446
}
4547
}

0 commit comments

Comments
 (0)