Skip to content

Commit bbd21cb

Browse files
committed
fix
1 parent 536c1cd commit bbd21cb

File tree

4 files changed

+166
-10
lines changed

4 files changed

+166
-10
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
77

88
import io.opentelemetry.api.OpenTelemetry;
9+
import io.opentelemetry.context.Context;
910
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
11+
import io.opentelemetry.instrumentation.api.internal.Timer;
12+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
13+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
1014
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
1115
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
1216
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
@@ -19,14 +23,21 @@
1923
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaProducerTelemetrySupplier;
2024
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryConsumerInterceptor;
2125
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryProducerInterceptor;
26+
import java.lang.reflect.InvocationTargetException;
27+
import java.lang.reflect.Proxy;
2228
import java.util.Collections;
2329
import java.util.HashMap;
2430
import java.util.Map;
2531
import org.apache.kafka.clients.CommonClientConfigs;
32+
import org.apache.kafka.clients.consumer.Consumer;
2633
import org.apache.kafka.clients.consumer.ConsumerConfig;
34+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2735
import org.apache.kafka.clients.consumer.KafkaConsumer;
36+
import org.apache.kafka.clients.producer.Callback;
2837
import org.apache.kafka.clients.producer.KafkaProducer;
38+
import org.apache.kafka.clients.producer.Producer;
2939
import org.apache.kafka.clients.producer.ProducerConfig;
40+
import org.apache.kafka.clients.producer.ProducerRecord;
3041
import org.apache.kafka.clients.producer.RecordMetadata;
3142
import org.apache.kafka.common.metrics.MetricsReporter;
3243

@@ -52,16 +63,77 @@ public final class KafkaTelemetry {
5263
new KafkaConsumerTelemetry(consumerReceiveInstrumenter, consumerProcessInstrumenter);
5364
}
5465

55-
@Deprecated
66+
// this method can be removed when the deprecated TracingProducerInterceptor is removed
5667
KafkaProducerTelemetry getProducerTelemetry() {
5768
return producerTelemetry;
5869
}
5970

60-
@Deprecated
71+
// this method can be removed when the deprecated TracingProducerInterceptor is removed
6172
KafkaConsumerTelemetry getConsumerTelemetry() {
6273
return consumerTelemetry;
6374
}
6475

