Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

Expand Down Expand Up @@ -69,14 +70,13 @@ public static boolean methodEnter() {
return false;
}

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(
public static reactor.util.context.Context methodExit(
@Advice.Argument(0) reactor.util.context.Context reactorContext,
@Advice.Argument(1) Context applicationContext,
@Advice.Return(readOnly = false) reactor.util.context.Context updatedReactorContext) {
updatedReactorContext =
ContextPropagationOperator.storeOpenTelemetryContext(
reactorContext, AgentContextStorage.getAgentContext(applicationContext));
@Advice.Argument(1) Context applicationContext) {
return ContextPropagationOperator.storeOpenTelemetryContext(
reactorContext, AgentContextStorage.getAgentContext(applicationContext));
}
}

Expand All @@ -87,19 +87,19 @@ public static boolean methodEnter() {
return false;
}

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(
public static Context methodExit(
@Advice.Argument(0) reactor.util.context.Context reactorContext,
@Advice.Argument(1) Context defaultContext,
@Advice.Return(readOnly = false) Context applicationContext) {
@Advice.Argument(1) Context defaultContext) {

io.opentelemetry.context.Context agentContext =
ContextPropagationOperator.getOpenTelemetryContext(reactorContext, null);
if (agentContext == null) {
applicationContext = defaultContext;
} else {
applicationContext = AgentContextStorage.toApplicationContext(agentContext);
return defaultContext;
}

return AgentContextStorage.toApplicationContext(agentContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ public String getModuleGroup() {
// This module uses the api context bridge helpers, therefore must be in the same classloader
return "opentelemetry-api-bridge";
}

@Override
public boolean isIndyReady() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

Expand Down Expand Up @@ -48,18 +49,18 @@ public static boolean methodEnter() {
return false;
}

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(
public static Context methodExit(
@Advice.Argument(0) reactor.util.context.ContextView reactorContext,
@Advice.Argument(1) Context defaultContext,
@Advice.Return(readOnly = false) Context applicationContext) {
@Advice.Argument(1) Context defaultContext) {

io.opentelemetry.context.Context agentContext =
ContextPropagationOperator.getOpenTelemetryContextFromContextView(reactorContext, null);
if (agentContext == null) {
applicationContext = defaultContext;
return defaultContext;
} else {
applicationContext = AgentContextStorage.toApplicationContext(agentContext);
return AgentContextStorage.toApplicationContext(agentContext);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public String getModuleGroup() {
// This module uses the api context bridge helpers, therefore must be in the same classloader
return "opentelemetry-api-bridge";
}

@Override
public boolean isIndyReady() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.publisher.Flux;
Expand All @@ -33,11 +34,14 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class ReceiveAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) Flux<?> flux) {
public static Flux<?> onExit(@Advice.Return Flux<?> originalFlux) {
Flux<?> flux = originalFlux;
if (!(flux instanceof TracingDisablingKafkaFlux)) {
flux = new TracingDisablingKafkaFlux<>(flux);
}
return flux;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.publisher.Flux;
Expand All @@ -33,11 +34,14 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class CreateConsumerFluxAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) Flux<?> flux) {
public static Flux<?> onExit(@Advice.Return Flux<?> originalFlux) {
Flux<?> flux = originalFlux;
if (!(flux instanceof TracingDisablingKafkaFlux)) {
flux = new TracingDisablingKafkaFlux<>(flux);
}
return flux;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.kafka.receiver.KafkaReceiver;
Expand All @@ -33,11 +34,13 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class CreateAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) KafkaReceiver<?, ?> receiver) {
public static KafkaReceiver<?, ?> onExit(@Advice.Return KafkaReceiver<?, ?> receiver) {
if (!(receiver instanceof InstrumentedKafkaReceiver)) {
receiver = new InstrumentedKafkaReceiver<>(receiver);
return new InstrumentedKafkaReceiver<>(receiver);
}
return receiver;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class ReactorKafkaInstrumentationModule extends InstrumentationModule {
public class ReactorKafkaInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {

public ReactorKafkaInstrumentationModule() {
super("reactor-kafka", "reactor-kafka-1.0");
Expand All @@ -27,4 +29,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
new DefaultKafkaReceiverInstrumentation(),
new ConsumerHandlerInstrumentation());
}

@Override
public boolean isIndyReady() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.function.BiConsumer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.netty.Connection;
Expand Down Expand Up @@ -74,92 +76,111 @@ public void transform(TypeTransformer transformer) {
public static class CreateAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Local("otelCallDepth") CallDepth callDepth) {
callDepth = CallDepth.forClass(HttpClient.class);
public static CallDepth onEnter() {
CallDepth callDepth = CallDepth.forClass(HttpClient.class);
callDepth.getAndIncrement();
return callDepth;
}

@AssignReturned.ToReturned
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
public static HttpClient stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Return(readOnly = false) HttpClient client,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
@Advice.Return HttpClient client,
@Advice.Enter CallDepth callDepth) {

if (callDepth.decrementAndGet() == 0 && throwable == null) {
client = client.doOnRequest(new OnRequest()).mapConnect(new MapConnect());
return client.doOnRequest(new OnRequest()).mapConnect(new MapConnect());
}
return client;
}
}

@SuppressWarnings("unused")
public static class OnRequestAdvice {

@AssignReturned.ToArguments(@ToArgument(0))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientRequest, ? super Connection> callback) {
public static BiConsumer<? super HttpClientRequest, ? super Connection> onEnter(
@Advice.Argument(0) BiConsumer<? super HttpClientRequest, ? super Connection> callback) {
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
callback = new DecoratorFunctions.OnRequestDecorator(callback);
return new DecoratorFunctions.OnRequestDecorator(callback);
}
return callback;
}
}

@SuppressWarnings("unused")
public static class OnRequestErrorAdvice {

@AssignReturned.ToArguments(@ToArgument(0))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientRequest, ? super Throwable> callback) {
public static BiConsumer<? super HttpClientRequest, ? super Throwable> onEnter(
@Advice.Argument(0) BiConsumer<? super HttpClientRequest, ? super Throwable> callback) {
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
callback = new DecoratorFunctions.OnRequestErrorDecorator(callback);
return new DecoratorFunctions.OnRequestErrorDecorator(callback);
}
return callback;
}
}

@SuppressWarnings("unused")
public static class OnResponseAdvice {

@AssignReturned.ToArguments(@ToArgument(0))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientResponse, ? super Connection> callback,
public static BiConsumer<? super HttpClientResponse, ? super Connection> onEnter(
@Advice.Argument(0) BiConsumer<? super HttpClientResponse, ? super Connection> callback,
@Advice.Origin("#m") String methodName) {
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
boolean forceParentContext = methodName.equals("doAfterResponse");
callback = new DecoratorFunctions.OnResponseDecorator(callback, forceParentContext);
return new DecoratorFunctions.OnResponseDecorator(callback, forceParentContext);
}
return callback;
}
}

@SuppressWarnings("unused")
public static class OnResponseErrorAdvice {

@AssignReturned.ToArguments(@ToArgument(0))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientResponse, ? super Throwable> callback) {
public static BiConsumer<? super HttpClientResponse, ? super Throwable> onEnter(
@Advice.Argument(0) BiConsumer<? super HttpClientResponse, ? super Throwable> callback) {
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
callback = new DecoratorFunctions.OnResponseErrorDecorator(callback);
return new DecoratorFunctions.OnResponseErrorDecorator(callback);
}
return callback;
}
}

@SuppressWarnings("unused")
public static class OnErrorAdvice {

@AssignReturned.ToArguments({
@ToArgument(value = 0, index = 0),
@ToArgument(value = 1, index = 1)
})
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientRequest, ? super Throwable> requestCallback,
@Advice.Argument(value = 1, readOnly = false)
BiConsumer<? super HttpClientResponse, ? super Throwable> responseCallback) {
public static Object[] onEnter(
@Advice.Argument(0)
BiConsumer<? super HttpClientRequest, ? super Throwable> originalRequestCallback,
@Advice.Argument(1)
BiConsumer<? super HttpClientResponse, ? super Throwable> originalResponseCallback) {

// intermediate variables needed for inlined instrumentation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was there some side effect that surfaced that caused issues with referencing the originals without making an intermediate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, when removing those this makes the tests fail with missing spans. I remember having seen this elsewhere and thus applied this workaround after spending too much time doing diffs and partial reverts. It could be something worth investigating further as it might be related to bytebuddy or the way we use it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as I remember byte-buddy does not allow you to reassign a value to a read only argument

BiConsumer<? super HttpClientRequest, ? super Throwable> requestCallback =
originalRequestCallback;
BiConsumer<? super HttpClientResponse, ? super Throwable> responseCallback =
originalResponseCallback;

if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) {
requestCallback = new DecoratorFunctions.OnRequestErrorDecorator(requestCallback);
}
if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) {
responseCallback = new DecoratorFunctions.OnResponseErrorDecorator(responseCallback);
}
return new Object[] {requestCallback, responseCallback};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand All @@ -24,7 +25,8 @@
* HttpClient#doOnRequest(BiConsumer)} to pass context from the caller to Reactor to Netty.
*/
@AutoService(InstrumentationModule.class)
public class ReactorNettyInstrumentationModule extends InstrumentationModule {
public class ReactorNettyInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {

public ReactorNettyInstrumentationModule() {
super("reactor-netty", "reactor-netty-0.9");
Expand All @@ -40,4 +42,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HttpClientInstrumentation());
}

@Override
public boolean isIndyReady() {
return true;
}
}
Loading