Skip to content

Commit 970717b

Browse files
authored
Update ConsumerRecordsInstrumentation.java
1 parent e62bf8a commit 970717b

File tree

1 file changed

+50
-16
lines changed
  • instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11

1 file changed

+50
-16
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterable;
2020
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterator;
2121
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
22+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingListIterator;
2223
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2324
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
2425
import java.util.Iterator;
2526
import java.util.List;
27+
import java.util.ListIterator;
2628
import net.bytebuddy.asm.Advice;
27-
import net.bytebuddy.asm.Advice.AssignReturned;
2829
import net.bytebuddy.description.type.TypeDescription;
2930
import net.bytebuddy.matcher.ElementMatcher;
3031
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -60,62 +61,95 @@ public void transform(TypeTransformer transformer) {
6061
.and(takesArguments(0))
6162
.and(returns(Iterator.class)),
6263
ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice");
64+
transformer.applyAdviceToMethod(
65+
isMethod()
66+
.and(isPublic())
67+
.and(named("listIterator"))
68+
.and(takesArguments(0))
69+
.and(returns(ListIterator.class)),
70+
ConsumerRecordsInstrumentation.class.getName() + "$ListIteratorAdvice");
6371
}
6472

6573
@SuppressWarnings("unused")
6674
public static class IterableAdvice {
6775

68-
@AssignReturned.ToReturned
76+
@SuppressWarnings("unchecked")
6977
@Advice.OnMethodExit(suppress = Throwable.class)
70-
public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
78+
public static <K, V> void wrap(
7179
@Advice.This ConsumerRecords<?, ?> records,
72-
@Advice.Return Iterable<ConsumerRecord<K, V>> iterable) {
80+
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<K, V>> iterable) {
7381

7482
// it's important not to suppress consumer span creation here because this instrumentation can
7583
// leak the context and so there may be a leaked consumer span in the context, in which
7684
// case it's important to overwrite the leaked span instead of suppressing the correct span
7785
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
7886
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
79-
return TracingIterable.wrap(
80-
iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
87+
iterable =
88+
TracingIterable.wrap(
89+
iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
8190
}
8291
}
8392

8493
@SuppressWarnings("unused")
8594
public static class ListAdvice {
8695

87-
@AssignReturned.ToReturned
96+
@SuppressWarnings("unchecked")
8897
@Advice.OnMethodExit(suppress = Throwable.class)
89-
public static <K, V> List<ConsumerRecord<K, V>> wrap(
98+
public static <K, V> void wrap(
9099
@Advice.This ConsumerRecords<?, ?> records,
91-
@Advice.Return List<ConsumerRecord<K, V>> list) {
100+
@Advice.Return(readOnly = false) List<ConsumerRecord<K, V>> list) {
92101

93102
// it's important not to suppress consumer span creation here because this instrumentation can
94103
// leak the context and so there may be a leaked consumer span in the context, in which
95104
// case it's important to overwrite the leaked span instead of suppressing the correct span
96105
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
97106
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
98-
return TracingList.wrap(
99-
list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
107+
list =
108+
TracingList.wrap(
109+
list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
100110
}
101111
}
102112

103113
@SuppressWarnings("unused")
104114
public static class IteratorAdvice {
105115

106-
@AssignReturned.ToReturned
116+
@SuppressWarnings("unchecked")
117+
@Advice.OnMethodExit(suppress = Throwable.class)
118+
public static <K, V> void wrap(
119+
@Advice.This ConsumerRecords<?, ?> records,
120+
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<K, V>> iterator) {
121+
122+
// it's important not to suppress consumer span creation here because this instrumentation can
123+
// leak the context and so there may be a leaked consumer span in the context, in which
124+
// case it's important to overwrite the leaked span instead of suppressing the correct span
125+
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
126+
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
127+
iterator =
128+
TracingIterator.wrap(
129+
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
130+
}
131+
}
132+
133+
@SuppressWarnings("unused")
134+
public static class ListIteratorAdvice {
135+
136+
@SuppressWarnings("unchecked")
107137
@Advice.OnMethodExit(suppress = Throwable.class)
108-
public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
138+
public static <K, V> void wrap(
109139
@Advice.This ConsumerRecords<?, ?> records,
110-
@Advice.Return Iterator<ConsumerRecord<K, V>> iterator) {
140+
@Advice.Return(readOnly = false) ListIterator<ConsumerRecord<K, V>> listIterator) {
111141

112142
// it's important not to suppress consumer span creation here because this instrumentation can
113143
// leak the context and so there may be a leaked consumer span in the context, in which
114144
// case it's important to overwrite the leaked span instead of suppressing the correct span
115145
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
116146
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
117-
return TracingIterator.wrap(
118-
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
147+
listIterator =
148+
TracingListIterator.wrap(
149+
listIterator,
150+
consumerProcessInstrumenter(),
151+
wrappingEnabledSupplier(),
152+
consumerContext);
119153
}
120154
}
121155
}

0 commit comments

Comments
 (0)