Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -64,60 +65,57 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class IterableAdvice {

@SuppressWarnings("unchecked")
@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void wrap(
public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
@Advice.This ConsumerRecords<?, ?> records,
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<K, V>> iterable) {
@Advice.Return Iterable<ConsumerRecord<K, V>> iterable) {

// it's important not to suppress consumer span creation here because this instrumentation can
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
iterable =
TracingIterable.wrap(
iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
return TracingIterable.wrap(
iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
}
}

@SuppressWarnings("unused")
public static class ListAdvice {

@SuppressWarnings("unchecked")
@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void wrap(
public static <K, V> List<ConsumerRecord<K, V>> wrap(
@Advice.This ConsumerRecords<?, ?> records,
@Advice.Return(readOnly = false) List<ConsumerRecord<K, V>> list) {
@Advice.Return List<ConsumerRecord<K, V>> list) {

// it's important not to suppress consumer span creation here because this instrumentation can
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
list =
TracingList.wrap(
list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
return TracingList.wrap(
list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
}
}

@SuppressWarnings("unused")
public static class IteratorAdvice {

@SuppressWarnings("unchecked")
@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void wrap(
public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
@Advice.This ConsumerRecords<?, ?> records,
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<K, V>> iterator) {
@Advice.Return Iterator<ConsumerRecord<K, V>> iterator) {

// it's important not to suppress consumer span creation here because this instrumentation can
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
iterator =
TracingIterator.wrap(
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
return TracingIterator.wrap(
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KafkaClientsInstrumentationModule extends InstrumentationModule {
public class KafkaClientsInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {
public KafkaClientsInstrumentationModule() {
super("kafka-clients", "kafka-clients-0.11", "kafka");
}
Expand All @@ -25,4 +27,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
new KafkaConsumerInstrumentation(),
new ConsumerRecordsInstrumentation());
}

@Override
public boolean isIndyReady() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.ApiVersions;
Expand Down Expand Up @@ -46,48 +49,83 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class SendAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static KafkaProducerRequest onEnter(
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
@Advice.FieldValue("clientId") String clientId,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
@Advice.Argument(value = 1, readOnly = false) Callback callback,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
public static class AdviceScope {
private final KafkaProducerRequest request;
private final Context context;
private final Scope scope;
private final Context parentContext;

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
Context parentContext = Java8BytecodeBridge.currentContext();
if (!producerInstrumenter().shouldStart(parentContext, request)) {
return null;
private AdviceScope(
Context parentContext, KafkaProducerRequest request, Context context, Scope scope) {
this.parentContext = parentContext;
this.request = request;
this.context = context;
this.scope = scope;
}

context = producerInstrumenter().start(parentContext, request);
scope = context.makeCurrent();
@Nullable
public static AdviceScope start(KafkaProducerRequest request) {
Context parentContext = Java8BytecodeBridge.currentContext();
if (!producerInstrumenter().shouldStart(parentContext, request)) {
return null;
}
Context context = producerInstrumenter().start(parentContext, request);
return new AdviceScope(parentContext, request, context, context.makeCurrent());
}

if (KafkaSingletons.isProducerPropagationEnabled()
&& KafkaPropagation.shouldPropagate(apiVersions)) {
record = KafkaPropagation.propagateContext(context, record);
public Callback wrapCallback(Callback originalCallback) {
return new ProducerCallback(originalCallback, parentContext, context, request);
}

callback = new ProducerCallback(callback, parentContext, context, request);
return request;
public ProducerRecord<?, ?> propagateContext(
ApiVersions apiVersions, ProducerRecord<?, ?> record) {
if (KafkaSingletons.isProducerPropagationEnabled()
&& KafkaPropagation.shouldPropagate(apiVersions)) {
return KafkaPropagation.propagateContext(context, record);
}
return record;
}

public void end(@Nullable Throwable throwable) {
scope.close();
if (throwable != null) {
producerInstrumenter().end(context, request, null, throwable);
}
// span finished by ProducerCallback
}
}

@AssignReturned.ToArguments({
@ToArgument(value = 0, index = 1),
@ToArgument(value = 1, index = 2)
})
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Object[] onEnter(
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
@Advice.FieldValue("clientId") String clientId,
@Advice.Argument(0) ProducerRecord<?, ?> originalRecord,
@Advice.Argument(1) Callback originalCallback) {
ProducerRecord<?, ?> record = originalRecord;
Callback callback = originalCallback;

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
AdviceScope adviceScope = AdviceScope.start(request);
if (adviceScope == null) {
return new Object[] {null, record, callback};
}
record = adviceScope.propagateContext(apiVersions, record);
callback = adviceScope.wrapCallback(callback);
return new Object[] {adviceScope, record, callback};
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter KafkaProducerRequest request,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
@Advice.Thrown @Nullable Throwable throwable, @Advice.Enter Object[] enterResult) {

if (throwable != null) {
producerInstrumenter().end(context, request, null, throwable);
AdviceScope adviceScope = (AdviceScope) enterResult[0];
if (adviceScope != null) {
adviceScope.end(throwable);
}
// span finished by ProducerCallback
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

Expand All @@ -39,9 +41,10 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class ConstructorMapAdvice {

@AssignReturned.ToArguments(@ToArgument(0))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
public static Map<String, Object> onEnter(
@Advice.Argument(0) Map<String, Object> originalConfig) {

// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
Expand All @@ -59,8 +62,9 @@ public static void onEnter(
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538

// ensure config is a mutable map and avoid concurrency conflicts
config = new HashMap<>(config);
Map<String, Object> config = new HashMap<>(originalConfig);
enhanceConfig(config);
return config;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new KafkaMetricsProducerInstrumentation(), new KafkaMetricsConsumerInstrumentation());
}

@Override
public boolean isIndyReady() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

Expand All @@ -39,9 +41,11 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class ConstructorMapAdvice {

@AssignReturned.ToArguments(@ToArgument(0))
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
public static Map<String, Object> onEnter(
@Advice.Argument(0) Map<String, Object> originalConfig) {
Map<String, Object> config = originalConfig;

// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
Expand All @@ -61,6 +65,7 @@ public static void onEnter(
// ensure config is a mutable map and avoid concurrency conflicts
config = new HashMap<>(config);
enhanceConfig(config);
return config;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KafkaStreamsInstrumentationModule extends InstrumentationModule {
public class KafkaStreamsInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {
public KafkaStreamsInstrumentationModule() {
super("kafka-streams", "kafka-streams-0.11", "kafka");
}
Expand All @@ -27,4 +29,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
new StreamTaskInstrumentation(),
new StreamThreadInstrumentation());
}

@Override
public boolean isIndyReady() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -45,12 +46,13 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class DeserializeAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
public static ConsumerRecord<?, ?> onExit(
@Advice.Argument(1) ConsumerRecord<?, ?> incoming,
@Advice.Return(readOnly = false) ConsumerRecord<?, ?> result) {
@Advice.Return ConsumerRecord<?, ?> result) {
if (result == null) {
return;
return null;
}

// on 1.x we need to copy headers from incoming to result
Expand All @@ -62,6 +64,7 @@ public static void onExit(

// copy the receive CONSUMER span association
KafkaConsumerContextUtil.set(result, KafkaConsumerContextUtil.get(incoming));
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -42,12 +43,13 @@ public void transform(TypeTransformer transformer) {
@SuppressWarnings("unused")
public static class SaveHeadersAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static void saveHeaders(
public static ConsumerRecord<?, ?> saveHeaders(
@Advice.Argument(0) ConsumerRecord<?, ?> incoming,
@Advice.Return(readOnly = false) ConsumerRecord<?, ?> result) {
@Advice.Return ConsumerRecord<?, ?> result) {
if (result == null) {
return;
return null;
}

// copy headers from incoming to result
Expand All @@ -57,6 +59,7 @@ public static void saveHeaders(

// copy the receive CONSUMER span association
KafkaConsumerContextUtil.set(result, KafkaConsumerContextUtil.get(incoming));
return result;
}
}
}