diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java index 0919440dc11e..3f60d5d4675a 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java @@ -22,6 +22,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.concurrent.CompletableFuture; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.apache.pulsar.client.api.Consumer; @@ -137,12 +138,13 @@ public static Timer before() { return Timer.start(); } + @AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void after( - @Advice.Enter Timer timer, + public static CompletableFuture> after( @Advice.This Consumer consumer, - @Advice.Return(readOnly = false) CompletableFuture> future) { - future = wrap(future, timer, consumer); + @Advice.Return CompletableFuture> future, + @Advice.Enter Timer timer) { + return wrap(future, timer, consumer); } } @@ -154,12 +156,13 @@ public static Timer before() { return Timer.start(); } + @AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void after( - @Advice.Enter Timer timer, + public static CompletableFuture> after( @Advice.This Consumer consumer, - @Advice.Return(readOnly = false) CompletableFuture> future) { - future = wrapBatch(future, timer, consumer); + @Advice.Return CompletableFuture> future, + @Advice.Enter Timer timer) { + return wrapBatch(future, timer, consumer); } } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java index d0b8dfe6a8ff..48015630befa 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java @@ -17,6 +17,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.implementation.bytecode.assign.Assigner; import net.bytebuddy.matcher.ElementMatcher; @@ -44,16 +45,12 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ConsumerConfigurationDataMethodAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void after( + public static MessageListener after( @Advice.This ConsumerConfigurationData data, - @Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC) - MessageListener listener) { - if (listener == null) { - return; - } - - listener = new MessageListenerWrapper<>(listener); + @Advice.Return(typing = Assigner.Typing.DYNAMIC) MessageListener listener) { + return listener == null ? null : new MessageListenerWrapper<>(listener); } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java index 06a5ab553cde..9a8018e3770d 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java @@ -8,11 +8,13 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.Arrays; import java.util.List; @AutoService(InstrumentationModule.class) -public class PulsarInstrumentationModule extends InstrumentationModule { +public class PulsarInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public PulsarInstrumentationModule() { super("pulsar", "pulsar-2.8"); } @@ -28,4 +30,9 @@ public List typeInstrumentations() { new SendCallbackInstrumentation(), new TransactionImplInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java index 85295042e335..8249ebb36ed3 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java @@ -16,6 +16,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -43,32 +44,47 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class SendCallbackSendCompleteAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.This SendCallback callback, - @Advice.Local("otelContext") Context otelContext, - @Advice.Local("otelScope") Scope otelScope, - @Advice.Local("otelRequest") PulsarRequest request) { - // Extract the Context and PulsarRequest from the SendCallback instance. - SendCallbackData callBackData = VirtualFieldStore.extract(callback); - if (callBackData != null) { - // If the extraction was successful, store the Context and PulsarRequest in local variables. - otelContext = callBackData.context; - request = callBackData.request; - otelScope = otelContext.makeCurrent(); + public static class AdviceScope { + private final PulsarRequest request; + private final Context context; + private final Scope scope; + + private AdviceScope(PulsarRequest request, Context context, Scope scope) { + this.request = request; + this.context = context; + this.scope = scope; + } + + @Nullable + public static AdviceScope start(SendCallback callback) { + // Extract the Context and PulsarRequest from the SendCallback instance. + SendCallbackData callBackData = VirtualFieldStore.extract(callback); + if (callBackData == null) { + return null; + } + + Context context = callBackData.context; + return new AdviceScope(callBackData.request, context, context.makeCurrent()); + } + + public void end(@Nullable Throwable t) { + // Close the Scope and end the span. + scope.close(); + producerInstrumenter().end(context, request, null, t); } } + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AdviceScope onEnter(@Advice.This SendCallback callback) { + return AdviceScope.start(callback); + } + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void onExit( - @Advice.Argument(0) Throwable t, - @Advice.Local("otelContext") Context otelContext, - @Advice.Local("otelScope") Scope otelScope, - @Advice.Local("otelRequest") PulsarRequest request) { - if (otelScope != null) { - // Close the Scope and end the span. - otelScope.close(); - producerInstrumenter().end(otelContext, request, null, t); + @Advice.Argument(0) Throwable t, @Advice.Enter @Nullable AdviceScope adviceScope) { + if (adviceScope != null) { + adviceScope.end(t); } } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java index 15684a75fb53..9772cf84afb4 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java @@ -15,6 +15,7 @@ import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons; import java.util.concurrent.CompletableFuture; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -37,9 +38,10 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class RegisterProducedTopicAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void after(@Advice.Return(readOnly = false) CompletableFuture future) { - future = PulsarSingletons.wrap(future); + public static CompletableFuture after(@Advice.Return CompletableFuture future) { + return PulsarSingletons.wrap(future); } } }