Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -92,64 +93,85 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class ChannelMethodAdvice {

@Advice.OnMethodEnter
public static void onEnter(
@Advice.This Channel channel,
@Advice.Origin("Channel.#m") String method,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelRequest") ChannelAndMethod request) {
callDepth = CallDepth.forClass(Channel.class);
if (callDepth.getAndIncrement() > 0) {
return;
public static class ChannelMethodAdviceScope {
private final CallDepth callDepth;
@Nullable private final Context context;
@Nullable private final Scope scope;
@Nullable private final ChannelAndMethod request;

private ChannelMethodAdviceScope(
CallDepth callDepth,
@Nullable Context context,
@Nullable Scope scope,
@Nullable ChannelAndMethod request) {
this.callDepth = callDepth;
this.context = context;
this.scope = scope;
this.request = request;
}

Context parentContext = Java8BytecodeBridge.currentContext();
request = ChannelAndMethod.create(channel, method);
public static ChannelMethodAdviceScope start(Channel channel, String method) {
CallDepth callDepth = CallDepth.forClass(Channel.class);
if (callDepth.getAndIncrement() > 0) {
return new ChannelMethodAdviceScope(callDepth, null, null, null);
}

if (!channelInstrumenter(request).shouldStart(parentContext, request)) {
return;
}
Context parentContext = Context.current();
ChannelAndMethod request = ChannelAndMethod.create(channel, method);

context = channelInstrumenter(request).start(parentContext, request);
CURRENT_RABBIT_CONTEXT.set(context);
helper().setChannelAndMethod(context, request);
scope = context.makeCurrent();
}
if (!channelInstrumenter(request).shouldStart(parentContext, request)) {
return new ChannelMethodAdviceScope(callDepth, null, null, null);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelRequest") ChannelAndMethod request) {
if (callDepth.decrementAndGet() > 0) {
return;
Context context = channelInstrumenter(request).start(parentContext, request);
CURRENT_RABBIT_CONTEXT.set(context);
helper().setChannelAndMethod(context, request);

return new ChannelMethodAdviceScope(callDepth, context, context.makeCurrent(), request);
}
if (scope == null) {
return;

public void end(Throwable throwable) {
if (callDepth.decrementAndGet() > 0) {
return;
}
if (scope == null) {
return;
}

scope.close();

CURRENT_RABBIT_CONTEXT.remove();
channelInstrumenter(request).end(context, request, null, throwable);
}
}

scope.close();
@Advice.OnMethodEnter
public static ChannelMethodAdviceScope onEnter(
@Advice.This Channel channel, @Advice.Origin("Channel.#m") String method) {
return ChannelMethodAdviceScope.start(channel, method);
}

CURRENT_RABBIT_CONTEXT.remove();
channelInstrumenter(request).end(context, request, null, throwable);
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown @Nullable Throwable throwable,
@Advice.Enter ChannelMethodAdviceScope adviceScope) {
adviceScope.end(throwable);
}
}

@SuppressWarnings("unused")
public static class ChannelPublishAdvice {

@Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(4))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void setSpanNameAddHeaders(
public static Object setSpanNameAddHeaders(
@Advice.Argument(0) String exchange,
@Advice.Argument(1) String routingKey,
@Advice.Argument(value = 4, readOnly = false) AMQP.BasicProperties props,
@Advice.Argument(4) AMQP.BasicProperties props,
@Advice.Argument(5) byte[] body) {
Context context = Java8BytecodeBridge.currentContext();
Span span = Java8BytecodeBridge.spanFromContext(context);
AMQP.BasicProperties modifiedProps = props;
Copy link
Contributor

@laurit laurit Oct 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would have taken less renaming if you had renamed the method argument to originalProps and here used AMQP.BasicProperties props = originalProps;. I know that with inline advice you can't reassign read only arguments. Just curious did it fail with the original code when props was reassigned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've renamed props to originalProps.

I know that with inline advice you can't reassign read only arguments. Just curious did it fail with the original code when props was reassigned?

I don't test advices that use readonly, always change them first, given the known issues with those for inline advice. However, I just did a quick test and I got this error:

java.lang.IllegalStateException: Cannot define writable field access for com.rabbitmq.client.AMQP$BasicProperties arg2 when using delegation
	at net.bytebuddy.asm.Advice$OffsetMapping$ForArgument$Unresolved$Factory.make(Advice.java:1800)
	at net.bytebuddy.asm.Advice$Dispatcher$Resolved$AbstractBase.<init>(Advice.java:9101)
	at net.bytebuddy.asm.Advice$Dispatcher$Delegating$Resolved.<init>(Advice.java:10852)
	at net.bytebuddy.asm.Advice$Dispatcher$Delegating$Resolved$ForMethodEnter.<init>(Advice.java:11295)
	at net.bytebuddy.asm.Advice$Dispatcher$Delegating$Resolved$ForMethodEnter$WithDiscardedEnterType.<init>(Advice.java:11482)
	at net.bytebuddy.asm.Advice$Dispatcher$Delegating$Resolved$ForMethodEnter.of(Advice.java:11338)
	at net.bytebuddy.asm.Advice$Dispatcher$Delegating.asMethodEnter(Advice.java:10782)
	at net.bytebuddy.asm.Advice.to(Advice.java:378)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it out and got a slightly different exception. My understanding is that writing to method parameters is fine with non-inline advice. The advice code runs in a separate class and even when you change the value for the local parameter it isn't reflected in the original method. Where things break is when the same advice is applied as inline advice. There byte-buddy will detect the attempt to change the value of the method parameter and reject it because the parameter is read only.


if (span.getSpanContext().isValid()) {
helper().onPublish(span, exchange, routingKey);
Expand All @@ -159,47 +181,86 @@ public static void setSpanNameAddHeaders(
}

// This is the internal behavior when props are null. We're just doing it earlier now.
if (props == null) {
props = MessageProperties.MINIMAL_BASIC;
if (modifiedProps == null) {
modifiedProps = MessageProperties.MINIMAL_BASIC;
}
helper().onProps(context, span, props);
helper().onProps(context, span, modifiedProps);

// We need to copy the BasicProperties and provide a header map we can modify
Map<String, Object> headers = props.getHeaders();
Map<String, Object> headers = modifiedProps.getHeaders();
headers = (headers == null) ? new HashMap<>() : new HashMap<>(headers);

helper().inject(context, headers, MapSetter.INSTANCE);

props =
modifiedProps =
new AMQP.BasicProperties(
props.getContentType(),
props.getContentEncoding(),
modifiedProps.getContentType(),
modifiedProps.getContentEncoding(),
headers,
props.getDeliveryMode(),
props.getPriority(),
props.getCorrelationId(),
props.getReplyTo(),
props.getExpiration(),
props.getMessageId(),
props.getTimestamp(),
props.getType(),
props.getUserId(),
props.getAppId(),
props.getClusterId());
modifiedProps.getDeliveryMode(),
modifiedProps.getPriority(),
modifiedProps.getCorrelationId(),
modifiedProps.getReplyTo(),
modifiedProps.getExpiration(),
modifiedProps.getMessageId(),
modifiedProps.getTimestamp(),
modifiedProps.getType(),
modifiedProps.getUserId(),
modifiedProps.getAppId(),
modifiedProps.getClusterId());
}

return modifiedProps;
}
}

@SuppressWarnings("unused")
public static class ChannelGetAdvice {

public static class ChannelGetAdviceScope {
private final CallDepth callDepth;
private final Timer timer;

private ChannelGetAdviceScope(CallDepth callDepth, Timer timer) {
this.callDepth = callDepth;
this.timer = timer;
}

public static ChannelGetAdviceScope start() {
CallDepth callDepth = CallDepth.forClass(Channel.class);
callDepth.getAndIncrement();
Timer timer = Timer.start();
return new ChannelGetAdviceScope(callDepth, timer);
}

public void end(Channel channel, String queue, GetResponse response, Throwable throwable) {
if (callDepth.decrementAndGet() > 0) {
return;
}

Context parentContext = Context.current();
ReceiveRequest request = ReceiveRequest.create(queue, response, channel.getConnection());
if (!receiveInstrumenter().shouldStart(parentContext, request)) {
return;
}

// can't create span and put into scope in method enter above, because can't add parent
// after
// span creation
InstrumenterUtil.startAndEnd(
receiveInstrumenter(),
parentContext,
request,
null,
throwable,
timer.startTime(),
timer.now());
}
}

@Advice.OnMethodEnter
public static void takeTimestamp(
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelTimer") Timer timer) {
callDepth = CallDepth.forClass(Channel.class);
callDepth.getAndIncrement();
timer = Timer.start();
public static ChannelGetAdviceScope takeTimestamp() {
return ChannelGetAdviceScope.start();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -208,43 +269,28 @@ public static void extractAndStartSpan(
@Advice.Argument(0) String queue,
@Advice.Return GetResponse response,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelTimer") Timer timer) {
if (callDepth.decrementAndGet() > 0) {
return;
}

Context parentContext = Java8BytecodeBridge.currentContext();
ReceiveRequest request = ReceiveRequest.create(queue, response, channel.getConnection());
if (!receiveInstrumenter().shouldStart(parentContext, request)) {
return;
}

// can't create span and put into scope in method enter above, because can't add parent after
// span creation
InstrumenterUtil.startAndEnd(
receiveInstrumenter(),
parentContext,
request,
null,
throwable,
timer.startTime(),
timer.now());
@Advice.Enter ChannelGetAdviceScope adviceScope) {
adviceScope.end(channel, queue, response, throwable);
}
}

@SuppressWarnings("unused")
public static class ChannelConsumeAdvice {

@Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(6))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapConsumer(
public static Object wrapConsumer(
@Advice.This Channel channel,
@Advice.Argument(0) String queue,
@Advice.Argument(value = 6, readOnly = false) Consumer consumer) {
@Advice.Argument(6) Consumer consumer) {
// We have to save off the queue name here because it isn't available to the consumer later.
if (consumer != null && !(consumer instanceof TracedDelegatingConsumer)) {
consumer = new TracedDelegatingConsumer(queue, consumer, channel.getConnection());
Consumer modifiedConsumer = consumer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't really need this local, could just return the new value from within the if

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I've just added the changes.

if (modifiedConsumer != null && !(modifiedConsumer instanceof TracedDelegatingConsumer)) {
modifiedConsumer =
new TracedDelegatingConsumer(queue, modifiedConsumer, channel.getConnection());
}

return modifiedConsumer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class RabbitMqInstrumentationModule extends InstrumentationModule {
public class RabbitMqInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {
public RabbitMqInstrumentationModule() {
super("rabbitmq", "rabbitmq-2.7");
}
Expand All @@ -22,4 +24,9 @@ public RabbitMqInstrumentationModule() {
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new RabbitChannelInstrumentation(), new RabbitCommandInstrumentation());
}

@Override
public boolean isIndyReady() {
return true;
}
}
Loading