Skip to content

Commit 12ea869

Browse files
author
Mateusz Rzeszutek
authored
Implement messaging.kafka.* attributes spec (part 1) (#7824)
Part 1 of #7771 It's a lot more than I initially expected it to be; I'll introduce the clientId attribute in part 2
1 parent 0e4e696 commit 12ea869

File tree

49 files changed

+986
-1202
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+986
-1202
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import net.bytebuddy.asm.Advice;
2222
import net.bytebuddy.description.type.TypeDescription;
2323
import net.bytebuddy.matcher.ElementMatcher;
24+
import org.apache.kafka.clients.consumer.Consumer;
2425
import org.apache.kafka.clients.consumer.ConsumerRecord;
2526
import org.apache.kafka.clients.consumer.ConsumerRecords;
2627

@@ -59,6 +60,7 @@ public void transform(TypeTransformer transformer) {
5960
@SuppressWarnings("unused")
6061
public static class IterableAdvice {
6162

63+
@SuppressWarnings("unchecked")
6264
@Advice.OnMethodExit(suppress = Throwable.class)
6365
public static <K, V> void wrap(
6466
@Advice.This ConsumerRecords<?, ?> records,
@@ -69,13 +71,16 @@ public static <K, V> void wrap(
6971
// case it's important to overwrite the leaked span instead of suppressing the correct span
7072
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
7173
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
72-
iterable = TracingIterable.wrap(iterable, receiveContext);
74+
Consumer<K, V> consumer =
75+
VirtualField.find(ConsumerRecords.class, Consumer.class).get(records);
76+
iterable = TracingIterable.wrap(iterable, receiveContext, consumer);
7377
}
7478
}
7579

7680
@SuppressWarnings("unused")
7781
public static class ListAdvice {
7882

83+
@SuppressWarnings("unchecked")
7984
@Advice.OnMethodExit(suppress = Throwable.class)
8085
public static <K, V> void wrap(
8186
@Advice.This ConsumerRecords<?, ?> records,
@@ -86,13 +91,16 @@ public static <K, V> void wrap(
8691
// case it's important to overwrite the leaked span instead of suppressing the correct span
8792
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
8893
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
89-
list = TracingList.wrap(list, receiveContext);
94+
Consumer<K, V> consumer =
95+
VirtualField.find(ConsumerRecords.class, Consumer.class).get(records);
96+
list = TracingList.wrap(list, receiveContext, consumer);
9097
}
9198
}
9299

93100
@SuppressWarnings("unused")
94101
public static class IteratorAdvice {
95102

103+
@SuppressWarnings("unchecked")
96104
@Advice.OnMethodExit(suppress = Throwable.class)
97105
public static <K, V> void wrap(
98106
@Advice.This ConsumerRecords<?, ?> records,
@@ -103,7 +111,9 @@ public static <K, V> void wrap(
103111
// case it's important to overwrite the leaked span instead of suppressing the correct span
104112
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
105113
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
106-
iterator = TracingIterator.wrap(iterator, receiveContext);
114+
Consumer<K, V> consumer =
115+
VirtualField.find(ConsumerRecords.class, Consumer.class).get(records);
116+
iterator = TracingIterator.wrap(iterator, receiveContext, consumer);
107117
}
108118
}
109119
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.opentelemetry.context.Context;
1919
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
2020
import io.opentelemetry.instrumentation.api.util.VirtualField;
21+
import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord;
2122
import io.opentelemetry.instrumentation.kafka.internal.Timer;
2223
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
2324
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
@@ -28,6 +29,7 @@
2829
import net.bytebuddy.asm.Advice;
2930
import net.bytebuddy.description.type.TypeDescription;
3031
import net.bytebuddy.matcher.ElementMatcher;
32+
import org.apache.kafka.clients.consumer.Consumer;
3133
import org.apache.kafka.clients.consumer.ConsumerRecord;
3234
import org.apache.kafka.clients.consumer.ConsumerRecords;
3335

@@ -83,6 +85,7 @@ public static Timer onEnter() {
8385
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
8486
public static void onExit(
8587
@Advice.Enter Timer timer,
88+
@Advice.This Consumer<?, ?> consumer,
8689
@Advice.Return ConsumerRecords<?, ?> records,
8790
@Advice.Thrown Throwable error) {
8891

@@ -91,24 +94,32 @@ public static void onExit(
9194
return;
9295
}
9396

97+
// we're attaching the consumer to the records to be able to retrieve things like consumer
98+
// group or clientId later
99+
VirtualField<ConsumerRecords<?, ?>, Consumer<?, ?>> consumerRecordsConsumer =
100+
VirtualField.find(ConsumerRecords.class, Consumer.class);
101+
consumerRecordsConsumer.set(records, consumer);
102+
94103
Context parentContext = currentContext();
95-
if (consumerReceiveInstrumenter().shouldStart(parentContext, records)) {
104+
ConsumerAndRecord<ConsumerRecords<?, ?>> request =
105+
ConsumerAndRecord.create(consumer, records);
106+
107+
if (consumerReceiveInstrumenter().shouldStart(parentContext, request)) {
96108
// disable process tracing and store the receive span for each individual record too
97109
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
98110
try {
99111
Context context =
100112
InstrumenterUtil.startAndEnd(
101113
consumerReceiveInstrumenter(),
102114
parentContext,
103-
records,
115+
request,
104116
null,
105117
error,
106118
timer.startTime(),
107119
timer.now());
108120

109121
// we're storing the context of the receive span so that process spans can use it as
110-
// parent
111-
// context even though the span has ended
122+
// parent context even though the span has ended
112123
// this is the suggested behavior according to the spec batch receive scenario:
113124
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
114125
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import io.opentelemetry.api.GlobalOpenTelemetry;
99
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
10+
import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord;
1011
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
1112
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
1213
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
@@ -34,8 +35,10 @@ public final class KafkaSingletons {
3435
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);
3536

3637
private static final Instrumenter<ProducerRecord<?, ?>, RecordMetadata> PRODUCER_INSTRUMENTER;
37-
private static final Instrumenter<ConsumerRecords<?, ?>, Void> CONSUMER_RECEIVE_INSTRUMENTER;
38-
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER;
38+
private static final Instrumenter<ConsumerAndRecord<ConsumerRecords<?, ?>>, Void>
39+
CONSUMER_RECEIVE_INSTRUMENTER;
40+
private static final Instrumenter<ConsumerAndRecord<ConsumerRecord<?, ?>>, Void>
41+
CONSUMER_PROCESS_INSTRUMENTER;
3942

4043
static {
4144
KafkaInstrumenterFactory instrumenterFactory =
@@ -59,11 +62,13 @@ public static boolean isProducerPropagationEnabled() {
5962
return PRODUCER_INSTRUMENTER;
6063
}
6164

62-
public static Instrumenter<ConsumerRecords<?, ?>, Void> consumerReceiveInstrumenter() {
65+
public static Instrumenter<ConsumerAndRecord<ConsumerRecords<?, ?>>, Void>
66+
consumerReceiveInstrumenter() {
6367
return CONSUMER_RECEIVE_INSTRUMENTER;
6468
}
6569

66-
public static Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter() {
70+
public static Instrumenter<ConsumerAndRecord<ConsumerRecord<?, ?>>, Void>
71+
consumerProcessInstrumenter() {
6772
return CONSUMER_PROCESS_INSTRUMENTER;
6873
}
6974

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,30 @@
99
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
1010
import java.util.Iterator;
1111
import javax.annotation.Nullable;
12+
import org.apache.kafka.clients.consumer.Consumer;
1213
import org.apache.kafka.clients.consumer.ConsumerRecord;
1314

1415
public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
1516
private final Iterable<ConsumerRecord<K, V>> delegate;
1617
@Nullable private final Context receiveContext;
18+
private final Consumer<K, V> consumer;
1719
private boolean firstIterator = true;
1820

1921
protected TracingIterable(
20-
Iterable<ConsumerRecord<K, V>> delegate, @Nullable Context receiveContext) {
22+
Iterable<ConsumerRecord<K, V>> delegate,
23+
@Nullable Context receiveContext,
24+
Consumer<K, V> consumer) {
2125
this.delegate = delegate;
2226
this.receiveContext = receiveContext;
27+
this.consumer = consumer;
2328
}
2429

2530
public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
26-
Iterable<ConsumerRecord<K, V>> delegate, @Nullable Context receiveContext) {
31+
Iterable<ConsumerRecord<K, V>> delegate,
32+
@Nullable Context receiveContext,
33+
Consumer<K, V> consumer) {
2734
if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
28-
return new TracingIterable<>(delegate, receiveContext);
35+
return new TracingIterable<>(delegate, receiveContext, consumer);
2936
}
3037
return delegate;
3138
}
@@ -37,7 +44,7 @@ public Iterator<ConsumerRecord<K, V>> iterator() {
3744
// However, this is not thread-safe, but usually the first (hopefully only) traversal of
3845
// ConsumerRecords is performed in the same thread that called poll()
3946
if (firstIterator) {
40-
it = TracingIterator.wrap(delegate.iterator(), receiveContext);
47+
it = TracingIterator.wrap(delegate.iterator(), receiveContext, consumer);
4148
firstIterator = false;
4249
} else {
4350
it = delegate.iterator();

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,35 +9,44 @@
99

1010
import io.opentelemetry.context.Context;
1111
import io.opentelemetry.context.Scope;
12+
import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord;
1213
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
1314
import java.util.Iterator;
1415
import javax.annotation.Nullable;
16+
import org.apache.kafka.clients.consumer.Consumer;
1517
import org.apache.kafka.clients.consumer.ConsumerRecord;
1618

1719
public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
20+
1821
private final Iterator<ConsumerRecord<K, V>> delegateIterator;
1922
private final Context parentContext;
23+
private final Consumer<K, V> consumer;
2024

2125
/*
2226
* Note: this may potentially create problems if this iterator is used from different threads. But
2327
* at the moment we cannot do much about this.
2428
*/
25-
@Nullable private ConsumerRecord<?, ?> currentRequest;
29+
@Nullable private ConsumerAndRecord<ConsumerRecord<?, ?>> currentRequest;
2630
@Nullable private Context currentContext;
2731
@Nullable private Scope currentScope;
2832

2933
private TracingIterator(
30-
Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable Context receiveContext) {
34+
Iterator<ConsumerRecord<K, V>> delegateIterator,
35+
@Nullable Context receiveContext,
36+
Consumer<K, V> consumer) {
3137
this.delegateIterator = delegateIterator;
3238

3339
// use the receive CONSUMER as parent if it's available
3440
this.parentContext = receiveContext != null ? receiveContext : Context.current();
41+
this.consumer = consumer;
3542
}
3643

3744
public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
38-
Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable Context receiveContext) {
45+
Iterator<ConsumerRecord<K, V>> delegateIterator,
46+
@Nullable Context receiveContext,
47+
Consumer<K, V> consumer) {
3948
if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
40-
return new TracingIterator<>(delegateIterator, receiveContext);
49+
return new TracingIterator<>(delegateIterator, receiveContext, consumer);
4150
}
4251
return delegateIterator;
4352
}
@@ -60,7 +69,7 @@ public ConsumerRecord<K, V> next() {
6069
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
6170
ConsumerRecord<K, V> next = delegateIterator.next();
6271
if (next != null && KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
63-
currentRequest = next;
72+
currentRequest = ConsumerAndRecord.create(consumer, next);
6473
currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest);
6574
currentScope = currentContext.makeCurrent();
6675
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,26 @@
1111
import java.util.List;
1212
import java.util.ListIterator;
1313
import javax.annotation.Nullable;
14+
import org.apache.kafka.clients.consumer.Consumer;
1415
import org.apache.kafka.clients.consumer.ConsumerRecord;
1516

1617
public class TracingList<K, V> extends TracingIterable<K, V> implements List<ConsumerRecord<K, V>> {
1718
private final List<ConsumerRecord<K, V>> delegate;
1819

19-
private TracingList(List<ConsumerRecord<K, V>> delegate, @Nullable Context receiveContext) {
20-
super(delegate, receiveContext);
20+
private TracingList(
21+
List<ConsumerRecord<K, V>> delegate,
22+
@Nullable Context receiveContext,
23+
Consumer<K, V> consumer) {
24+
super(delegate, receiveContext, consumer);
2125
this.delegate = delegate;
2226
}
2327

2428
public static <K, V> List<ConsumerRecord<K, V>> wrap(
25-
List<ConsumerRecord<K, V>> delegate, @Nullable Context receiveContext) {
29+
List<ConsumerRecord<K, V>> delegate,
30+
@Nullable Context receiveContext,
31+
Consumer<K, V> consumer) {
2632
if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
27-
return new TracingList<>(delegate, receiveContext);
33+
return new TracingList<>(delegate, receiveContext, consumer);
2834
}
2935
return delegate;
3036
}

0 commit comments

Comments
 (0)