diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md index 2d4019de6fd1..b22e6d9fcf85 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md @@ -31,20 +31,31 @@ There are two options for capturing traces, either using interceptors or wrappin #### Using interceptors -The Kafka clients API provides a way to "intercept" messages before they are sent to the brokers as well as messages received from the broker before being passed to the application. -The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically. -The interceptor class has to be set in the properties bag used to create the Kafka client. +The Kafka clients API provides a way to intercept messages before they are sent to the brokers as well as messages received from the broker before being passed to the application. -Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent. +To intercept messages and emit telemetry for a `KafkaProducer`: ```java -props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); +KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry); + +Map props = new HashMap<>(); +props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); +props.putAll(telemetry.producerInterceptorConfigProperties()); + +Producer producer = new KafkaProducer<>(props); ``` -Use the `TracingConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received. +To intercept messages and emit telemetry for a `KafkaConsumer`: ```java -props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); +KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry); + +Map props = new HashMap<>(); +props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); +props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); +props.putAll(telemetry.consumerInterceptorConfigProperties()); + +Consumer consumer = new KafkaConsumer<>(props); ``` #### Wrapping clients diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts index b23528d55646..5fe805b69792 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts @@ -22,27 +22,33 @@ tasks { systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) } - val testReceiveSpansDisabled by registering(Test::class) { + test { + filter { + excludeTestsMatching("*Deprecated*") + } + } + + val testDeprecated by registering(Test::class) { testClassesDirs = sourceSets.test.get().output.classesDirs classpath = sourceSets.test.get().runtimeClasspath filter { - includeTestsMatching("InterceptorsSuppressReceiveSpansTest") - includeTestsMatching("WrapperSuppressReceiveSpansTest") + includeTestsMatching("*DeprecatedInterceptorsTest") } - include("**/InterceptorsSuppressReceiveSpansTest.*", "**/WrapperSuppressReceiveSpansTest.*") + forkEvery = 1 // to avoid system properties polluting other tests + systemProperty("otel.instrumentation.messaging.experimental.receive-telemetry.enabled", "true") + systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "Test-Message-Header") } - test { + val testDeprecatedSuppressReceiveSpans by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath filter { - excludeTestsMatching("InterceptorsSuppressReceiveSpansTest") - excludeTestsMatching("WrapperSuppressReceiveSpansTest") + includeTestsMatching("*DeprecatedInterceptorsSuppressReceiveSpansTest") } - jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") - systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "Test-Message-Header") } check { - dependsOn(testReceiveSpansDisabled) + dependsOn(testDeprecated, testDeprecatedSuppressReceiveSpans) } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java index d66003f6607f..8378c4616ffa 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java @@ -5,61 +5,47 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6; -import static java.util.logging.Level.WARNING; - import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; -import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest; -import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.MetricsReporterList; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier; -import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaConsumerTelemetry; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaConsumerTelemetrySupplier; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaProducerTelemetry; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaProducerTelemetrySupplier; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryConsumerInterceptor; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryProducerInterceptor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Proxy; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.Future; -import java.util.function.BiFunction; -import java.util.logging.Logger; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.MetricsReporter; public final class KafkaTelemetry { - private static final Logger logger = Logger.getLogger(KafkaTelemetry.class.getName()); - - private static final TextMapSetter SETTER = KafkaHeadersSetter.INSTANCE; private final OpenTelemetry openTelemetry; - private final Instrumenter producerInstrumenter; - private final Instrumenter consumerReceiveInstrumenter; - private final Instrumenter consumerProcessInstrumenter; - private final boolean producerPropagationEnabled; + private final KafkaProducerTelemetry producerTelemetry; + private final KafkaConsumerTelemetry consumerTelemetry; KafkaTelemetry( OpenTelemetry openTelemetry, @@ -68,10 +54,13 @@ public final class KafkaTelemetry { Instrumenter consumerProcessInstrumenter, boolean producerPropagationEnabled) { this.openTelemetry = openTelemetry; - this.producerInstrumenter = producerInstrumenter; - this.consumerReceiveInstrumenter = consumerReceiveInstrumenter; - this.consumerProcessInstrumenter = consumerProcessInstrumenter; - this.producerPropagationEnabled = producerPropagationEnabled; + this.producerTelemetry = + new KafkaProducerTelemetry( + openTelemetry.getPropagators().getTextMapPropagator(), + producerInstrumenter, + producerPropagationEnabled); + this.consumerTelemetry = + new KafkaConsumerTelemetry(consumerReceiveInstrumenter, consumerProcessInstrumenter); } /** Returns a new {@link KafkaTelemetry} configured with the given {@link OpenTelemetry}. */ @@ -86,8 +75,14 @@ public static KafkaTelemetryBuilder builder(OpenTelemetry openTelemetry) { return new KafkaTelemetryBuilder(openTelemetry); } - private TextMapPropagator propagator() { - return openTelemetry.getPropagators().getTextMapPropagator(); + // this method can be removed when the deprecated TracingProducerInterceptor is removed + KafkaProducerTelemetry getProducerTelemetry() { + return producerTelemetry; + } + + // this method can be removed when the deprecated TracingProducerInterceptor is removed + KafkaConsumerTelemetry getConsumerTelemetry() { + return consumerTelemetry; } /** Returns a decorated {@link Producer} that emits spans for each sent message. */ @@ -109,7 +104,8 @@ public Producer wrap(Producer producer) { && method.getParameterTypes()[1] == Callback.class ? (Callback) args[1] : null; - return buildAndInjectSpan(record, producer, callback, producer::send); + return producerTelemetry.buildAndInjectSpan( + record, producer, callback, producer::send); } try { return method.invoke(producer, args); @@ -138,35 +134,19 @@ public Consumer wrap(Consumer consumer) { // ConsumerRecords poll(Duration duration) if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) { ConsumerRecords consumerRecords = (ConsumerRecords) result; - Context receiveContext = buildAndFinishSpan(consumerRecords, consumer, timer); + Context receiveContext = + consumerTelemetry.buildAndFinishSpan(consumerRecords, consumer, timer); if (receiveContext == null) { receiveContext = Context.current(); } KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.create(receiveContext, consumer); - result = addTracing(consumerRecords, consumerContext); + result = consumerTelemetry.addTracing(consumerRecords, consumerContext); } return result; }); } - ConsumerRecords addTracing( - ConsumerRecords consumerRecords, KafkaConsumerContext consumerContext) { - if (consumerRecords.isEmpty()) { - return consumerRecords; - } - - Map>> records = new LinkedHashMap<>(); - for (TopicPartition partition : consumerRecords.partitions()) { - List> list = consumerRecords.records(partition); - if (list != null && !list.isEmpty()) { - list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext); - } - records.put(partition, list); - } - return new ConsumerRecords<>(records); - } - /** * Produces a set of kafka client config properties (consumer or producer) to register a {@link * MetricsReporter} that records metrics to an {@code openTelemetry} instance. Add these resulting @@ -208,113 +188,58 @@ ConsumerRecords addTracing( } /** - * Build and inject span into record. + * Returns configuration properties that can be used to enable OpenTelemetry instrumentation via + * {@code OpenTelemetryProducerInterceptor}. Add these resulting properties to the configuration + * map used to initialize a {@link org.apache.kafka.clients.producer.KafkaProducer}. + * + *

