diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java index 99b4861ac6ae..bcf6dd28acee 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -92,64 +93,85 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ChannelMethodAdvice { - @Advice.OnMethodEnter - public static void onEnter( - @Advice.This Channel channel, - @Advice.Origin("Channel.#m") String method, - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope, - @Advice.Local("otelRequest") ChannelAndMethod request) { - callDepth = CallDepth.forClass(Channel.class); - if (callDepth.getAndIncrement() > 0) { - return; + public static class ChannelMethodAdviceScope { + private final CallDepth callDepth; + @Nullable private final Context context; + @Nullable private final Scope scope; + @Nullable private final ChannelAndMethod request; + + private ChannelMethodAdviceScope( + CallDepth callDepth, + @Nullable Context context, + @Nullable Scope scope, + @Nullable ChannelAndMethod request) { + this.callDepth = callDepth; + this.context = context; + this.scope = scope; + this.request = request; } - Context parentContext = Java8BytecodeBridge.currentContext(); - request = ChannelAndMethod.create(channel, method); + public static ChannelMethodAdviceScope start( + CallDepth callDepth, Channel channel, String method) { + if (callDepth.getAndIncrement() > 0) { + return new ChannelMethodAdviceScope(callDepth, null, null, null); + } - if (!channelInstrumenter(request).shouldStart(parentContext, request)) { - return; - } + Context parentContext = Context.current(); + ChannelAndMethod request = ChannelAndMethod.create(channel, method); - context = channelInstrumenter(request).start(parentContext, request); - CURRENT_RABBIT_CONTEXT.set(context); - helper().setChannelAndMethod(context, request); - scope = context.makeCurrent(); - } + if (!channelInstrumenter(request).shouldStart(parentContext, request)) { + return new ChannelMethodAdviceScope(callDepth, null, null, null); + } - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( - @Advice.Thrown Throwable throwable, - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope, - @Advice.Local("otelRequest") ChannelAndMethod request) { - if (callDepth.decrementAndGet() > 0) { - return; + Context context = channelInstrumenter(request).start(parentContext, request); + CURRENT_RABBIT_CONTEXT.set(context); + helper().setChannelAndMethod(context, request); + + return new ChannelMethodAdviceScope(callDepth, context, context.makeCurrent(), request); } - if (scope == null) { - return; + + public void end(Throwable throwable) { + if (callDepth.decrementAndGet() > 0) { + return; + } + if (scope == null) { + return; + } + + scope.close(); + + CURRENT_RABBIT_CONTEXT.remove(); + channelInstrumenter(request).end(context, request, null, throwable); } + } - scope.close(); + @Advice.OnMethodEnter + public static ChannelMethodAdviceScope onEnter( + @Advice.This Channel channel, @Advice.Origin("Channel.#m") String method) { + return ChannelMethodAdviceScope.start(CallDepth.forClass(Channel.class), channel, method); + } - CURRENT_RABBIT_CONTEXT.remove(); - channelInstrumenter(request).end(context, request, null, throwable); + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Thrown @Nullable Throwable throwable, + @Advice.Enter ChannelMethodAdviceScope adviceScope) { + adviceScope.end(throwable); } } @SuppressWarnings("unused") public static class ChannelPublishAdvice { + @Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(4)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void setSpanNameAddHeaders( + public static AMQP.BasicProperties setSpanNameAddHeaders( @Advice.Argument(0) String exchange, @Advice.Argument(1) String routingKey, - @Advice.Argument(value = 4, readOnly = false) AMQP.BasicProperties props, + @Advice.Argument(4) AMQP.BasicProperties originalProps, @Advice.Argument(5) byte[] body) { Context context = Java8BytecodeBridge.currentContext(); Span span = Java8BytecodeBridge.spanFromContext(context); + AMQP.BasicProperties props = originalProps; if (span.getSpanContext().isValid()) { helper().onPublish(span, exchange, routingKey); @@ -187,19 +209,57 @@ public static void setSpanNameAddHeaders( props.getAppId(), props.getClusterId()); } + + return props; } } @SuppressWarnings("unused") public static class ChannelGetAdvice { + public static class ChannelGetAdviceScope { + private final CallDepth callDepth; + private final Timer timer; + + private ChannelGetAdviceScope(CallDepth callDepth, Timer timer) { + this.callDepth = callDepth; + this.timer = timer; + } + + public static ChannelGetAdviceScope start() { + CallDepth callDepth = CallDepth.forClass(Channel.class); + callDepth.getAndIncrement(); + Timer timer = Timer.start(); + return new ChannelGetAdviceScope(callDepth, timer); + } + + public void end(Channel channel, String queue, GetResponse response, Throwable throwable) { + if (callDepth.decrementAndGet() > 0) { + return; + } + + Context parentContext = Context.current(); + ReceiveRequest request = ReceiveRequest.create(queue, response, channel.getConnection()); + if (!receiveInstrumenter().shouldStart(parentContext, request)) { + return; + } + + // can't create span and put into scope in method enter above, because can't add parent + // after span creation + InstrumenterUtil.startAndEnd( + receiveInstrumenter(), + parentContext, + request, + null, + throwable, + timer.startTime(), + timer.now()); + } + } + @Advice.OnMethodEnter - public static void takeTimestamp( - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Local("otelTimer") Timer timer) { - callDepth = CallDepth.forClass(Channel.class); - callDepth.getAndIncrement(); - timer = Timer.start(); + public static ChannelGetAdviceScope takeTimestamp() { + return ChannelGetAdviceScope.start(); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @@ -208,43 +268,26 @@ public static void extractAndStartSpan( @Advice.Argument(0) String queue, @Advice.Return GetResponse response, @Advice.Thrown Throwable throwable, - @Advice.Local("otelCallDepth") CallDepth callDepth, - @Advice.Local("otelTimer") Timer timer) { - if (callDepth.decrementAndGet() > 0) { - return; - } - - Context parentContext = Java8BytecodeBridge.currentContext(); - ReceiveRequest request = ReceiveRequest.create(queue, response, channel.getConnection()); - if (!receiveInstrumenter().shouldStart(parentContext, request)) { - return; - } - - // can't create span and put into scope in method enter above, because can't add parent after - // span creation - InstrumenterUtil.startAndEnd( - receiveInstrumenter(), - parentContext, - request, - null, - throwable, - timer.startTime(), - timer.now()); + @Advice.Enter ChannelGetAdviceScope adviceScope) { + adviceScope.end(channel, queue, response, throwable); } } @SuppressWarnings("unused") public static class ChannelConsumeAdvice { + @Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(6)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void wrapConsumer( + public static Object wrapConsumer( @Advice.This Channel channel, @Advice.Argument(0) String queue, - @Advice.Argument(value = 6, readOnly = false) Consumer consumer) { + @Advice.Argument(6) Consumer consumer) { // We have to save off the queue name here because it isn't available to the consumer later. if (consumer != null && !(consumer instanceof TracedDelegatingConsumer)) { - consumer = new TracedDelegatingConsumer(queue, consumer, channel.getConnection()); + return new TracedDelegatingConsumer(queue, consumer, channel.getConnection()); } + + return consumer; } } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitMqInstrumentationModule.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitMqInstrumentationModule.java index 78d5c293cd8b..e1876017c0e2 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitMqInstrumentationModule.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitMqInstrumentationModule.java @@ -10,10 +10,12 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; @AutoService(InstrumentationModule.class) -public class RabbitMqInstrumentationModule extends InstrumentationModule { +public class RabbitMqInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public RabbitMqInstrumentationModule() { super("rabbitmq", "rabbitmq-2.7"); } @@ -22,4 +24,9 @@ public RabbitMqInstrumentationModule() { public List typeInstrumentations() { return asList(new RabbitChannelInstrumentation(), new RabbitCommandInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } }