diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java index bc44771daa2a..1adebbe5ec08 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java @@ -19,10 +19,12 @@ import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterable; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterator; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingListIterator; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; @@ -60,6 +62,13 @@ public void transform(TypeTransformer transformer) { .and(takesArguments(0)) .and(returns(Iterator.class)), ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("listIterator")) + .and(takesArguments(0)) + .and(returns(ListIterator.class)), + ConsumerRecordsInstrumentation.class.getName() + "$ListIteratorAdvice"); } @SuppressWarnings("unused") @@ -118,4 +127,23 @@ public static Iterator> wrap( iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); } } + + @SuppressWarnings("unused") + public static class ListIteratorAdvice { + + @AssignReturned.ToReturned + @Advice.OnMethodExit(suppress = Throwable.class) + public static ListIterator> wrap( + @Advice.This ConsumerRecords records, + @Advice.Return ListIterator> listIterator) { + + // it's important not to suppress consumer span creation here because this instrumentation can + // leak the context and so there may be a leaked consumer span in the context, in which + // case it's important to overwrite the leaked span instead of suppressing the correct span + // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + return TracingListIterator.wrap( + listIterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); + } + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java index e0e7821b4b46..6df0d51d53e8 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java @@ -18,6 +18,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; +import java.util.ListIterator; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -155,8 +156,8 @@ void testPassThroughTombstone() } @DisplayName("test records(TopicPartition) kafka consume") - @Test - void testRecordsWithTopicPartitionKafkaConsume() + @ValueSource(booleans = {true, false}) + void testRecordsWithTopicPartitionKafkaConsume(boolean testListIterator) throws ExecutionException, InterruptedException, TimeoutException { String greeting = "Hello from MockConsumer!"; producer @@ -172,9 +173,19 @@ void testRecordsWithTopicPartitionKafkaConsume() assertThat(recordsInPartition.size()).isEqualTo(1); // iterate over records to generate spans - for (ConsumerRecord record : recordsInPartition) { - assertThat(record.value()).isEqualTo(greeting); - assertThat(record.key()).isNull(); + if (testListIterator) { + for (ListIterator> iterator = + recordsInPartition.listIterator(); + iterator.hasNext(); ) { + ConsumerRecord record = iterator.next(); + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + } + } else { + for (ConsumerRecord record : recordsInPartition) { + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + } } AtomicReference producerSpan = new AtomicReference<>(); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java index 540a15659765..2a30128d49a9 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java @@ -16,9 +16,9 @@ */ public class TracingIterable implements Iterable> { private final Iterable> delegate; - private final Instrumenter instrumenter; - private final BooleanSupplier wrappingEnabled; - private final KafkaConsumerContext consumerContext; + protected final Instrumenter instrumenter; + protected final BooleanSupplier wrappingEnabled; + protected final KafkaConsumerContext consumerContext; private boolean firstIterator = true; protected TracingIterable( diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java index a896f1ebf70f..8af6d6ea2962 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java @@ -137,16 +137,14 @@ public int lastIndexOf(Object o) { @Override public ListIterator> listIterator() { - // TODO: the API for ListIterator is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now - return delegate.listIterator(); + return TracingListIterator.wrap( + delegate.listIterator(), instrumenter, wrappingEnabled, consumerContext); } @Override public ListIterator> listIterator(int index) { - // TODO: the API for ListIterator is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now - return delegate.listIterator(index); + return TracingListIterator.wrap( + delegate.listIterator(index), instrumenter, wrappingEnabled, consumerContext); } @Override diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java new file mode 100644 index 000000000000..7f153a0ef7e8 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java @@ -0,0 +1,89 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; + +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Iterator; +import java.util.ListIterator; +import java.util.function.BooleanSupplier; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public class TracingListIterator implements ListIterator> { + + private final ListIterator> delegateListIterator; + private final Iterator> tracingIterator; + + private TracingListIterator( + ListIterator> delegateListIterator, + Instrumenter instrumenter, + BooleanSupplier wrappingEnabled, + KafkaConsumerContext consumerContext) { + this.delegateListIterator = delegateListIterator; + this.tracingIterator = + TracingIterator.wrap(delegateListIterator, instrumenter, wrappingEnabled, consumerContext); + } + + public static ListIterator> wrap( + ListIterator> delegateListIterator, + Instrumenter instrumenter, + BooleanSupplier wrappingEnabled, + KafkaConsumerContext consumerContext) { + if (wrappingEnabled.getAsBoolean()) { + return new TracingListIterator<>( + delegateListIterator, instrumenter, wrappingEnabled, consumerContext); + } + return delegateListIterator; + } + + @Override + public boolean hasNext() { + return tracingIterator.hasNext(); + } + + @Override + public ConsumerRecord next() { + return tracingIterator.next(); + } + + @Override + public boolean hasPrevious() { + return delegateListIterator.hasPrevious(); + } + + @Override + public ConsumerRecord previous() { + return delegateListIterator.previous(); + } + + @Override + public int nextIndex() { + return delegateListIterator.nextIndex(); + } + + @Override + public int previousIndex() { + return delegateListIterator.previousIndex(); + } + + @Override + public void remove() { + delegateListIterator.remove(); + } + + @Override + public void set(ConsumerRecord consumerRecord) { + delegateListIterator.set(consumerRecord); + } + + @Override + public void add(ConsumerRecord consumerRecord) { + delegateListIterator.add(consumerRecord); + } +}