Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -92,64 +93,85 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class ChannelMethodAdvice {

@Advice.OnMethodEnter
public static void onEnter(
@Advice.This Channel channel,
@Advice.Origin("Channel.#m") String method,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelRequest") ChannelAndMethod request) {
callDepth = CallDepth.forClass(Channel.class);
if (callDepth.getAndIncrement() > 0) {
return;
public static class ChannelMethodAdviceScope {
private final CallDepth callDepth;
@Nullable private final Context context;
@Nullable private final Scope scope;
@Nullable private final ChannelAndMethod request;

private ChannelMethodAdviceScope(
CallDepth callDepth,
@Nullable Context context,
@Nullable Scope scope,
@Nullable ChannelAndMethod request) {
this.callDepth = callDepth;
this.context = context;
this.scope = scope;
this.request = request;
}

Context parentContext = Java8BytecodeBridge.currentContext();
request = ChannelAndMethod.create(channel, method);
public static ChannelMethodAdviceScope start(
CallDepth callDepth, Channel channel, String method) {
if (callDepth.getAndIncrement() > 0) {
return new ChannelMethodAdviceScope(callDepth, null, null, null);
}

if (!channelInstrumenter(request).shouldStart(parentContext, request)) {
return;
}
Context parentContext = Context.current();
ChannelAndMethod request = ChannelAndMethod.create(channel, method);

context = channelInstrumenter(request).start(parentContext, request);
CURRENT_RABBIT_CONTEXT.set(context);
helper().setChannelAndMethod(context, request);
scope = context.makeCurrent();
}
if (!channelInstrumenter(request).shouldStart(parentContext, request)) {
return new ChannelMethodAdviceScope(callDepth, null, null, null);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelRequest") ChannelAndMethod request) {
if (callDepth.decrementAndGet() > 0) {
return;
Context context = channelInstrumenter(request).start(parentContext, request);
CURRENT_RABBIT_CONTEXT.set(context);
helper().setChannelAndMethod(context, request);

return new ChannelMethodAdviceScope(callDepth, context, context.makeCurrent(), request);
}
if (scope == null) {
return;

public void end(Throwable throwable) {
if (callDepth.decrementAndGet() > 0) {
return;
}
if (scope == null) {
return;
}

scope.close();

CURRENT_RABBIT_CONTEXT.remove();
channelInstrumenter(request).end(context, request, null, throwable);
}
}

scope.close();
@Advice.OnMethodEnter
public static ChannelMethodAdviceScope onEnter(
@Advice.This Channel channel, @Advice.Origin("Channel.#m") String method) {
return ChannelMethodAdviceScope.start(CallDepth.forClass(Channel.class), channel, method);
}

CURRENT_RABBIT_CONTEXT.remove();
channelInstrumenter(request).end(context, request, null, throwable);
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown @Nullable Throwable throwable,
@Advice.Enter ChannelMethodAdviceScope adviceScope) {
adviceScope.end(throwable);
}
}

@SuppressWarnings("unused")
public static class ChannelPublishAdvice {

@Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(4))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void setSpanNameAddHeaders(
public static AMQP.BasicProperties setSpanNameAddHeaders(
@Advice.Argument(0) String exchange,
@Advice.Argument(1) String routingKey,
@Advice.Argument(value = 4, readOnly = false) AMQP.BasicProperties props,
@Advice.Argument(4) AMQP.BasicProperties originalProps,
@Advice.Argument(5) byte[] body) {
Context context = Java8BytecodeBridge.currentContext();
Span span = Java8BytecodeBridge.spanFromContext(context);
AMQP.BasicProperties props = originalProps;

if (span.getSpanContext().isValid()) {
helper().onPublish(span, exchange, routingKey);
Expand Down Expand Up @@ -187,19 +209,57 @@ public static void setSpanNameAddHeaders(
props.getAppId(),
props.getClusterId());
}

return props;
}
}

@SuppressWarnings("unused")
public static class ChannelGetAdvice {

public static class ChannelGetAdviceScope {
private final CallDepth callDepth;
private final Timer timer;

private ChannelGetAdviceScope(CallDepth callDepth, Timer timer) {
this.callDepth = callDepth;
this.timer = timer;
}

public static ChannelGetAdviceScope start() {
CallDepth callDepth = CallDepth.forClass(Channel.class);
callDepth.getAndIncrement();
Timer timer = Timer.start();
return new ChannelGetAdviceScope(callDepth, timer);
}

public void end(Channel channel, String queue, GetResponse response, Throwable throwable) {
if (callDepth.decrementAndGet() > 0) {
return;
}

Context parentContext = Context.current();
ReceiveRequest request = ReceiveRequest.create(queue, response, channel.getConnection());
if (!receiveInstrumenter().shouldStart(parentContext, request)) {
return;
}

// can't create span and put into scope in method enter above, because can't add parent
// after span creation
InstrumenterUtil.startAndEnd(
receiveInstrumenter(),
parentContext,
request,
null,
throwable,
timer.startTime(),
timer.now());
}
}

@Advice.OnMethodEnter
public static void takeTimestamp(
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelTimer") Timer timer) {
callDepth = CallDepth.forClass(Channel.class);
callDepth.getAndIncrement();
timer = Timer.start();
public static ChannelGetAdviceScope takeTimestamp() {
return ChannelGetAdviceScope.start();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -208,43 +268,26 @@ public static void extractAndStartSpan(
@Advice.Argument(0) String queue,
@Advice.Return GetResponse response,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelTimer") Timer timer) {
if (callDepth.decrementAndGet() > 0) {
return;
}

Context parentContext = Java8BytecodeBridge.currentContext();
ReceiveRequest request = ReceiveRequest.create(queue, response, channel.getConnection());
if (!receiveInstrumenter().shouldStart(parentContext, request)) {
return;
}

// can't create span and put into scope in method enter above, because can't add parent after
// span creation
InstrumenterUtil.startAndEnd(
receiveInstrumenter(),
parentContext,
request,
null,
throwable,
timer.startTime(),
timer.now());
@Advice.Enter ChannelGetAdviceScope adviceScope) {
adviceScope.end(channel, queue, response, throwable);
}
}

@SuppressWarnings("unused")
public static class ChannelConsumeAdvice {

@Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(6))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapConsumer(
public static Object wrapConsumer(
@Advice.This Channel channel,
@Advice.Argument(0) String queue,
@Advice.Argument(value = 6, readOnly = false) Consumer consumer) {
@Advice.Argument(6) Consumer consumer) {
// We have to save off the queue name here because it isn't available to the consumer later.
if (consumer != null && !(consumer instanceof TracedDelegatingConsumer)) {
consumer = new TracedDelegatingConsumer(queue, consumer, channel.getConnection());
return new TracedDelegatingConsumer(queue, consumer, channel.getConnection());
}

return consumer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class RabbitMqInstrumentationModule extends InstrumentationModule {
public class RabbitMqInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {
public RabbitMqInstrumentationModule() {
super("rabbitmq", "rabbitmq-2.7");
}
Expand All @@ -22,4 +24,9 @@ public RabbitMqInstrumentationModule() {
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new RabbitChannelInstrumentation(), new RabbitCommandInstrumentation());
}

@Override
public boolean isIndyReady() {
return true;
}
}
Loading