diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md index 2d4019de6fd1..22480c034082 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md @@ -35,18 +35,59 @@ The Kafka clients API provides a way to "intercept" messages before they are sen The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically. The interceptor class has to be set in the properties bag used to create the Kafka client. -Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent. +##### Recommended approach: Configuring interceptors with KafkaTelemetry + +The recommended way to use interceptors is to configure them with a `KafkaTelemetry` instance. +Interceptors will use system properties for additional configuration like captured headers and receive telemetry settings. + +For the producer: + +```java +KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry); + +Map props = new HashMap<>(); +props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); +props.putAll(telemetry.producerInterceptorConfigProperties()); + +Producer producer = new KafkaProducer<>(props); +``` + +For the consumer: + +```java +KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry); + +Map props = new HashMap<>(); +props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); +props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); +props.putAll(telemetry.consumerInterceptorConfigProperties()); + +Consumer consumer = new KafkaConsumer<>(props); +``` + +##### Alternative: Using interceptors with global OpenTelemetry + +If you don't explicitly configure the interceptors with a `KafkaTelemetry` instance, they will fall back to using +`GlobalOpenTelemetry.get()` and system properties for configuration. + +Use the `OpenTelemetryProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent. ```java -props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); +props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, OpenTelemetryProducerInterceptor.class.getName()); ``` -Use the `TracingConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received. +Use the `OpenTelemetryConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received. ```java -props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); +props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, OpenTelemetryConsumerInterceptor.class.getName()); ``` +Note: The `TracingProducerInterceptor` and `TracingConsumerInterceptor` classes are still available for backwards compatibility, but new code should use the `OpenTelemetry*` variants. + +The interceptors will use the following system properties for configuration: +- `otel.instrumentation.messaging.experimental.receive-telemetry.enabled` - Enable receive telemetry (default: false) +- `otel.instrumentation.messaging.experimental.capture-headers` - List of headers to capture as span attributes + #### Wrapping clients The other way is by wrapping the Kafka client with a tracing enabled Kafka client. diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java index d66003f6607f..77f91e110afa 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java @@ -38,12 +38,14 @@ import java.util.logging.Logger; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -207,6 +209,62 @@ ConsumerRecords addTracing( return Collections.unmodifiableMap(config); } + /** + * Returns configuration properties that can be used to enable OpenTelemetry instrumentation via + * {@code OpenTelemetryProducerInterceptor}. Add these resulting properties to the configuration + * map used to initialize a {@link org.apache.kafka.clients.producer.KafkaProducer}. + * + *

Example usage: + * + *

