Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -137,12 +138,13 @@ public static Timer before() {
return Timer.start();
}

@AssignReturned.ToReturned
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void after(
@Advice.Enter Timer timer,
public static CompletableFuture<Message<?>> after(
@Advice.This Consumer<?> consumer,
@Advice.Return(readOnly = false) CompletableFuture<Message<?>> future) {
future = wrap(future, timer, consumer);
@Advice.Return CompletableFuture<Message<?>> future,
@Advice.Enter Timer timer) {
return wrap(future, timer, consumer);
}
}

Expand All @@ -154,12 +156,13 @@ public static Timer before() {
return Timer.start();
}

@AssignReturned.ToReturned
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void after(
@Advice.Enter Timer timer,
public static CompletableFuture<Messages<?>> after(
@Advice.This Consumer<?> consumer,
@Advice.Return(readOnly = false) CompletableFuture<Messages<?>> future) {
future = wrapBatch(future, timer, consumer);
@Advice.Return CompletableFuture<Messages<?>> future,
@Advice.Enter Timer timer) {
return wrapBatch(future, timer, consumer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -44,16 +45,12 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class ConsumerConfigurationDataMethodAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void after(
public static MessageListener<?> after(
@Advice.This ConsumerConfigurationData<?> data,
@Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC)
MessageListener<?> listener) {
if (listener == null) {
return;
}

listener = new MessageListenerWrapper<>(listener);
@Advice.Return(typing = Assigner.Typing.DYNAMIC) MessageListener<?> listener) {
return listener == null ? null : new MessageListenerWrapper<>(listener);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.Arrays;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class PulsarInstrumentationModule extends InstrumentationModule {
public class PulsarInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {
public PulsarInstrumentationModule() {
super("pulsar", "pulsar-2.8");
}
Expand All @@ -28,4 +30,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
new SendCallbackInstrumentation(),
new TransactionImplInstrumentation());
}

@Override
public boolean isIndyReady() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -43,32 +44,47 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class SendCallbackSendCompleteAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This SendCallback callback,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("otelRequest") PulsarRequest request) {
// Extract the Context and PulsarRequest from the SendCallback instance.
SendCallbackData callBackData = VirtualFieldStore.extract(callback);
if (callBackData != null) {
// If the extraction was successful, store the Context and PulsarRequest in local variables.
otelContext = callBackData.context;
request = callBackData.request;
otelScope = otelContext.makeCurrent();
public static class AdviceScope {
private final PulsarRequest request;
private final Context context;
private final Scope scope;

private AdviceScope(PulsarRequest request, Context context, Scope scope) {
this.request = request;
this.context = context;
this.scope = scope;
}

@Nullable
public static AdviceScope start(SendCallback callback) {
// Extract the Context and PulsarRequest from the SendCallback instance.
SendCallbackData callBackData = VirtualFieldStore.extract(callback);
if (callBackData == null) {
return null;
}

Context context = callBackData.context;
return new AdviceScope(callBackData.request, context, context.makeCurrent());
}

public void end(@Nullable Throwable t) {
// Close the Scope and end the span.
scope.close();
producerInstrumenter().end(context, request, null, t);
}
}

@Nullable
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AdviceScope onEnter(@Advice.This SendCallback callback) {
return AdviceScope.start(callback);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) Throwable t,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("otelRequest") PulsarRequest request) {
if (otelScope != null) {
// Close the Scope and end the span.
otelScope.close();
producerInstrumenter().end(otelContext, request, null, t);
@Advice.Argument(0) Throwable t, @Advice.Enter @Nullable AdviceScope adviceScope) {
if (adviceScope != null) {
adviceScope.end(t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

Expand All @@ -37,9 +38,10 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class RegisterProducedTopicAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void after(@Advice.Return(readOnly = false) CompletableFuture<Void> future) {
future = PulsarSingletons.wrap(future);
public static CompletableFuture<Void> after(@Advice.Return CompletableFuture<Void> future) {
return PulsarSingletons.wrap(future);
}
}
}