Skip to content

Commit 4f02ea3

Browse files
committed
fix instrumentation
1 parent 9915090 commit 4f02ea3

File tree

2 files changed

+22
-28
lines changed

2 files changed

+22
-28
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.ListIterator;
2828
import net.bytebuddy.asm.Advice;
29+
import net.bytebuddy.asm.Advice.AssignReturned;
2930
import net.bytebuddy.description.type.TypeDescription;
3031
import net.bytebuddy.matcher.ElementMatcher;
3132
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -73,83 +74,76 @@ public void transform(TypeTransformer transformer) {
7374
@SuppressWarnings("unused")
7475
public static class IterableAdvice {
7576

76-
@SuppressWarnings("unchecked")
77+
@AssignReturned.ToReturned
7778
@Advice.OnMethodExit(suppress = Throwable.class)
78-
public static <K, V> void wrap(
79+
public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
7980
@Advice.This ConsumerRecords<?, ?> records,
80-
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<K, V>> iterable) {
81+
@Advice.Return Iterable<ConsumerRecord<K, V>> iterable) {
8182

8283
// it's important not to suppress consumer span creation here because this instrumentation can
8384
// leak the context and so there may be a leaked consumer span in the context, in which
8485
// case it's important to overwrite the leaked span instead of suppressing the correct span
8586
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
8687
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
87-
iterable =
88-
TracingIterable.wrap(
89-
iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
88+
return TracingIterable.wrap(
89+
iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
9090
}
9191
}
9292

9393
@SuppressWarnings("unused")
9494
public static class ListAdvice {
9595

96-
@SuppressWarnings("unchecked")
96+
@AssignReturned.ToReturned
9797
@Advice.OnMethodExit(suppress = Throwable.class)
98-
public static <K, V> void wrap(
98+
public static <K, V> List<ConsumerRecord<K, V>> wrap(
9999
@Advice.This ConsumerRecords<?, ?> records,
100-
@Advice.Return(readOnly = false) List<ConsumerRecord<K, V>> list) {
100+
@Advice.Return List<ConsumerRecord<K, V>> list) {
101101

102102
// it's important not to suppress consumer span creation here because this instrumentation can
103103
// leak the context and so there may be a leaked consumer span in the context, in which
104104
// case it's important to overwrite the leaked span instead of suppressing the correct span
105105
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
106106
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
107-
list =
108-
TracingList.wrap(
109-
list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
107+
return TracingList.wrap(
108+
list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
110109
}
111110
}
112111

113112
@SuppressWarnings("unused")
114113
public static class IteratorAdvice {
115114

116-
@SuppressWarnings("unchecked")
115+
@AssignReturned.ToReturned
117116
@Advice.OnMethodExit(suppress = Throwable.class)
118-
public static <K, V> void wrap(
117+
public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
119118
@Advice.This ConsumerRecords<?, ?> records,
120-
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<K, V>> iterator) {
119+
@Advice.Return Iterator<ConsumerRecord<K, V>> iterator) {
121120

122121
// it's important not to suppress consumer span creation here because this instrumentation can
123122
// leak the context and so there may be a leaked consumer span in the context, in which
124123
// case it's important to overwrite the leaked span instead of suppressing the correct span
125124
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
126125
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
127-
iterator =
128-
TracingIterator.wrap(
129-
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
126+
return TracingIterator.wrap(
127+
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
130128
}
131129
}
132130

133131
@SuppressWarnings("unused")
134132
public static class ListIteratorAdvice {
135133

136-
@SuppressWarnings("unchecked")
134+
@AssignReturned.ToReturned
137135
@Advice.OnMethodExit(suppress = Throwable.class)
138-
public static <K, V> void wrap(
136+
public static <K, V> ListIterator<ConsumerRecord<K, V>> wrap(
139137
@Advice.This ConsumerRecords<?, ?> records,
140-
@Advice.Return(readOnly = false) ListIterator<ConsumerRecord<K, V>> listIterator) {
138+
@Advice.Return ListIterator<ConsumerRecord<K, V>> listIterator) {
141139

142140
// it's important not to suppress consumer span creation here because this instrumentation can
143141
// leak the context and so there may be a leaked consumer span in the context, in which
144142
// case it's important to overwrite the leaked span instead of suppressing the correct span
145143
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
146144
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
147-
listIterator =
148-
TracingListIterator.wrap(
149-
listIterator,
150-
consumerProcessInstrumenter(),
151-
wrappingEnabledSupplier(),
152-
consumerContext);
145+
return TracingListIterator.wrap(
146+
listIterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
153147
}
154148
}
155149
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ void testRecordsWithTopicPartitionKafkaConsume(boolean testListIterator)
172172
consumerRecords.records(KafkaClientBaseTest.topicPartition);
173173
assertThat(recordsInPartition.size()).isEqualTo(1);
174174

175+
// iterate over records to generate spans
175176
if (testListIterator) {
176177
for (ListIterator<? extends ConsumerRecord<?, ?>> iterator =
177178
recordsInPartition.listIterator();
@@ -181,7 +182,6 @@ void testRecordsWithTopicPartitionKafkaConsume(boolean testListIterator)
181182
assertThat(record.key()).isNull();
182183
}
183184
} else {
184-
// iterate over records to generate spans
185185
for (ConsumerRecord<?, ?> record : recordsInPartition) {
186186
assertThat(record.value()).isEqualTo(greeting);
187187
assertThat(record.key()).isNull();

0 commit comments

Comments
 (0)