Example usage: + * + *

{@code
+   * //    KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
+   * //    Map config = new HashMap<>();
+   * //    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
+   * //    config.putAll(telemetry.producerInterceptorConfigProperties());
+   * //    try (KafkaProducer producer = new KafkaProducer<>(config)) { ... }
+   * }
* - * @param record the producer record to inject span info. + * @return the kafka producer interceptor config properties */ - void buildAndInjectSpan(ProducerRecord record, String clientId) { - Context parentContext = Context.current(); - - KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); - if (!producerInstrumenter.shouldStart(parentContext, request)) { - return; - } - - Context context = producerInstrumenter.start(parentContext, request); - if (producerPropagationEnabled) { - try { - propagator().inject(context, record.headers(), SETTER); - } catch (Throwable t) { - // it can happen if headers are read only (when record is sent second time) - logger.log(WARNING, "failed to inject span context. sending record second time?", t); - } - } - producerInstrumenter.end(context, request, null, null); + public Map producerInterceptorConfigProperties() { + Map config = new HashMap<>(); + config.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + OpenTelemetryProducerInterceptor.class.getName()); + config.put( + OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER, + new KafkaProducerTelemetrySupplier(producerTelemetry)); + return Collections.unmodifiableMap(config); } /** - * Build and inject span into record. + * Returns configuration properties that can be used to enable OpenTelemetry instrumentation via + * {@code OpenTelemetryConsumerInterceptor}. Add these resulting properties to the configuration + * map used to initialize a {@link org.apache.kafka.clients.consumer.KafkaConsumer}. + * + *

Example usage: + * + *

{@code
+   * //    KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
+   * //    Map config = new HashMap<>();
+   * //    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
+   * //    config.putAll(telemetry.consumerInterceptorConfigProperties());
+   * //    try (KafkaConsumer consumer = new KafkaConsumer<>(config)) { ... }
+   * }
* - * @param record the producer record to inject span info. - * @param callback the producer send callback - * @return send function's result + * @return the kafka consumer interceptor config properties */ - Future buildAndInjectSpan( - ProducerRecord record, - Producer producer, - Callback callback, - BiFunction, Callback, Future> sendFn) { - Context parentContext = Context.current(); - - KafkaProducerRequest request = KafkaProducerRequest.create(record, producer); - if (!producerInstrumenter.shouldStart(parentContext, request)) { - return sendFn.apply(record, callback); - } - - Context context = producerInstrumenter.start(parentContext, request); - propagator().inject(context, record.headers(), SETTER); - - try (Scope ignored = context.makeCurrent()) { - return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request)); - } - } - - private Context buildAndFinishSpan( - ConsumerRecords records, Consumer consumer, Timer timer) { - return buildAndFinishSpan( - records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer); - } - - Context buildAndFinishSpan( - ConsumerRecords records, String consumerGroup, String clientId, Timer timer) { - if (records.isEmpty()) { - return null; - } - Context parentContext = Context.current(); - KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId); - Context context = null; - if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) { - context = - InstrumenterUtil.startAndEnd( - consumerReceiveInstrumenter, - parentContext, - request, - null, - null, - timer.startTime(), - timer.now()); - } - - // we're returning the context of the receive span so that process spans can use it as - // parent context even though the span has ended - // this is the suggested behavior according to the spec batch receive scenario: - // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#batch-receiving - return context; - } - - private class ProducerCallback implements Callback { - private final Callback callback; - private final Context parentContext; - private final Context context; - private final KafkaProducerRequest request; - - ProducerCallback( - Callback callback, Context parentContext, Context context, KafkaProducerRequest request) { - this.callback = callback; - this.parentContext = parentContext; - this.context = context; - this.request = request; - } - - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - producerInstrumenter.end(context, request, metadata, exception); - - if (callback != null) { - try (Scope ignored = parentContext.makeCurrent()) { - callback.onCompletion(metadata, exception); - } - } - } + public Map consumerInterceptorConfigProperties() { + Map config = new HashMap<>(); + config.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + OpenTelemetryConsumerInterceptor.class.getName()); + config.put( + OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER, + new KafkaConsumerTelemetrySupplier(consumerTelemetry)); + return Collections.unmodifiableMap(config); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java index 8f60b07e6a88..96c040074088 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java @@ -26,7 +26,10 @@ * A ConsumerInterceptor that adds tracing capability. Add this interceptor's class name or class * via ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Consumer's properties to get it * instantiated and used. See more details on ConsumerInterceptor usage in its Javadoc. + * + * @deprecated Use {@link KafkaTelemetry#consumerInterceptorConfigProperties()} instead. */ +@Deprecated public class TracingConsumerInterceptor implements ConsumerInterceptor { private static final KafkaTelemetry telemetry = @@ -47,13 +50,16 @@ public class TracingConsumerInterceptor implements ConsumerInterceptor onConsume(ConsumerRecords records) { // timer should be started before fetching ConsumerRecords, but there is no callback for that Timer timer = Timer.start(); - Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer); + Context receiveContext = + telemetry + .getConsumerTelemetry() + .buildAndFinishSpan(records, consumerGroup, clientId, timer); if (receiveContext == null) { receiveContext = Context.current(); } KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId); - return telemetry.addTracing(records, consumerContext); + return telemetry.getConsumerTelemetry().addTracing(records, consumerContext); } @Override diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java index 833b0785d181..007e1df51d4b 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java @@ -22,7 +22,10 @@ * A ProducerInterceptor that adds tracing capability. Add this interceptor's class name or class * via ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Producer's properties to get it * instantiated and used. See more details on ProducerInterceptor usage in its Javadoc. + * + * @deprecated Use {@link KafkaTelemetry#producerInterceptorConfigProperties()} instead. */ +@Deprecated public class TracingProducerInterceptor implements ProducerInterceptor { private static final KafkaTelemetry telemetry = @@ -37,7 +40,7 @@ public class TracingProducerInterceptor implements ProducerInterceptor onSend(ProducerRecord producerRecord) { - telemetry.buildAndInjectSpan(producerRecord, clientId); + telemetry.getProducerTelemetry().buildAndInjectSpan(producerRecord, clientId); return producerRecord; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaConsumerTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaConsumerTelemetry.java new file mode 100644 index 000000000000..97debc510193 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaConsumerTelemetry.java @@ -0,0 +1,92 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import io.opentelemetry.instrumentation.api.internal.Timer; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; + +/** + * Helper for consumer-side instrumentation. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class KafkaConsumerTelemetry { + + private final Instrumenter consumerReceiveInstrumenter; + private final Instrumenter consumerProcessInstrumenter; + + public KafkaConsumerTelemetry( + Instrumenter consumerReceiveInstrumenter, + Instrumenter consumerProcessInstrumenter) { + this.consumerReceiveInstrumenter = consumerReceiveInstrumenter; + this.consumerProcessInstrumenter = consumerProcessInstrumenter; + } + + public ConsumerRecords addTracing( + ConsumerRecords consumerRecords, KafkaConsumerContext consumerContext) { + if (consumerRecords.isEmpty()) { + return consumerRecords; + } + + Map>> records = new LinkedHashMap<>(); + for (TopicPartition partition : consumerRecords.partitions()) { + List> list = consumerRecords.records(partition); + if (list != null && !list.isEmpty()) { + list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext); + } + records.put(partition, list); + } + return new ConsumerRecords<>(records); + } + + public Context buildAndFinishSpan( + ConsumerRecords records, Consumer consumer, Timer timer) { + return buildAndFinishSpan( + records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer); + } + + public Context buildAndFinishSpan( + ConsumerRecords records, String consumerGroup, String clientId, Timer timer) { + if (records.isEmpty()) { + return null; + } + Context parentContext = Context.current(); + KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId); + Context context = null; + if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) { + context = + InstrumenterUtil.startAndEnd( + consumerReceiveInstrumenter, + parentContext, + request, + null, + null, + timer.startTime(), + timer.now()); + } + + // we're returning the context of the receive span so that process spans can use it as + // parent context even though the span has ended + // this is the suggested behavior according to the spec batch receive scenario: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#batch-receiving + return context; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaConsumerTelemetrySupplier.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaConsumerTelemetrySupplier.java new file mode 100644 index 000000000000..4613e202ad43 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaConsumerTelemetrySupplier.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Wrapper for KafkaConsumerTelemetry that can be injected into kafka configuration without breaking + * serialization. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class KafkaConsumerTelemetrySupplier + implements Supplier, Serializable { + + private static final long serialVersionUID = 1L; + + private final KafkaConsumerTelemetry consumerTelemetry; + + public KafkaConsumerTelemetrySupplier(KafkaConsumerTelemetry consumerTelemetry) { + this.consumerTelemetry = Objects.requireNonNull(consumerTelemetry); + } + + @Override + public KafkaConsumerTelemetry get() { + return consumerTelemetry; + } + + private Object writeReplace() { + // serialize this object to null + return null; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java new file mode 100644 index 000000000000..acd7099125f0 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java @@ -0,0 +1,128 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import static java.util.logging.Level.WARNING; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest; +import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.logging.Logger; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Headers; + +/** + * Helper for producer-side instrumentation. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class KafkaProducerTelemetry { + private static final Logger logger = Logger.getLogger(KafkaProducerTelemetry.class.getName()); + + private static final TextMapSetter SETTER = KafkaHeadersSetter.INSTANCE; + + private final TextMapPropagator propagator; + private final Instrumenter producerInstrumenter; + private final boolean producerPropagationEnabled; + + public KafkaProducerTelemetry( + TextMapPropagator propagator, + Instrumenter producerInstrumenter, + boolean producerPropagationEnabled) { + this.propagator = propagator; + this.producerInstrumenter = producerInstrumenter; + this.producerPropagationEnabled = producerPropagationEnabled; + } + + /** + * Build and inject span into record. + * + * @param record the producer record to inject span info. + */ + public void buildAndInjectSpan(ProducerRecord record, String clientId) { + Context parentContext = Context.current(); + + KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); + if (!producerInstrumenter.shouldStart(parentContext, request)) { + return; + } + + Context context = producerInstrumenter.start(parentContext, request); + if (producerPropagationEnabled) { + try { + propagator.inject(context, record.headers(), SETTER); + } catch (Throwable t) { + // it can happen if headers are read only (when record is sent second time) + logger.log(WARNING, "failed to inject span context. sending record second time?", t); + } + } + producerInstrumenter.end(context, request, null, null); + } + + /** + * Build and inject span into record. + * + * @param record the producer record to inject span info. + * @param callback the producer send callback + * @return send function's result + */ + @SuppressWarnings("FutureReturnValueIgnored") + public Future buildAndInjectSpan( + ProducerRecord record, + Producer producer, + Callback callback, + BiFunction, Callback, Future> sendFn) { + Context parentContext = Context.current(); + + KafkaProducerRequest request = KafkaProducerRequest.create(record, producer); + if (!producerInstrumenter.shouldStart(parentContext, request)) { + return sendFn.apply(record, callback); + } + + Context context = producerInstrumenter.start(parentContext, request); + propagator.inject(context, record.headers(), SETTER); + + try (Scope ignored = context.makeCurrent()) { + return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request)); + } + } + + private class ProducerCallback implements Callback { + private final Callback callback; + private final Context parentContext; + private final Context context; + private final KafkaProducerRequest request; + + ProducerCallback( + Callback callback, Context parentContext, Context context, KafkaProducerRequest request) { + this.callback = callback; + this.parentContext = parentContext; + this.context = context; + this.request = request; + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + producerInstrumenter.end(context, request, metadata, exception); + + if (callback != null) { + try (Scope ignored = parentContext.makeCurrent()) { + callback.onCompletion(metadata, exception); + } + } + } + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetrySupplier.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetrySupplier.java new file mode 100644 index 000000000000..87afe340c9a9 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetrySupplier.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Wrapper for KafkaProducerTelemetry that can be injected into kafka configuration without breaking + * serialization. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class KafkaProducerTelemetrySupplier + implements Supplier, Serializable { + + private static final long serialVersionUID = 1L; + + private final KafkaProducerTelemetry producerTelemetry; + + public KafkaProducerTelemetrySupplier(KafkaProducerTelemetry producerTelemetry) { + this.producerTelemetry = Objects.requireNonNull(producerTelemetry); + } + + @Override + public KafkaProducerTelemetry get() { + return producerTelemetry; + } + + private Object writeReplace() { + // serialize this object to null + return null; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptor.java new file mode 100644 index 000000000000..21b5d3ef8999 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptor.java @@ -0,0 +1,86 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.internal.Timer; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +/** + * A ConsumerInterceptor that adds OpenTelemetry instrumentation. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class OpenTelemetryConsumerInterceptor implements ConsumerInterceptor { + + public static final String CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER = + "opentelemetry.kafka-consumer-telemetry.supplier"; + + @Nullable private KafkaConsumerTelemetry consumerTelemetry; + private String consumerGroup; + private String clientId; + + @Override + @CanIgnoreReturnValue + public ConsumerRecords onConsume(ConsumerRecords records) { + if (consumerTelemetry == null) { + return records; + } + // timer should be started before fetching ConsumerRecords, but there is no callback for that + Timer timer = Timer.start(); + Context receiveContext = + consumerTelemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer); + if (receiveContext == null) { + receiveContext = Context.current(); + } + KafkaConsumerContext consumerContext = + KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId); + return consumerTelemetry.addTracing(records, consumerContext); + } + + @Override + public void onCommit(Map offsets) {} + + @Override + public void close() {} + + @Override + public void configure(Map configs) { + consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null); + clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null); + + KafkaConsumerTelemetrySupplier supplier = + getProperty( + configs, + CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER, + KafkaConsumerTelemetrySupplier.class); + this.consumerTelemetry = supplier.get(); + } + + @SuppressWarnings("unchecked") + private static T getProperty(Map configs, String key, Class requiredType) { + Object value = configs.get(key); + if (value == null) { + throw new IllegalStateException("Missing required configuration property: " + key); + } + if (!requiredType.isInstance(value)) { + throw new IllegalStateException( + "Configuration property " + key + " is not instance of " + requiredType.getSimpleName()); + } + return (T) value; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java new file mode 100644 index 000000000000..4cb65adf70a3 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java @@ -0,0 +1,70 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +/** + * A ProducerInterceptor that adds OpenTelemetry instrumentation. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class OpenTelemetryProducerInterceptor implements ProducerInterceptor { + + public static final String CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER = + "opentelemetry.kafka-producer-telemetry.supplier"; + + @Nullable private KafkaProducerTelemetry producerTelemetry; + @Nullable private String clientId; + + @Override + @CanIgnoreReturnValue + public ProducerRecord onSend(ProducerRecord producerRecord) { + if (producerTelemetry != null) { + producerTelemetry.buildAndInjectSpan(producerRecord, clientId); + } + return producerRecord; + } + + @Override + public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} + + @Override + public void close() {} + + @Override + public void configure(Map configs) { + clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null); + + KafkaProducerTelemetrySupplier supplier = + getProperty( + configs, + CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER, + KafkaProducerTelemetrySupplier.class); + this.producerTelemetry = supplier.get(); + } + + @SuppressWarnings("unchecked") + private static T getProperty(Map configs, String key, Class requiredType) { + Object value = configs.get(key); + if (value == null) { + throw new IllegalStateException("Missing required configuration property: " + key); + } + if (!requiredType.isInstance(value)) { + throw new IllegalStateException( + "Configuration property " + key + " is not instance of " + requiredType.getSimpleName()); + } + return (T) value; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractDeprecatedInterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractDeprecatedInterceptorsTest.java new file mode 100644 index 000000000000..81a161ab6541 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractDeprecatedInterceptorsTest.java @@ -0,0 +1,92 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +abstract class AbstractDeprecatedInterceptorsTest extends KafkaClientBaseTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + static final String greeting = "Hello Kafka!"; + + @SuppressWarnings("deprecation") // testing deprecated interceptors + @Override + public Map producerProps() { + Map props = super.producerProps(); + props.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + return props; + } + + @SuppressWarnings("deprecation") // testing deprecated interceptors + @Override + public Map consumerProps() { + Map props = super.consumerProps(); + props.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + return props; + } + + @Test + void testInterceptors() throws InterruptedException { + testing.runWithSpan( + "parent", + () -> { + ProducerRecord producerRecord = + new ProducerRecord<>(SHARED_TOPIC, greeting); + producerRecord + .headers() + // add header to test capturing header value as span attribute + .add("Test-Message-Header", "test".getBytes(StandardCharsets.UTF_8)) + // adding baggage header in w3c baggage format + .add( + "baggage", + "test-baggage-key-1=test-baggage-value-1".getBytes(StandardCharsets.UTF_8)) + .add( + "baggage", + "test-baggage-key-2=test-baggage-value-2".getBytes(StandardCharsets.UTF_8)); + producer.send( + producerRecord, + (meta, ex) -> { + if (ex == null) { + testing.runWithSpan("producer callback", () -> {}); + } else { + testing.runWithSpan("producer exception: " + ex, () -> {}); + } + }); + }); + + awaitUntilConsumerIsReady(); + // check that the message was received + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + assertThat(records.count()).isEqualTo(1); + for (ConsumerRecord record : records) { + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + testing.runWithSpan("process child", () -> {}); + } + + assertTraces(); + } + + abstract void assertTraces(); +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java index 108b80635425..402fada6c6ca 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java @@ -13,10 +13,8 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -28,19 +26,19 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest { static final String greeting = "Hello Kafka!"; + protected abstract KafkaTelemetry kafkaTelemetry(); + @Override public Map producerProps() { Map props = super.producerProps(); - props.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + props.putAll(kafkaTelemetry().producerInterceptorConfigProperties()); return props; } @Override public Map consumerProps() { Map props = super.consumerProps(); - props.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + props.putAll(kafkaTelemetry().consumerInterceptorConfigProperties()); return props; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/DeprecatedInterceptorsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/DeprecatedInterceptorsSuppressReceiveSpansTest.java new file mode 100644 index 000000000000..93554d5c7f5d --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/DeprecatedInterceptorsSuppressReceiveSpansTest.java @@ -0,0 +1,81 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import java.nio.charset.StandardCharsets; +import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; + +class DeprecatedInterceptorsSuppressReceiveSpansTest extends AbstractDeprecatedInterceptorsTest { + + @SuppressWarnings("deprecation") // using deprecated semconv + @Override + void assertTraces() { + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(SHARED_TOPIC + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(MESSAGING_OPERATION, "publish"), + satisfies( + MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer"))), + span -> + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(MESSAGING_OPERATION, "process"), + equalTo( + MESSAGING_MESSAGE_BODY_SIZE, + greeting.getBytes(StandardCharsets.UTF_8).length), + satisfies( + MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), + satisfies( + MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), + equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"), + satisfies( + MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + equalTo( + AttributeKey.stringKey("test-baggage-key-1"), + "test-baggage-value-1"), + equalTo( + AttributeKey.stringKey("test-baggage-key-2"), + "test-baggage-value-2")), + span -> + span.hasName("process child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2))), + // ideally we'd want producer callback to be part of the main trace, we just aren't able to + // instrument that + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent())); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/DeprecatedInterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/DeprecatedInterceptorsTest.java new file mode 100644 index 000000000000..803a61f5b23d --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/DeprecatedInterceptorsTest.java @@ -0,0 +1,118 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import static io.opentelemetry.api.common.AttributeKey.stringArrayKey; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.LinkData; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; + +class DeprecatedInterceptorsTest extends AbstractDeprecatedInterceptorsTest { + + @SuppressWarnings("deprecation") // using deprecated semconv + @Override + void assertTraces() { + AtomicReference producerSpanContext = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanName("parent", SHARED_TOPIC + " receive", "producer callback"), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(SHARED_TOPIC + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo( + stringArrayKey("messaging.header.Test_Message_Header"), + singletonList("test")), + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(MESSAGING_OPERATION, "publish"), + satisfies( + MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")))); + SpanContext spanContext = trace.getSpan(1).getSpanContext(); + producerSpanContext.set( + SpanContext.createFromRemoteParent( + spanContext.getTraceId(), + spanContext.getSpanId(), + spanContext.getTraceFlags(), + spanContext.getTraceState())); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(SHARED_TOPIC + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinksSatisfying(links -> assertThat(links).isEmpty()) + .hasAttributesSatisfyingExactly( + equalTo( + stringArrayKey("messaging.header.Test_Message_Header"), + singletonList("test")), + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(MESSAGING_OPERATION, "receive"), + equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"), + satisfies( + MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)), + span -> + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpanContext.get())) + .hasAttributesSatisfyingExactly( + equalTo( + stringArrayKey("messaging.header.Test_Message_Header"), + singletonList("test")), + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(MESSAGING_OPERATION, "process"), + equalTo( + MESSAGING_MESSAGE_BODY_SIZE, + greeting.getBytes(StandardCharsets.UTF_8).length), + satisfies( + MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), + satisfies( + MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), + equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"), + satisfies( + MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer"))), + span -> + span.hasName("process child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1))), + // ideally we'd want producer callback to be part of the main trace, we just aren't able to + // instrument that + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent())); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java index 049020a2d074..44d1603743b1 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java @@ -23,6 +23,14 @@ class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest { + private static final KafkaTelemetry kafkaTelemetry = + KafkaTelemetry.create(testing.getOpenTelemetry()); + + @Override + protected KafkaTelemetry kafkaTelemetry() { + return kafkaTelemetry; + } + @SuppressWarnings("deprecation") // using deprecated semconv @Override void assertTraces() { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index f8003095032d..3e42dce07622 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -30,6 +30,17 @@ class InterceptorsTest extends AbstractInterceptorsTest { + private static final KafkaTelemetry kafkaTelemetry = + KafkaTelemetry.builder(testing.getOpenTelemetry()) + .setMessagingReceiveInstrumentationEnabled(true) + .setCapturedHeaders(singletonList("Test-Message-Header")) + .build(); + + @Override + protected KafkaTelemetry kafkaTelemetry() { + return kafkaTelemetry; + } + @SuppressWarnings("deprecation") // using deprecated semconv @Override void assertTraces() { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptorTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptorTest.java new file mode 100644 index 000000000000..ce5e9014c773 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptorTest.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class OpenTelemetryConsumerInterceptorTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + private static Map consumerConfig() { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + config.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + config.putAll( + KafkaTelemetry.create(testing.getOpenTelemetry()).consumerInterceptorConfigProperties()); + return config; + } + + @Test + void badConfig() { + // Bad config - wrong type for supplier + assertThatThrownBy( + () -> { + Map consumerConfig = consumerConfig(); + consumerConfig.put( + OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER, + "foo"); + new KafkaConsumer<>(consumerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.kafka-consumer-telemetry.supplier is not instance of KafkaConsumerTelemetrySupplier"); + + // Bad config - supplier returns wrong type + assertThatThrownBy( + () -> { + Map consumerConfig = consumerConfig(); + consumerConfig.put( + OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER, + (Supplier) () -> "not a KafkaConsumerTelemetry"); + new KafkaConsumer<>(consumerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.kafka-consumer-telemetry.supplier is not instance of KafkaConsumerTelemetrySupplier"); + } + + @Test + void serializableConfig() throws IOException, ClassNotFoundException { + SerializationTestUtil.testSerialize( + consumerConfig(), + OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryMetricsReporterTest.java index 68403de9b92b..e92666d90755 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryMetricsReporterTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryMetricsReporterTest.java @@ -5,26 +5,17 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.AbstractOpenTelemetryMetricsReporterTest; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter; -import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier; import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamClass; import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; -import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -45,8 +36,6 @@ protected InstrumentationExtension testing() { @Test void badConfig() { - Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps")); - // Bad producer config assertThatThrownBy( () -> { @@ -130,44 +119,9 @@ void badConfig() { @Test void serializableConfig() throws IOException, ClassNotFoundException { - testSerialize(producerConfig()); - testSerialize(consumerConfig()); - } - - @SuppressWarnings("unchecked") - private static void testSerialize(Map map) - throws IOException, ClassNotFoundException { - OpenTelemetrySupplier supplier = - (OpenTelemetrySupplier) - map.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER); - assertThat(supplier).isNotNull(); - assertThat(supplier.get()).isNotNull(); - ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); - try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) { - outputStream.writeObject(map); - } - - class CustomObjectInputStream extends ObjectInputStream { - CustomObjectInputStream(InputStream inputStream) throws IOException { - super(inputStream); - } - - @Override - protected Class resolveClass(ObjectStreamClass desc) - throws IOException, ClassNotFoundException { - if (desc.getName().startsWith("io.opentelemetry.")) { - throw new IllegalStateException( - "Serial form contains opentelemetry class " + desc.getName()); - } - return super.resolveClass(desc); - } - } - - try (ObjectInputStream inputStream = - new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) { - Map result = (Map) inputStream.readObject(); - assertThat(result.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER)) - .isNull(); - } + SerializationTestUtil.testSerialize( + producerConfig(), OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER); + SerializationTestUtil.testSerialize( + consumerConfig(), OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptorTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptorTest.java new file mode 100644 index 000000000000..860007c9cad5 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptorTest.java @@ -0,0 +1,73 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class OpenTelemetryProducerInterceptorTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + private static Map producerConfig() { + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + config.putAll( + KafkaTelemetry.create(testing.getOpenTelemetry()).producerInterceptorConfigProperties()); + return config; + } + + @Test + void badConfig() { + // Bad config - wrong type for supplier + assertThatThrownBy( + () -> { + Map producerConfig = producerConfig(); + producerConfig.put( + OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER, + "foo"); + new KafkaProducer<>(producerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.kafka-producer-telemetry.supplier is not instance of KafkaProducerTelemetrySupplier"); + + // Bad config - supplier returns wrong type + assertThatThrownBy( + () -> { + Map producerConfig = producerConfig(); + producerConfig.put( + OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER, + (Supplier) () -> "not a KafkaProducerTelemetry"); + new KafkaProducer<>(producerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.kafka-producer-telemetry.supplier is not instance of KafkaProducerTelemetrySupplier"); + } + + @Test + void serializableConfig() throws IOException, ClassNotFoundException { + SerializationTestUtil.testSerialize( + producerConfig(), + OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/SerializationTestUtil.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/SerializationTestUtil.java new file mode 100644 index 000000000000..f3d5dee71ab9 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/SerializationTestUtil.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.util.Map; + +class SerializationTestUtil { + + /** + * Tests that a configuration map can be serialized and that supplier instance is replaced with + * null during serialization (via writeReplace()). + */ + static void testSerialize(Map map, String supplierKey) + throws IOException, ClassNotFoundException { + + Object supplierValue = map.get(supplierKey); + assertThat(supplierValue).isNotNull(); + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) { + outputStream.writeObject(map); + } + + class CustomObjectInputStream extends ObjectInputStream { + CustomObjectInputStream(InputStream inputStream) throws IOException { + super(inputStream); + } + + @Override + protected Class resolveClass(ObjectStreamClass desc) + throws IOException, ClassNotFoundException { + if (desc.getName().startsWith("io.opentelemetry.")) { + throw new IllegalStateException( + "Serial form contains opentelemetry class " + desc.getName()); + } + return super.resolveClass(desc); + } + } + + try (ObjectInputStream inputStream = + new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) { + @SuppressWarnings("unchecked") + Map result = (Map) inputStream.readObject(); + // After deserialization, the supplier should be null (replaced via writeReplace()) + assertThat(result.get(supplierKey)).isNull(); + } + } + + private SerializationTestUtil() {} +}