Skip to content

Commit 85ec089

Browse files
Copilottrask
andcommitted
Avoid changing KafkaTelemetry - move config keys to interceptors
Co-authored-by: trask <[email protected]>
1 parent b187791 commit 85ec089

File tree

5 files changed

+26
-66
lines changed

5 files changed

+26
-66
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ KafkaTelemetry telemetry = KafkaTelemetry.builder(openTelemetry)
5151
Map<String, Object> props = new HashMap<>();
5252
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
5353
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
54-
props.putAll(telemetry.producerInterceptorConfigProperties());
54+
props.put(TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
55+
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier(telemetry));
5556

5657
Producer<String, String> producer = new KafkaProducer<>(props);
5758
```
@@ -68,7 +69,8 @@ Map<String, Object> props = new HashMap<>();
6869
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
6970
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
7071
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
71-
props.putAll(telemetry.consumerInterceptorConfigProperties());
72+
props.put(TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
73+
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier(telemetry));
7274

7375
Consumer<String, String> consumer = new KafkaConsumer<>(props);
7476
```

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
2222
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
2323
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
24-
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier;
2524
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
2625
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.MetricsReporterList;
2726
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter;
@@ -208,61 +207,6 @@ <K, V> ConsumerRecords<K, V> addTracing(
208207
return Collections.unmodifiableMap(config);
209208
}
210209

211-
/**
212-
* Returns a config property key that can be used to pass this {@link KafkaTelemetry} instance to
213-
* interceptors.
214-
*
215-
* <p>This is an internal config key used by the library instrumentation interceptors.
216-
*/
217-
static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
218-
"opentelemetry.experimental.kafka-telemetry.supplier";
219-
220-
/**
221-
* Produces a set of kafka producer config properties to configure tracing in {@code
222-
* TracingProducerInterceptor}. Add these resulting properties to the configuration map used to
223-
* initialize a {@link KafkaProducer}.
224-
*
225-
* <p>Example usage:
226-
*
227-
* <pre>{@code
228-
* // Map<String, Object> config = new HashMap<>();
229-
* // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
230-
* // config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
231-
* // config.putAll(kafkaTelemetry.producerInterceptorConfigProperties());
232-
* // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
233-
* }</pre>
234-
*
235-
* @return the kafka producer interceptor config properties
236-
*/
237-
public Map<String, ?> producerInterceptorConfigProperties() {
238-
Map<String, Object> config = new HashMap<>();
239-
config.put(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, new KafkaTelemetrySupplier(this));
240-
return Collections.unmodifiableMap(config);
241-
}
242-
243-
/**
244-
* Produces a set of kafka consumer config properties to configure tracing in {@code
245-
* TracingConsumerInterceptor}. Add these resulting properties to the configuration map used to
246-
* initialize a {@link KafkaConsumer}.
247-
*
248-
* <p>Example usage:
249-
*
250-
* <pre>{@code
251-
* // Map<String, Object> config = new HashMap<>();
252-
* // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
253-
* // config.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
254-
* // config.putAll(kafkaTelemetry.consumerInterceptorConfigProperties());
255-
* // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
256-
* }</pre>
257-
*
258-
* @return the kafka consumer interceptor config properties
259-
*/
260-
public Map<String, ?> consumerInterceptorConfigProperties() {
261-
Map<String, Object> config = new HashMap<>();
262-
config.put(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, new KafkaTelemetrySupplier(this));
263-
return Collections.unmodifiableMap(config);
264-
}
265-
266210
/**
267211
* Build and inject span into record.
268212
*

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
*/
3232
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
3333

34+
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
35+
"opentelemetry.experimental.kafka-telemetry.supplier";
36+
3437
@Nullable private KafkaTelemetry telemetry;
3538
private String consumerGroup;
3639
private String clientId;
@@ -63,7 +66,7 @@ public void configure(Map<String, ?> configs) {
6366
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
6467
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);
6568

66-
Object telemetrySupplier = configs.get(KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
69+
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
6770
if (telemetrySupplier == null) {
6871
// Fallback to GlobalOpenTelemetry if not configured
6972
this.telemetry =
@@ -82,15 +85,15 @@ public void configure(Map<String, ?> configs) {
8285
if (!(telemetrySupplier instanceof Supplier)) {
8386
throw new IllegalStateException(
8487
"Configuration property "
85-
+ KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
88+
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
8689
+ " is not instance of Supplier");
8790
}
8891

8992
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
9093
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
9194
throw new IllegalStateException(
9295
"Configuration property "
93-
+ KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
96+
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
9497
+ " supplier does not return KafkaTelemetry instance");
9598
}
9699

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
*/
2727
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
2828

29+
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
30+
"opentelemetry.experimental.kafka-telemetry.supplier";
31+
2932
@Nullable private KafkaTelemetry telemetry;
3033
@Nullable private String clientId;
3134

@@ -48,7 +51,7 @@ public void close() {}
4851
public void configure(Map<String, ?> configs) {
4952
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);
5053

51-
Object telemetrySupplier = configs.get(KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
54+
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
5255
if (telemetrySupplier == null) {
5356
// Fallback to GlobalOpenTelemetry if not configured
5457
this.telemetry =
@@ -63,15 +66,15 @@ public void configure(Map<String, ?> configs) {
6366
if (!(telemetrySupplier instanceof Supplier)) {
6467
throw new IllegalStateException(
6568
"Configuration property "
66-
+ KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
69+
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
6770
+ " is not instance of Supplier");
6871
}
6972

7073
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
7174
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
7275
throw new IllegalStateException(
7376
"Configuration property "
74-
+ KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
77+
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
7578
+ " supplier does not return KafkaTelemetry instance");
7679
}
7780

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@ public Map<String, Object> producerProps() {
4545
Map<String, Object> props = super.producerProps();
4646
props.put(
4747
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
48-
props.putAll(kafkaTelemetry.producerInterceptorConfigProperties());
48+
props.put(
49+
TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
50+
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal
51+
.KafkaTelemetrySupplier(
52+
kafkaTelemetry));
4953
return props;
5054
}
5155

@@ -57,7 +61,11 @@ public Map<String, Object> consumerProps() {
5761
Map<String, Object> props = super.consumerProps();
5862
props.put(
5963
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
60-
props.putAll(kafkaTelemetry.consumerInterceptorConfigProperties());
64+
props.put(
65+
TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
66+
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal
67+
.KafkaTelemetrySupplier(
68+
kafkaTelemetry));
6169
return props;
6270
}
6371

0 commit comments

Comments
 (0)