diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java index d680e78143e9..8979cf725932 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java @@ -19,10 +19,12 @@ import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterable; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterator; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingListIterator; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -59,6 +61,13 @@ public void transform(TypeTransformer transformer) { .and(takesArguments(0)) .and(returns(Iterator.class)), ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("listIterator")) + .and(takesArguments(0)) + .and(returns(ListIterator.class)), + ConsumerRecordsInstrumentation.class.getName() + "$ListIteratorAdvice"); } @SuppressWarnings("unused") @@ -120,4 +129,27 @@ public static void wrap( iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); } } + + @SuppressWarnings("unused") + public static class ListIteratorAdvice { + + @SuppressWarnings("unchecked") + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.This ConsumerRecords records, + @Advice.Return(readOnly = false) ListIterator> listIterator) { + + // it's important not to suppress consumer span creation here because this instrumentation can + // leak the context and so there may be a leaked consumer span in the context, in which + // case it's important to overwrite the leaked span instead of suppressing the correct span + // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + listIterator = + TracingListIterator.wrap( + listIterator, + consumerProcessInstrumenter(), + wrappingEnabledSupplier(), + consumerContext); + } + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java index 540a15659765..2a30128d49a9 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java @@ -16,9 +16,9 @@ */ public class TracingIterable implements Iterable> { private final Iterable> delegate; - private final Instrumenter instrumenter; - private final BooleanSupplier wrappingEnabled; - private final KafkaConsumerContext consumerContext; + protected final Instrumenter instrumenter; + protected final BooleanSupplier wrappingEnabled; + protected final KafkaConsumerContext consumerContext; private boolean firstIterator = true; protected TracingIterable( diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java index a896f1ebf70f..8af6d6ea2962 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java @@ -137,16 +137,14 @@ public int lastIndexOf(Object o) { @Override public ListIterator> listIterator() { - // TODO: the API for ListIterator is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now - return delegate.listIterator(); + return TracingListIterator.wrap( + delegate.listIterator(), instrumenter, wrappingEnabled, consumerContext); } @Override public ListIterator> listIterator(int index) { - // TODO: the API for ListIterator is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now - return delegate.listIterator(index); + return TracingListIterator.wrap( + delegate.listIterator(index), instrumenter, wrappingEnabled, consumerContext); } @Override diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java new file mode 100644 index 000000000000..7f153a0ef7e8 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java @@ -0,0 +1,89 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; + +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Iterator; +import java.util.ListIterator; +import java.util.function.BooleanSupplier; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public class TracingListIterator implements ListIterator> { + + private final ListIterator> delegateListIterator; + private final Iterator> tracingIterator; + + private TracingListIterator( + ListIterator> delegateListIterator, + Instrumenter instrumenter, + BooleanSupplier wrappingEnabled, + KafkaConsumerContext consumerContext) { + this.delegateListIterator = delegateListIterator; + this.tracingIterator = + TracingIterator.wrap(delegateListIterator, instrumenter, wrappingEnabled, consumerContext); + } + + public static ListIterator> wrap( + ListIterator> delegateListIterator, + Instrumenter instrumenter, + BooleanSupplier wrappingEnabled, + KafkaConsumerContext consumerContext) { + if (wrappingEnabled.getAsBoolean()) { + return new TracingListIterator<>( + delegateListIterator, instrumenter, wrappingEnabled, consumerContext); + } + return delegateListIterator; + } + + @Override + public boolean hasNext() { + return tracingIterator.hasNext(); + } + + @Override + public ConsumerRecord next() { + return tracingIterator.next(); + } + + @Override + public boolean hasPrevious() { + return delegateListIterator.hasPrevious(); + } + + @Override + public ConsumerRecord previous() { + return delegateListIterator.previous(); + } + + @Override + public int nextIndex() { + return delegateListIterator.nextIndex(); + } + + @Override + public int previousIndex() { + return delegateListIterator.previousIndex(); + } + + @Override + public void remove() { + delegateListIterator.remove(); + } + + @Override + public void set(ConsumerRecord consumerRecord) { + delegateListIterator.set(consumerRecord); + } + + @Override + public void add(ConsumerRecord consumerRecord) { + delegateListIterator.add(consumerRecord); + } +}