Skip to content

Commit 3b68915

Browse files
Copilottrask
andcommitted
Refactor kafka interceptors to be configurable
Co-authored-by: trask <[email protected]>
1 parent 34d68ec commit 3b68915

File tree

8 files changed

+166
-22
lines changed

8 files changed

+166
-22
lines changed

conventions/.kotlin/sessions/kotlin-compiler-16318826606158418015.salive

Whitespace-only changes.

gradle-plugins/.kotlin/sessions/kotlin-compiler-16068091306083868015.salive

Whitespace-only changes.

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
2222
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
2323
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
24+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier;
2425
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
2526
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.MetricsReporterList;
2627
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter;
@@ -207,6 +208,61 @@ <K, V> ConsumerRecords<K, V> addTracing(
207208
return Collections.unmodifiableMap(config);
208209
}
209210

211+
/**
212+
* Returns a config property key that can be used to pass this {@link KafkaTelemetry} instance to
213+
* interceptors.
214+
*
215+
* <p>This is an internal config key used by the library instrumentation interceptors.
216+
*/
217+
static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
218+
"opentelemetry.experimental.kafka-telemetry.supplier";
219+
220+
/**
221+
* Produces a set of kafka producer config properties to configure tracing in {@code
222+
* TracingProducerInterceptor}. Add these resulting properties to the configuration map used to
223+
* initialize a {@link KafkaProducer}.
224+
*
225+
* <p>Example usage:
226+
*
227+
* <pre>{@code
228+
* // Map<String, Object> config = new HashMap<>();
229+
* // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
230+
* // config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
231+
* // config.putAll(kafkaTelemetry.producerInterceptorConfigProperties());
232+
* // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
233+
* }</pre>
234+
*
235+
* @return the kafka producer interceptor config properties
236+
*/
237+
public Map<String, ?> producerInterceptorConfigProperties() {
238+
Map<String, Object> config = new HashMap<>();
239+
config.put(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, new KafkaTelemetrySupplier(this));
240+
return Collections.unmodifiableMap(config);
241+
}
242+
243+
/**
244+
* Produces a set of kafka consumer config properties to configure tracing in {@code
245+
* TracingConsumerInterceptor}. Add these resulting properties to the configuration map used to
246+
* initialize a {@link KafkaConsumer}.
247+
*
248+
* <p>Example usage:
249+
*
250+
* <pre>{@code
251+
* // Map<String, Object> config = new HashMap<>();
252+
* // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
253+
* // config.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
254+
* // config.putAll(kafkaTelemetry.consumerInterceptorConfigProperties());
255+
* // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
256+
* }</pre>
257+
*
258+
* @return the kafka consumer interceptor config properties
259+
*/
260+
public Map<String, ?> consumerInterceptorConfigProperties() {
261+
Map<String, Object> config = new HashMap<>();
262+
config.put(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, new KafkaTelemetrySupplier(this));
263+
return Collections.unmodifiableMap(config);
264+
}
265+
210266
/**
211267
* Build and inject span into record.
212268
*

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
1717
import java.util.Map;
1818
import java.util.Objects;
19+
import java.util.function.Supplier;
20+
import javax.annotation.Nullable;
1921
import org.apache.kafka.clients.consumer.ConsumerConfig;
2022
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
2123
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -29,22 +31,16 @@
2931
*/
3032
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
3133

