Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

Expand Down Expand Up @@ -118,41 +121,54 @@ public static boolean onEnter(
@SuppressWarnings("unused")
public static class PublishReplyToHeadersBodyAdvice {

public static class AdviceScope {
private final NatsRequest request;
private final Context context;
private final Scope scope;

private AdviceScope(NatsRequest request, Context context, Scope scope) {
this.request = request;
this.context = context;
this.scope = scope;
}

@Nullable
public static AdviceScope start(NatsRequest natsRequest) {
Context parentContext = Context.current();
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
return null;
}
Context context = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
return new AdviceScope(natsRequest, context, context.makeCurrent());
}

public void end(@Nullable Throwable throwable) {
scope.close();
PRODUCER_INSTRUMENTER.end(context, request, null, throwable);
}
}

@AssignReturned.ToArguments(@ToArgument(value = 2, index = 1))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
public static Object[] onEnter(
@Advice.This Connection connection,
@Advice.Argument(0) String subject,
@Advice.Argument(1) String replyTo,
@Advice.Argument(value = 2, readOnly = false) Headers headers,
@Advice.Argument(3) byte[] body,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
headers = NatsMessageWritableHeaders.create(headers);

Context parentContext = Context.current();
natsRequest = NatsRequest.create(connection, subject, replyTo, headers, body);

if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
return;
}

otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
otelScope = otelContext.makeCurrent();
@Advice.Argument(2) Headers originalHeaders,
@Advice.Argument(3) byte[] body) {
Headers headers = NatsMessageWritableHeaders.create(originalHeaders);
NatsRequest natsRequest = NatsRequest.create(connection, subject, replyTo, headers, body);
AdviceScope adviceScope = AdviceScope.start(natsRequest);
return new Object[] {adviceScope, headers};
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
if (otelScope == null) {
return;
@Advice.Thrown @Nullable Throwable throwable, @Advice.Enter Object[] enterResult) {
AdviceScope adviceScope = (AdviceScope) enterResult[0];
if (adviceScope != null) {
adviceScope.end(throwable);
}

otelScope.close();
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
}
}

Expand Down
Loading
Loading