diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java index d680e78143e9..bc44771daa2a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -64,60 +65,57 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class IterableAdvice { - @SuppressWarnings("unchecked") + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap( + public static Iterable> wrap( @Advice.This ConsumerRecords records, - @Advice.Return(readOnly = false) Iterable> iterable) { + @Advice.Return Iterable> iterable) { // it's important not to suppress consumer span creation here because this instrumentation can // leak the context and so there may be a leaked consumer span in the context, in which // case it's important to overwrite the leaked span instead of suppressing the correct span // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); - iterable = - TracingIterable.wrap( - iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); + return TracingIterable.wrap( + iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); } } @SuppressWarnings("unused") public static class ListAdvice { - @SuppressWarnings("unchecked") + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap( + public static List> wrap( @Advice.This ConsumerRecords records, - @Advice.Return(readOnly = false) List> list) { + @Advice.Return List> list) { // it's important not to suppress consumer span creation here because this instrumentation can // leak the context and so there may be a leaked consumer span in the context, in which // case it's important to overwrite the leaked span instead of suppressing the correct span // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); - list = - TracingList.wrap( - list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); + return TracingList.wrap( + list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); } } @SuppressWarnings("unused") public static class IteratorAdvice { - @SuppressWarnings("unchecked") + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap( + public static Iterator> wrap( @Advice.This ConsumerRecords records, - @Advice.Return(readOnly = false) Iterator> iterator) { + @Advice.Return Iterator> iterator) { // it's important not to suppress consumer span creation here because this instrumentation can // leak the context and so there may be a leaked consumer span in the context, in which // case it's important to overwrite the leaked span instead of suppressing the correct span // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); - iterator = - TracingIterator.wrap( - iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); + return TracingIterator.wrap( + iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java index 88ef08507e37..895cc6324a84 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java @@ -10,10 +10,12 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; @AutoService(InstrumentationModule.class) -public class KafkaClientsInstrumentationModule extends InstrumentationModule { +public class KafkaClientsInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public KafkaClientsInstrumentationModule() { super("kafka-clients", "kafka-clients-0.11", "kafka"); } @@ -25,4 +27,9 @@ public List typeInstrumentations() { new KafkaConsumerInstrumentation(), new ConsumerRecordsInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java index c3614f95f59b..e993089d00cc 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java @@ -18,7 +18,10 @@ import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.ApiVersions; @@ -46,48 +49,83 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class SendAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static KafkaProducerRequest onEnter( - @Advice.FieldValue("apiVersions") ApiVersions apiVersions, - @Advice.FieldValue("clientId") String clientId, - @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, - @Advice.Argument(value = 1, readOnly = false) Callback callback, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope) { + public static class AdviceScope { + private final KafkaProducerRequest request; + private final Context context; + private final Scope scope; + private final Context parentContext; - KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); - Context parentContext = Java8BytecodeBridge.currentContext(); - if (!producerInstrumenter().shouldStart(parentContext, request)) { - return null; + private AdviceScope( + Context parentContext, KafkaProducerRequest request, Context context, Scope scope) { + this.parentContext = parentContext; + this.request = request; + this.context = context; + this.scope = scope; } - context = producerInstrumenter().start(parentContext, request); - scope = context.makeCurrent(); + @Nullable + public static AdviceScope start(KafkaProducerRequest request) { + Context parentContext = Java8BytecodeBridge.currentContext(); + if (!producerInstrumenter().shouldStart(parentContext, request)) { + return null; + } + Context context = producerInstrumenter().start(parentContext, request); + return new AdviceScope(parentContext, request, context, context.makeCurrent()); + } - if (KafkaSingletons.isProducerPropagationEnabled() - && KafkaPropagation.shouldPropagate(apiVersions)) { - record = KafkaPropagation.propagateContext(context, record); + public Callback wrapCallback(Callback originalCallback) { + return new ProducerCallback(originalCallback, parentContext, context, request); } - callback = new ProducerCallback(callback, parentContext, context, request); - return request; + public ProducerRecord propagateContext( + ApiVersions apiVersions, ProducerRecord record) { + if (KafkaSingletons.isProducerPropagationEnabled() + && KafkaPropagation.shouldPropagate(apiVersions)) { + return KafkaPropagation.propagateContext(context, record); + } + return record; + } + + public void end(@Nullable Throwable throwable) { + scope.close(); + if (throwable != null) { + producerInstrumenter().end(context, request, null, throwable); + } + // span finished by ProducerCallback + } + } + + @AssignReturned.ToArguments({ + @ToArgument(value = 0, index = 1), + @ToArgument(value = 1, index = 2) + }) + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Object[] onEnter( + @Advice.FieldValue("apiVersions") ApiVersions apiVersions, + @Advice.FieldValue("clientId") String clientId, + @Advice.Argument(0) ProducerRecord originalRecord, + @Advice.Argument(1) Callback originalCallback) { + ProducerRecord record = originalRecord; + Callback callback = originalCallback; + + KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); + AdviceScope adviceScope = AdviceScope.start(request); + if (adviceScope == null) { + return new Object[] {null, record, callback}; + } + record = adviceScope.propagateContext(apiVersions, record); + callback = adviceScope.wrapCallback(callback); + return new Object[] {adviceScope, record, callback}; } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Enter KafkaProducerRequest request, - @Advice.Thrown Throwable throwable, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope) { - if (scope == null) { - return; - } - scope.close(); + @Advice.Thrown @Nullable Throwable throwable, @Advice.Enter Object[] enterResult) { - if (throwable != null) { - producerInstrumenter().end(context, request, null, throwable); + AdviceScope adviceScope = (AdviceScope) enterResult[0]; + if (adviceScope != null) { + adviceScope.end(throwable); } - // span finished by ProducerCallback } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java index 45c0d6078d4a..6a1f4455e9df 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java @@ -16,6 +16,8 @@ import java.util.Map; import java.util.Properties; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -39,9 +41,10 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ConstructorMapAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) Map config) { + public static Map onEnter( + @Advice.Argument(0) Map originalConfig) { // In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread` // of DefaultKafkaProducerFactory is set to true, the `config` object entering @@ -59,8 +62,9 @@ public static void onEnter( // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538 // ensure config is a mutable map and avoid concurrency conflicts - config = new HashMap<>(config); + Map config = new HashMap<>(originalConfig); enhanceConfig(config); + return config; } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsInstrumentationModule.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsInstrumentationModule.java index 16c405f80554..ef64d13dad29 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsInstrumentationModule.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsInstrumentationModule.java @@ -40,4 +40,9 @@ public List typeInstrumentations() { return asList( new KafkaMetricsProducerInstrumentation(), new KafkaMetricsConsumerInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java index 3c199255522f..dc08085dcf8d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java @@ -16,6 +16,8 @@ import java.util.Map; import java.util.Properties; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -39,9 +41,11 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class ConstructorMapAdvice { + @AssignReturned.ToArguments(@ToArgument(0)) @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) Map config) { + public static Map onEnter( + @Advice.Argument(0) Map originalConfig) { + Map config = originalConfig; // In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread` // of DefaultKafkaProducerFactory is set to true, the `config` object entering @@ -61,6 +65,7 @@ public static void onEnter( // ensure config is a mutable map and avoid concurrency conflicts config = new HashMap<>(config); enhanceConfig(config); + return config; } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java index c61cecfed349..033448076a8a 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java @@ -10,10 +10,12 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; @AutoService(InstrumentationModule.class) -public class KafkaStreamsInstrumentationModule extends InstrumentationModule { +public class KafkaStreamsInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public KafkaStreamsInstrumentationModule() { super("kafka-streams", "kafka-streams-0.11", "kafka"); } @@ -27,4 +29,9 @@ public List typeInstrumentations() { new StreamTaskInstrumentation(), new StreamThreadInstrumentation()); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java index 2229e9942b9c..0b52087b017c 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java @@ -17,6 +17,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -45,12 +46,13 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class DeserializeAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( + public static ConsumerRecord onExit( @Advice.Argument(1) ConsumerRecord incoming, - @Advice.Return(readOnly = false) ConsumerRecord result) { + @Advice.Return ConsumerRecord result) { if (result == null) { - return; + return null; } // on 1.x we need to copy headers from incoming to result @@ -62,6 +64,7 @@ public static void onExit( // copy the receive CONSUMER span association KafkaConsumerContextUtil.set(result, KafkaConsumerContextUtil.get(incoming)); + return result; } } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java index a6d2ce6a357f..8e39d388d986 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java @@ -15,6 +15,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -42,12 +43,13 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class SaveHeadersAdvice { + @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class) - public static void saveHeaders( + public static ConsumerRecord saveHeaders( @Advice.Argument(0) ConsumerRecord incoming, - @Advice.Return(readOnly = false) ConsumerRecord result) { + @Advice.Return ConsumerRecord result) { if (result == null) { - return; + return null; } // copy headers from incoming to result @@ -57,6 +59,7 @@ public static void saveHeaders( // copy the receive CONSUMER span association KafkaConsumerContextUtil.set(result, KafkaConsumerContextUtil.get(incoming)); + return result; } } }