32-
private static final KafkaTelemetry telemetry =
33-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
34-
.setMessagingReceiveInstrumentationEnabled(
35-
ConfigPropertiesUtil.getBoolean(
36-
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
37-
.setCapturedHeaders(
38-
ConfigPropertiesUtil.getList(
39-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
40-
.build();
41-
34+
@Nullable private KafkaTelemetry telemetry;
4235
private String consumerGroup;
4336
private String clientId;
4437

4538
@Override
4639
@CanIgnoreReturnValue
4740
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
41+
if (telemetry == null) {
42+
return records;
43+
}
4844
// timer should be started before fetching ConsumerRecords, but there is no callback for that
4945
Timer timer = Timer.start();
5046
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
@@ -67,6 +63,25 @@ public void configure(Map<String, ?> configs) {
6763
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
6864
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);
6965

70-
// TODO: support experimental attributes config
66+
// Try to get KafkaTelemetry from config
67+
Object telemetrySupplier = configs.get(KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
68+
if (telemetrySupplier instanceof Supplier) {
69+
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
70+
if (kafkaTelemetry instanceof KafkaTelemetry) {
71+
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
72+
return;
73+
}
74+
}
75+
76+
// Fallback to GlobalOpenTelemetry if not configured
77+
this.telemetry =
78+
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
79+
.setMessagingReceiveInstrumentationEnabled(
80+
ConfigPropertiesUtil.getBoolean(
81+
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
82+
.setCapturedHeaders(
83+
ConfigPropertiesUtil.getList(
84+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
85+
.build();
7186
}
7287
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
1313
import java.util.Map;
1414
import java.util.Objects;
15+
import java.util.function.Supplier;
1516
import javax.annotation.Nullable;
1617
import org.apache.kafka.clients.producer.ProducerConfig;
1718
import org.apache.kafka.clients.producer.ProducerInterceptor;
@@ -25,19 +26,15 @@
2526
*/
2627
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
2728

28-
private static final KafkaTelemetry telemetry =
29-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
30-
.setCapturedHeaders(
31-
ConfigPropertiesUtil.getList(
32-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
33-
.build();
34-
29+
@Nullable private KafkaTelemetry telemetry;
3530
@Nullable private String clientId;
3631

3732
@Override
3833
@CanIgnoreReturnValue
3934
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
40-
telemetry.buildAndInjectSpan(producerRecord, clientId);
35+
if (telemetry != null) {
36+
telemetry.buildAndInjectSpan(producerRecord, clientId);
37+
}
4138
return producerRecord;
4239
}
4340

@@ -48,9 +45,25 @@ public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}
4845
public void close() {}
4946

5047
@Override
51-
public void configure(Map<String, ?> map) {
52-
clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null);
48+
public void configure(Map<String, ?> configs) {
49+
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);
50+
51+
// Try to get KafkaTelemetry from config
52+
Object telemetrySupplier = configs.get(KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
53+
if (telemetrySupplier instanceof Supplier) {
54+
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
55+
if (kafkaTelemetry instanceof KafkaTelemetry) {
56+
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
57+
return;
58+
}
59+
}
5360

54-
// TODO: support experimental attributes config
61+
// Fallback to GlobalOpenTelemetry if not configured
62+
this.telemetry =
63+
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
64+
.setCapturedHeaders(
65+
ConfigPropertiesUtil.getList(
66+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
67+
.build();
5568
}
5669
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,36 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest {
2828

2929
static final String greeting = "Hello Kafka!";
3030

31+
private static KafkaTelemetry kafkaTelemetry;
32+
33+
protected static KafkaTelemetry createKafkaTelemetry() {
34+
return KafkaTelemetry.builder(testing.getOpenTelemetry())
35+
.setCapturedHeaders(java.util.Collections.singletonList("Test-Message-Header"))
36+
.setMessagingReceiveInstrumentationEnabled(true)
37+
.build();
38+
}
39+
3140
@Override
3241
public Map<String, Object> producerProps() {
42+
if (kafkaTelemetry == null) {
43+
kafkaTelemetry = createKafkaTelemetry();
44+
}
3345
Map<String, Object> props = super.producerProps();
3446
props.put(
3547
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
48+
props.putAll(kafkaTelemetry.producerInterceptorConfigProperties());
3649
return props;
3750
}
3851

3952
@Override
4053
public Map<String, Object> consumerProps() {
54+
if (kafkaTelemetry == null) {
55+
kafkaTelemetry = createKafkaTelemetry();
56+
}
4157
Map<String, Object> props = super.consumerProps();
4258
props.put(
4359
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
60+
props.putAll(kafkaTelemetry.consumerInterceptorConfigProperties());
4461
return props;
4562
}
4663

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@
2323

2424
class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {
2525

26+
protected static KafkaTelemetry createKafkaTelemetry() {
27+
return KafkaTelemetry.builder(testing.getOpenTelemetry())
28+
.setMessagingReceiveInstrumentationEnabled(false)
29+
.build();
30+
}
31+
2632
@SuppressWarnings("deprecation") // using deprecated semconv
2733
@Override
2834
void assertTraces() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;
7+
8+
import java.io.Serializable;
9+
import java.util.Objects;
10+
import java.util.function.Supplier;
11+
12+
/**
13+
* Wrapper for KafkaTelemetry that can be injected into kafka configuration without breaking
14+
* serialization. Used by kafka interceptors to get a configured KafkaTelemetry instance.
15+
*
16+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
17+
* at any time.
18+
*/
19+
public final class KafkaTelemetrySupplier implements Supplier<Object>, Serializable {
20+
private static final long serialVersionUID = 1L;
21+
private final transient Object kafkaTelemetry;
22+
23+
public KafkaTelemetrySupplier(Object kafkaTelemetry) {
24+
Objects.requireNonNull(kafkaTelemetry);
25+
this.kafkaTelemetry = kafkaTelemetry;
26+
}
27+
28+
@Override
29+
public Object get() {
30+
return kafkaTelemetry;
31+
}
32+
33+
private Object writeReplace() {
34+
// serialize this object to null
35+
return null;
36+
}
37+
}

0 commit comments

Comments
 (0)