{@code
+   * //    KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
+   * //    Map config = new HashMap<>();
+   * //    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
+   * //    config.putAll(telemetry.producerInterceptorConfigProperties());
+   * //    try (KafkaProducer producer = new KafkaProducer<>(config)) { ... }
+   * }
+ * + * @return the kafka producer interceptor config properties + */ + public Map producerInterceptorConfigProperties() { + Map config = new HashMap<>(); + config.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + OpenTelemetryProducerInterceptor.class.getName()); + config.put( + OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, + new KafkaTelemetrySupplier(this)); + return Collections.unmodifiableMap(config); + } + + /** + * Returns configuration properties that can be used to enable OpenTelemetry instrumentation via + * {@code OpenTelemetryConsumerInterceptor}. Add these resulting properties to the configuration + * map used to initialize a {@link org.apache.kafka.clients.consumer.KafkaConsumer}. + * + *

Example usage: + * + *

{@code
+   * //    KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
+   * //    Map config = new HashMap<>();
+   * //    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
+   * //    config.putAll(telemetry.consumerInterceptorConfigProperties());
+   * //    try (KafkaConsumer consumer = new KafkaConsumer<>(config)) { ... }
+   * }
+ * + * @return the kafka consumer interceptor config properties + */ + public Map consumerInterceptorConfigProperties() { + Map config = new HashMap<>(); + config.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + OpenTelemetryConsumerInterceptor.class.getName()); + config.put( + OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, + new KafkaTelemetrySupplier(this)); + return Collections.unmodifiableMap(config); + } + /** * Build and inject span into record. * diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetrySupplier.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetrySupplier.java new file mode 100644 index 000000000000..9d87a3b1af21 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetrySupplier.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Wrapper for KafkaTelemetry that can be injected into kafka configuration without breaking + * serialization. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +final class KafkaTelemetrySupplier implements Supplier, Serializable { + private static final long serialVersionUID = 1L; + private final transient Object kafkaTelemetry; + + KafkaTelemetrySupplier(Object kafkaTelemetry) { + Objects.requireNonNull(kafkaTelemetry); + this.kafkaTelemetry = kafkaTelemetry; + } + + @Override + public Object get() { + return kafkaTelemetry; + } + + private Object writeReplace() { + // serialize this object to null + return null; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/OpenTelemetryConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/OpenTelemetryConsumerInterceptor.java new file mode 100644 index 000000000000..5a98f18f71d7 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/OpenTelemetryConsumerInterceptor.java @@ -0,0 +1,92 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.internal.Timer; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +/** + * A ConsumerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name + * or class via ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Consumer's properties to + * get it instantiated and used. See more details on ConsumerInterceptor usage in its Javadoc. + * + *

To configure the interceptor, use {@link KafkaTelemetry#consumerInterceptorConfigProperties} + * to obtain the configuration properties and add them to your consumer configuration. + * + * @see KafkaTelemetry#consumerInterceptorConfigProperties() + */ +public class OpenTelemetryConsumerInterceptor implements ConsumerInterceptor { + + public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER = + "opentelemetry.kafka-telemetry.supplier"; + + @Nullable private KafkaTelemetry telemetry; + private String consumerGroup; + private String clientId; + + @Override + @CanIgnoreReturnValue + public ConsumerRecords onConsume(ConsumerRecords records) { + if (telemetry == null) { + return records; + } + // timer should be started before fetching ConsumerRecords, but there is no callback for that + Timer timer = Timer.start(); + Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer); + if (receiveContext == null) { + receiveContext = Context.current(); + } + KafkaConsumerContext consumerContext = + KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId); + return telemetry.addTracing(records, consumerContext); + } + + @Override + public void onCommit(Map offsets) {} + + @Override + public void close() {} + + @Override + public void configure(Map configs) { + consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null); + clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null); + + Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER); + if (telemetrySupplier == null) { + return; + } + + if (!(telemetrySupplier instanceof Supplier)) { + throw new IllegalStateException( + "Configuration property " + + CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER + + " is not instance of Supplier"); + } + + Object kafkaTelemetry = ((Supplier) telemetrySupplier).get(); + if (!(kafkaTelemetry instanceof KafkaTelemetry)) { + throw new IllegalStateException( + "Configuration property " + + CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER + + " supplier does not return KafkaTelemetry instance"); + } + + this.telemetry = (KafkaTelemetry) kafkaTelemetry; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/OpenTelemetryProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/OpenTelemetryProducerInterceptor.java new file mode 100644 index 000000000000..242a10eb5693 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/OpenTelemetryProducerInterceptor.java @@ -0,0 +1,77 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +/** + * A ProducerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name + * or class via ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Producer's properties to + * get it instantiated and used. See more details on ProducerInterceptor usage in its Javadoc. + * + *

To configure the interceptor, use {@link KafkaTelemetry#producerInterceptorConfigProperties} + * to obtain the configuration properties and add them to your producer configuration. + * + * @see KafkaTelemetry#producerInterceptorConfigProperties() + */ +public class OpenTelemetryProducerInterceptor implements ProducerInterceptor { + + public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER = + "opentelemetry.kafka-telemetry.supplier"; + + @Nullable private KafkaTelemetry telemetry; + @Nullable private String clientId; + + @Override + @CanIgnoreReturnValue + public ProducerRecord onSend(ProducerRecord producerRecord) { + if (telemetry != null) { + telemetry.buildAndInjectSpan(producerRecord, clientId); + } + return producerRecord; + } + + @Override + public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} + + @Override + public void close() {} + + @Override + public void configure(Map configs) { + clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null); + + Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER); + if (telemetrySupplier == null) { + return; + } + + if (!(telemetrySupplier instanceof Supplier)) { + throw new IllegalStateException( + "Configuration property " + + CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER + + " is not instance of Supplier"); + } + + Object kafkaTelemetry = ((Supplier) telemetrySupplier).get(); + if (!(kafkaTelemetry instanceof KafkaTelemetry)) { + throw new IllegalStateException( + "Configuration property " + + CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER + + " supplier does not return KafkaTelemetry instance"); + } + + this.telemetry = (KafkaTelemetry) kafkaTelemetry; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java index 8f60b07e6a88..8f9d060ff968 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java @@ -26,7 +26,10 @@ * A ConsumerInterceptor that adds tracing capability. Add this interceptor's class name or class * via ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Consumer's properties to get it * instantiated and used. See more details on ConsumerInterceptor usage in its Javadoc. + * + * @deprecated Use {@link OpenTelemetryConsumerInterceptor} instead. */ +@Deprecated public class TracingConsumerInterceptor implements ConsumerInterceptor { private static final KafkaTelemetry telemetry = diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java index 833b0785d181..ea631f78054d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java @@ -22,7 +22,10 @@ * A ProducerInterceptor that adds tracing capability. Add this interceptor's class name or class * via ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Producer's properties to get it * instantiated and used. See more details on ProducerInterceptor usage in its Javadoc. + * + * @deprecated Use {@link OpenTelemetryProducerInterceptor} instead. */ +@Deprecated public class TracingProducerInterceptor implements ProducerInterceptor { private static final KafkaTelemetry telemetry = diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java index 108b80635425..2c3f87b2acd1 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java @@ -13,10 +13,8 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -28,19 +26,20 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest { static final String greeting = "Hello Kafka!"; + private static final KafkaTelemetry kafkaTelemetry = + KafkaTelemetry.create(testing.getOpenTelemetry()); + @Override public Map producerProps() { Map props = super.producerProps(); - props.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + props.putAll(kafkaTelemetry.producerInterceptorConfigProperties()); return props; } @Override public Map consumerProps() { Map props = super.consumerProps(); - props.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + props.putAll(kafkaTelemetry.consumerInterceptorConfigProperties()); return props; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryInterceptorTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryInterceptorTest.java new file mode 100644 index 000000000000..82c4292bf70d --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryInterceptorTest.java @@ -0,0 +1,175 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaTelemetrySupplier; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class KafkaTelemetryInterceptorTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + private static Map producerConfig() { + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + config.putAll( + KafkaTelemetry.create(testing.getOpenTelemetry()).producerInterceptorConfigProperties()); + return config; + } + + private static Map consumerConfig() { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + config.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + config.putAll( + KafkaTelemetry.create(testing.getOpenTelemetry()).consumerInterceptorConfigProperties()); + return config; + } + + @Test + void badProducerConfig() { + Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps")); + + // Bad config - wrong type for supplier + assertThatThrownBy( + () -> { + Map producerConfig = producerConfig(); + producerConfig.put( + OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, "foo"); + new KafkaProducer<>(producerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.kafka-telemetry.supplier is not instance of Supplier"); + + // Bad config - supplier returns wrong type + assertThatThrownBy( + () -> { + Map producerConfig = producerConfig(); + producerConfig.put( + OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, + (Supplier) () -> "not a KafkaTelemetry"); + new KafkaProducer<>(producerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.kafka-telemetry.supplier supplier does not return KafkaTelemetry instance"); + } + + @Test + void badConsumerConfig() { + Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps")); + + // Bad config - wrong type for supplier + assertThatThrownBy( + () -> { + Map consumerConfig = consumerConfig(); + consumerConfig.put( + OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, "foo"); + new KafkaConsumer<>(consumerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.kafka-telemetry.supplier is not instance of Supplier"); + + // Bad config - supplier returns wrong type + assertThatThrownBy( + () -> { + Map consumerConfig = consumerConfig(); + consumerConfig.put( + OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, + (Supplier) () -> "not a KafkaTelemetry"); + new KafkaConsumer<>(consumerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.kafka-telemetry.supplier supplier does not return KafkaTelemetry instance"); + } + + @Test + void serializableConfig() throws IOException, ClassNotFoundException { + testSerialize(producerConfig()); + testSerialize(consumerConfig()); + } + + @SuppressWarnings("unchecked") + private static void testSerialize(Map map) + throws IOException, ClassNotFoundException { + // Check that producer config has the supplier + Object producerSupplier = + map.get(OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER); + Object consumerSupplier = + map.get(OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER); + + KafkaTelemetrySupplier supplier = null; + if (producerSupplier instanceof KafkaTelemetrySupplier) { + supplier = (KafkaTelemetrySupplier) producerSupplier; + } else if (consumerSupplier instanceof KafkaTelemetrySupplier) { + supplier = (KafkaTelemetrySupplier) consumerSupplier; + } + + assertThat(supplier).isNotNull(); + assertThat(supplier.get()).isNotNull(); + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) { + outputStream.writeObject(map); + } + + class CustomObjectInputStream extends ObjectInputStream { + CustomObjectInputStream(InputStream inputStream) throws IOException { + super(inputStream); + } + + @Override + protected Class resolveClass(ObjectStreamClass desc) + throws IOException, ClassNotFoundException { + if (desc.getName().startsWith("io.opentelemetry.")) { + throw new IllegalStateException( + "Serial form contains opentelemetry class " + desc.getName()); + } + return super.resolveClass(desc); + } + } + + try (ObjectInputStream inputStream = + new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) { + Map result = (Map) inputStream.readObject(); + assertThat(result.get(OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER)) + .isNull(); + assertThat(result.get(OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER)) + .isNull(); + } + } +}