Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterable;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterator;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingListIterator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -60,6 +62,13 @@ public void transform(TypeTransformer transformer) {
.and(takesArguments(0))
.and(returns(Iterator.class)),
ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("listIterator"))
.and(takesArguments(0))
.and(returns(ListIterator.class)),
ConsumerRecordsInstrumentation.class.getName() + "$ListIteratorAdvice");
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -118,4 +127,23 @@ public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
}
}

@SuppressWarnings("unused")
public static class ListIteratorAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> ListIterator<ConsumerRecord<K, V>> wrap(
@Advice.This ConsumerRecords<?, ?> records,
@Advice.Return ListIterator<ConsumerRecord<K, V>> listIterator) {

// it's important not to suppress consumer span creation here because this instrumentation can
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
return TracingListIterator.wrap(
listIterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -155,8 +156,8 @@ void testPassThroughTombstone()
}

@DisplayName("test records(TopicPartition) kafka consume")
@Test
void testRecordsWithTopicPartitionKafkaConsume()
@ValueSource(booleans = {true, false})
void testRecordsWithTopicPartitionKafkaConsume(boolean testListIterator)
throws ExecutionException, InterruptedException, TimeoutException {
String greeting = "Hello from MockConsumer!";
producer
Expand All @@ -172,9 +173,19 @@ void testRecordsWithTopicPartitionKafkaConsume()
assertThat(recordsInPartition.size()).isEqualTo(1);

// iterate over records to generate spans
for (ConsumerRecord<?, ?> record : recordsInPartition) {
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
if (testListIterator) {
for (ListIterator<? extends ConsumerRecord<?, ?>> iterator =
recordsInPartition.listIterator();
iterator.hasNext(); ) {
ConsumerRecord<?, ?> record = iterator.next();
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
}
} else {
for (ConsumerRecord<?, ?> record : recordsInPartition) {
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
}
}

AtomicReference<SpanData> producerSpan = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
private final Iterable<ConsumerRecord<K, V>> delegate;
private final Instrumenter<KafkaProcessRequest, Void> instrumenter;
private final BooleanSupplier wrappingEnabled;
private final KafkaConsumerContext consumerContext;
protected final Instrumenter<KafkaProcessRequest, Void> instrumenter;
protected final BooleanSupplier wrappingEnabled;
protected final KafkaConsumerContext consumerContext;
private boolean firstIterator = true;

protected TracingIterable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,14 @@ public int lastIndexOf(Object o) {

@Override
public ListIterator<ConsumerRecord<K, V>> listIterator() {
// TODO: the API for ListIterator is not really good to instrument it in context of Kafka
// Consumer so we will not do that for now
return delegate.listIterator();
return TracingListIterator.wrap(
delegate.listIterator(), instrumenter, wrappingEnabled, consumerContext);
}

@Override
public ListIterator<ConsumerRecord<K, V>> listIterator(int index) {
// TODO: the API for ListIterator is not really good to instrument it in context of Kafka
// Consumer so we will not do that for now
return delegate.listIterator(index);
return TracingListIterator.wrap(
delegate.listIterator(index), instrumenter, wrappingEnabled, consumerContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;

import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Iterator;
import java.util.ListIterator;
import java.util.function.BooleanSupplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public class TracingListIterator<K, V> implements ListIterator<ConsumerRecord<K, V>> {

private final ListIterator<ConsumerRecord<K, V>> delegateListIterator;
private final Iterator<ConsumerRecord<K, V>> tracingIterator;

private TracingListIterator(
ListIterator<ConsumerRecord<K, V>> delegateListIterator,
Instrumenter<KafkaProcessRequest, Void> instrumenter,
BooleanSupplier wrappingEnabled,
KafkaConsumerContext consumerContext) {
this.delegateListIterator = delegateListIterator;
this.tracingIterator =
TracingIterator.wrap(delegateListIterator, instrumenter, wrappingEnabled, consumerContext);
}

public static <K, V> ListIterator<ConsumerRecord<K, V>> wrap(
ListIterator<ConsumerRecord<K, V>> delegateListIterator,
Instrumenter<KafkaProcessRequest, Void> instrumenter,
BooleanSupplier wrappingEnabled,
KafkaConsumerContext consumerContext) {
if (wrappingEnabled.getAsBoolean()) {
return new TracingListIterator<>(
delegateListIterator, instrumenter, wrappingEnabled, consumerContext);
}
return delegateListIterator;
}

@Override
public boolean hasNext() {
return tracingIterator.hasNext();
}

@Override
public ConsumerRecord<K, V> next() {
return tracingIterator.next();
}

@Override
public boolean hasPrevious() {
return delegateListIterator.hasPrevious();
}

@Override
public ConsumerRecord<K, V> previous() {
return delegateListIterator.previous();
}

@Override
public int nextIndex() {
return delegateListIterator.nextIndex();
}

@Override
public int previousIndex() {
return delegateListIterator.previousIndex();
}

@Override
public void remove() {
delegateListIterator.remove();
}

@Override
public void set(ConsumerRecord<K, V> consumerRecord) {
delegateListIterator.set(consumerRecord);
}

@Override
public void add(ConsumerRecord<K, V> consumerRecord) {
delegateListIterator.add(consumerRecord);
}
}