76+
// TODO consider if this is needed in public API
77+
@SuppressWarnings("unchecked")
78+
public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
79+
return (Producer<K, V>)
80+
Proxy.newProxyInstance(
81+
KafkaTelemetry.class.getClassLoader(),
82+
new Class<?>[] {Producer.class},
83+
(proxy, method, args) -> {
84+
// Future<RecordMetadata> send(ProducerRecord<K, V> record)
85+
// Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
86+
if ("send".equals(method.getName())
87+
&& method.getParameterCount() >= 1
88+
&& method.getParameterTypes()[0] == ProducerRecord.class) {
89+
ProducerRecord<K, V> record = (ProducerRecord<K, V>) args[0];
90+
Callback callback =
91+
method.getParameterCount() >= 2
92+
&& method.getParameterTypes()[1] == Callback.class
93+
? (Callback) args[1]
94+
: null;
95+
return producerTelemetry.buildAndInjectSpan(record, producer, callback, producer::send);
96+
}
97+
try {
98+
return method.invoke(producer, args);
99+
} catch (InvocationTargetException exception) {
100+
throw exception.getCause();
101+
}
102+
});
103+
}
104+
105+
// TODO consider if this is needed in public API
106+
@SuppressWarnings("unchecked")
107+
public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
108+
return (Consumer<K, V>)
109+
Proxy.newProxyInstance(
110+
KafkaTelemetry.class.getClassLoader(),
111+
new Class<?>[] {Consumer.class},
112+
(proxy, method, args) -> {
113+
Object result;
114+
Timer timer = "poll".equals(method.getName()) ? Timer.start() : null;
115+
try {
116+
result = method.invoke(consumer, args);
117+
} catch (InvocationTargetException exception) {
118+
throw exception.getCause();
119+
}
120+
// ConsumerRecords<K, V> poll(long timeout)
121+
// ConsumerRecords<K, V> poll(Duration duration)
122+
if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
123+
ConsumerRecords<K, V> consumerRecords = (ConsumerRecords<K, V>) result;
124+
Context receiveContext =
125+
consumerTelemetry.buildAndFinishSpan(consumerRecords, consumer, timer);
126+
if (receiveContext == null) {
127+
receiveContext = Context.current();
128+
}
129+
KafkaConsumerContext consumerContext =
130+
KafkaConsumerContextUtil.create(receiveContext, consumer);
131+
result = consumerTelemetry.addTracing(consumerRecords, consumerContext);
132+
}
133+
return result;
134+
});
135+
}
136+
65137
/** Returns a new {@link KafkaTelemetry} configured with the given {@link OpenTelemetry}. */
66138
public static KafkaTelemetry create(OpenTelemetry openTelemetry) {
67139
return builder(openTelemetry).build();

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaConsumerTelemetry.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
1313
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
1414
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
15+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
1516
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
1617
import java.util.LinkedHashMap;
1718
import java.util.List;
1819
import java.util.Map;
20+
import org.apache.kafka.clients.consumer.Consumer;
1921
import org.apache.kafka.clients.consumer.ConsumerRecord;
2022
import org.apache.kafka.clients.consumer.ConsumerRecords;
2123
import org.apache.kafka.common.TopicPartition;
@@ -38,6 +40,18 @@ public KafkaConsumerTelemetry(
3840
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
3941
}
4042

43+
// this getter is needed for the deprecated wrap() methods in KafkaTelemetry
44+
public Instrumenter<KafkaProcessRequest, Void> getConsumerProcessInstrumenter() {
45+
return consumerProcessInstrumenter;
46+
}
47+
48+
// this overload is needed for the deprecated wrap() methods in KafkaTelemetry
49+
public <K, V> Context buildAndFinishSpan(
50+
ConsumerRecords<K, V> records, Consumer<K, V> consumer, Timer timer) {
51+
return buildAndFinishSpan(
52+
records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer);
53+
}
54+
4155
public <K, V> Context buildAndFinishSpan(
4256
ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
4357
if (records.isEmpty()) {

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@
88
import static java.util.logging.Level.WARNING;
99

1010
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.context.Scope;
1112
import io.opentelemetry.context.propagation.TextMapPropagator;
1213
import io.opentelemetry.context.propagation.TextMapSetter;
1314
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
1415
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter;
1516
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
17+
import java.util.concurrent.Future;
18+
import java.util.function.BiFunction;
1619
import java.util.logging.Logger;
20+
import org.apache.kafka.clients.producer.Callback;
21+
import org.apache.kafka.clients.producer.Producer;
1722
import org.apache.kafka.clients.producer.ProducerRecord;
1823
import org.apache.kafka.clients.producer.RecordMetadata;
1924
import org.apache.kafka.common.header.Headers;
@@ -42,6 +47,19 @@ public KafkaProducerTelemetry(
4247
this.producerPropagationEnabled = producerPropagationEnabled;
4348
}
4449

50+
// these getters are needed for the deprecated wrap() methods in KafkaTelemetry
51+
public TextMapPropagator getPropagator() {
52+
return propagator;
53+
}
54+
55+
public Instrumenter<KafkaProducerRequest, RecordMetadata> getProducerInstrumenter() {
56+
return producerInstrumenter;
57+
}
58+
59+
public TextMapSetter<Headers> getSetter() {
60+
return SETTER;
61+
}
62+
4563
/**
4664
* Build and inject span into record.
4765
*
@@ -66,4 +84,58 @@ public <K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String client
6684
}
6785
producerInstrumenter.end(context, request, null, null);
6886
}
87+
88+
/**
89+
* Build and inject span into record.
90+
*
91+
* @param record the producer record to inject span info.
92+
* @param callback the producer send callback
93+
* @return send function's result
94+
*/
95+
@SuppressWarnings("FutureReturnValueIgnored")
96+
public <K, V> Future<RecordMetadata> buildAndInjectSpan(
97+
ProducerRecord<K, V> record,
98+
Producer<K, V> producer,
99+
Callback callback,
100+
BiFunction<ProducerRecord<K, V>, Callback, Future<RecordMetadata>> sendFn) {
101+
Context parentContext = Context.current();
102+
103+
KafkaProducerRequest request = KafkaProducerRequest.create(record, producer);
104+
if (!producerInstrumenter.shouldStart(parentContext, request)) {
105+
return sendFn.apply(record, callback);
106+
}
107+
108+
Context context = producerInstrumenter.start(parentContext, request);
109+
propagator.inject(context, record.headers(), SETTER);
110+
111+
try (Scope ignored = context.makeCurrent()) {
112+
return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request));
113+
}
114+
}
115+
116+
private class ProducerCallback implements Callback {
117+
private final Callback callback;
118+
private final Context parentContext;
119+
private final Context context;
120+
private final KafkaProducerRequest request;
121+
122+
ProducerCallback(
123+
Callback callback, Context parentContext, Context context, KafkaProducerRequest request) {
124+
this.callback = callback;
125+
this.parentContext = parentContext;
126+
this.context = context;
127+
this.request = request;
128+
}
129+
130+
@Override
131+
public void onCompletion(RecordMetadata metadata, Exception exception) {
132+
producerInstrumenter.end(context, request, metadata, exception);
133+
134+
if (callback != null) {
135+
try (Scope ignored = parentContext.makeCurrent()) {
136+
callback.onCompletion(metadata, exception);
137+
}
138+
}
139+
}
140+
}
69141
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/SerializationTestUtil.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
class SerializationTestUtil {
2020

2121
/**
22-
* Tests that a configuration map can be serialized and that OpenTelemetry classes are replaced
23-
* with null during serialization (via writeReplace()).
24-
*
25-
* @param map the configuration map to serialize
26-
* @param configKey the key to check for in the deserialized map
22+
* Tests that a configuration map can be serialized and that supplier instance is replaced with
23+
* null during serialization (via writeReplace()).
2724
*/
28-
static void testSerialize(Map<String, Object> map, String configKey)
25+
static void testSerialize(Map<String, Object> map, String supplierKey)
2926
throws IOException, ClassNotFoundException {
30-
Object supplierValue = map.get(configKey);
27+
28+
Object supplierValue = map.get(supplierKey);
3129
assertThat(supplierValue).isNotNull();
3230

3331
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
@@ -56,7 +54,7 @@ protected Class<?> resolveClass(ObjectStreamClass desc)
5654
@SuppressWarnings("unchecked")
5755
Map<String, Object> result = (Map<String, Object>) inputStream.readObject();
5856
// After deserialization, the supplier should be null (replaced via writeReplace())
59-
assertThat(result.get(configKey)).isNull();
57+
assertThat(result.get(supplierKey)).isNull();
6058
}
6159
}
6260

0 commit comments

Comments
 (0)