diff --git a/docs/instrumentation-list.yaml b/docs/instrumentation-list.yaml index 2531a97646a9..442bed7001b2 100644 --- a/docs/instrumentation-list.yaml +++ b/docs/instrumentation-list.yaml @@ -1933,7 +1933,7 @@ libraries: type: boolean default: true - name: otel.instrumentation.kafka.experimental-span-attributes - description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms" + description: Enables the capture of the experimental consumer attributes "kafka.record.queue_time_ms" and "messaging.kafka.bootstrap.servers" type: boolean default: false - name: kafka-clients-2.6 diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java index 75d283930d83..a21bea719e3c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java @@ -7,6 +7,7 @@ import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerReceiveInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; @@ -18,14 +19,19 @@ import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Properties; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -38,6 +44,10 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor().and(takesArgument(0, Map.class).or(takesArgument(0, Properties.class))), + this.getClass().getName() + "$ConstructorAdvice"); + transformer.applyAdviceToMethod( named("poll") .and(isPublic()) @@ -47,6 +57,26 @@ public void transform(TypeTransformer transformer) { this.getClass().getName() + "$PollAdvice"); } + @SuppressWarnings("unused") + public static class ConstructorAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static 
void onExit( + @Advice.This Consumer consumer, @Advice.Argument(0) Object configs) { + + Object bootstrapServersConfig = null; + if (configs instanceof Map) { + bootstrapServersConfig = ((Map) configs).get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + } + + if (bootstrapServersConfig != null + && KafkaSingletons.CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(consumer) == null) { + List bootstrapServers = KafkaUtil.parseBootstrapServers(bootstrapServersConfig); + KafkaSingletons.CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.set(consumer, bootstrapServers); + } + } + } + @SuppressWarnings("unused") public static class PollAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java index c3614f95f59b..0136c229c22a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java @@ -6,6 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.producerInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -15,14 +16,20 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil; import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.List; +import java.util.Map; +import java.util.Properties; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducerInstrumentation implements TypeInstrumentation { @@ -34,6 +41,10 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor().and(takesArgument(0, Map.class).or(takesArgument(0, Properties.class))), + this.getClass().getName() + "$ConstructorAdvice"); + transformer.applyAdviceToMethod( isMethod() .and(isPublic()) @@ -43,11 +54,32 @@ public void transform(TypeTransformer transformer) { KafkaProducerInstrumentation.class.getName() + "$SendAdvice"); } + @SuppressWarnings("unused") + public static 
class ConstructorAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This Producer producer, @Advice.Argument(0) Object configs) { + + Object bootstrapServersConfig = null; + if (configs instanceof Map) { + bootstrapServersConfig = ((Map) configs).get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + } + + if (bootstrapServersConfig != null + && KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(producer) == null) { + List bootstrapServers = KafkaUtil.parseBootstrapServers(bootstrapServersConfig); + KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.set(producer, bootstrapServers); + } + } + } + @SuppressWarnings("unused") public static class SendAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static KafkaProducerRequest onEnter( + @Advice.This Producer producer, @Advice.FieldValue("apiVersions") ApiVersions apiVersions, @Advice.FieldValue("clientId") String clientId, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @@ -55,7 +87,10 @@ public static KafkaProducerRequest onEnter( @Advice.Local("otelContext") Context context, @Advice.Local("otelScope") Scope scope) { - KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); + List bootstrapServers = + KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(producer); + KafkaProducerRequest request = + KafkaProducerRequest.create(record, clientId, bootstrapServers); Context parentContext = Java8BytecodeBridge.currentContext(); if (!producerInstrumenter().shouldStart(parentContext, request)) { return null; diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java index fa181e8dbb24..71cd46a9a9f1 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java @@ -7,12 +7,16 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaInstrumenterFactory; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest; import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; +import java.util.List; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.RecordMetadata; public final class KafkaSingletons { @@ -26,6 +30,11 @@ public final class KafkaSingletons { private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER; private static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER; + public static final VirtualField, List> + CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD 
= VirtualField.find(Consumer.class, List.class); + public static final VirtualField, List> + PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD = VirtualField.find(Producer.class, List.class); + static { KafkaInstrumenterFactory instrumenterFactory = new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml index 7b42919bb1b8..239aaec9e0d8 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml @@ -4,6 +4,6 @@ configurations: type: boolean default: true - name: otel.instrumentation.kafka.experimental-span-attributes - description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms" + description: Enables the capture of the experimental consumer attributes "kafka.record.queue_time_ms" and "messaging.kafka.bootstrap.servers" type: boolean default: false diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java index 69b8624c127f..478e730bdc6d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java @@ -17,6 +17,7 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; @@ -66,6 +67,20 @@ public abstract class KafkaClientBaseTest { protected static final String SHARED_TOPIC = "shared.topic"; protected static final AttributeKey MESSAGING_CLIENT_ID = AttributeKey.stringKey("messaging.client_id"); + protected static final AttributeKey> MESSAGING_KAFKA_BOOTSTRAP_SERVERS = + AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers"); + + protected static AttributeAssertion bootstrapServersAssertion() { + return satisfies( + MESSAGING_KAFKA_BOOTSTRAP_SERVERS, + listAssert -> { + if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) { + listAssert.isNotEmpty().allSatisfy(server -> assertThat(server).isNotEmpty()); + } else { + listAssert.isNullOrEmpty(); + } + }); + } private KafkaContainer kafka; protected Producer producer; @@ -177,6 +192,7 @@ protected static List sendAttributes( equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, 
AbstractLongAssert::isNotNegative))); @@ -203,6 +219,7 @@ protected static List receiveAttributes(boolean testHeaders) equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "receive"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive))); // consumer group is not available in version 0.11 @@ -227,6 +244,7 @@ protected static List processAttributes( equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java index 4f938a545e37..9a40f1615f43 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java @@ -212,10 +212,11 @@ ConsumerRecords addTracing( * * @param record the producer record to inject span info. */ - void buildAndInjectSpan(ProducerRecord record, String clientId) { + void buildAndInjectSpan( + ProducerRecord record, String clientId, List bootstrapServers) { Context parentContext = Context.current(); - KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); + KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId, bootstrapServers); if (!producerInstrumenter.shouldStart(parentContext, request)) { return; } @@ -262,16 +263,25 @@ Future buildAndInjectSpan( private Context buildAndFinishSpan( ConsumerRecords records, Consumer consumer, Timer timer) { return buildAndFinishSpan( - records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer); + records, + KafkaUtil.getConsumerGroup(consumer), + KafkaUtil.getClientId(consumer), + KafkaUtil.getBootstrapServers(consumer), + timer); } Context buildAndFinishSpan( - ConsumerRecords records, String consumerGroup, String clientId, Timer timer) { + ConsumerRecords records, + String consumerGroup, + String clientId, + List bootstrapServers, + Timer timer) { if (records.isEmpty()) { return null; } Context parentContext = Context.current(); - KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId); + KafkaReceiveRequest request = + KafkaReceiveRequest.create(records, consumerGroup, clientId, bootstrapServers); Context context = null; if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) { context = diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java index 
368767c1022a..a188d8350565 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java @@ -12,6 +12,8 @@ import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil; +import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -34,6 +36,7 @@ public class TracingConsumerInterceptor implements ConsumerInterceptor bootstrapServers; private String consumerGroup; private String clientId; @@ -42,12 +45,13 @@ public class TracingConsumerInterceptor implements ConsumerInterceptor onConsume(ConsumerRecords records) { // timer should be started before fetching ConsumerRecords, but there is no callback for that Timer timer = Timer.start(); - Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer); + Context receiveContext = + telemetry.buildAndFinishSpan(records, consumerGroup, clientId, bootstrapServers, timer); if (receiveContext == null) { receiveContext = Context.current(); } KafkaConsumerContext consumerContext = - KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId); + KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId, bootstrapServers); return telemetry.addTracing(records, consumerContext); } @@ -59,6 +63,8 @@ public void close() {} @Override public void configure(Map configs) { + bootstrapServers = + KafkaUtil.parseBootstrapServers(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null); clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java index c64dae0496cb..a3b0b2b7495c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java @@ -7,6 +7,8 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil; +import java.util.List; import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; @@ -24,12 +26,14 @@ public class TracingProducerInterceptor implements ProducerInterceptor bootstrapServers; + @Nullable private String clientId; @Override @CanIgnoreReturnValue public ProducerRecord onSend(ProducerRecord producerRecord) { - telemetry.buildAndInjectSpan(producerRecord, clientId); + telemetry.buildAndInjectSpan(producerRecord, 
clientId, bootstrapServers); return producerRecord; } @@ -42,6 +46,8 @@ public void close() {} @Override public void configure(Map map) { clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null); + bootstrapServers = + KafkaUtil.parseBootstrapServers(map.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); // TODO: support experimental attributes config } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractWrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractWrapperTest.java index c52d67d43440..c1969eeb036e 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractWrapperTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractWrapperTest.java @@ -37,6 +37,8 @@ void testWrappers(boolean testHeaders) throws InterruptedException { .setCapturedHeaders(singletonList("test-message-header")) // TODO run tests both with and without experimental span attributes .setCaptureExperimentalSpanAttributes(true); + System.setProperty( + "otel.instrumentation.kafka.experimental-span-attributes", String.valueOf(true)); configure(telemetryBuilder); KafkaTelemetry telemetry = telemetryBuilder.build(); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java index 049020a2d074..c42589308f5d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java @@ -38,6 +38,7 @@ void assertTraces() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer"))), @@ -52,6 +53,7 @@ void assertTraces() { equalTo( MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length), + bootstrapServersAssertion(), satisfies( MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index 8f4f65dc2262..aedbe8c138d6 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -68,6 +68,7 @@ void assertTraces() { equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, 
"receive"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), @@ -81,6 +82,7 @@ void assertTraces() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), equalTo( MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java index ab588285c1b3..1094ffbfd9d3 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java @@ -67,6 +67,7 @@ protected static List sendAttributes(boolean testHeaders) { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); @@ -89,6 +90,7 @@ private static List processAttributes(String greeting, boole equalTo(MESSAGING_OPERATION, "process"), equalTo( MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length), + bootstrapServersAssertion(), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), satisfies( diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java index 890b68733f2e..4b3c686c9218 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java @@ -90,6 +90,7 @@ protected static List sendAttributes(boolean testHeaders) { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); @@ -112,6 +113,7 @@ private static List processAttributes(String greeting, boole equalTo(MESSAGING_OPERATION, "process"), equalTo( MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length), + bootstrapServersAssertion(), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), 
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), satisfies( @@ -138,6 +140,7 @@ protected static List receiveAttributes(boolean testHeaders) equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "receive"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1))); if (testHeaders) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/build.gradle.kts index ce4c31a87e47..14833163a92b 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/build.gradle.kts +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/build.gradle.kts @@ -4,7 +4,7 @@ plugins { dependencies { compileOnly(project(":muzzle")) - compileOnly("org.apache.kafka:kafka-clients:0.11.0.0") + library("org.apache.kafka:kafka-clients:0.11.0.0") compileOnly("com.google.auto.value:auto-value-annotations") annotationProcessor("com.google.auto.value:auto-value") } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaConsumerRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaConsumerRequest.java index 3a12d6427263..df41b2ba68b8 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaConsumerRequest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaConsumerRequest.java @@ -5,14 +5,19 @@ package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; +import java.util.List; import javax.annotation.Nullable; -abstract class AbstractKafkaConsumerRequest { +abstract class AbstractKafkaConsumerRequest extends AbstractKafkaRequest { @Nullable private final String consumerGroup; @Nullable private final String clientId; - AbstractKafkaConsumerRequest(String consumerGroup, String clientId) { + protected AbstractKafkaConsumerRequest( + @Nullable String consumerGroup, + @Nullable String clientId, + @Nullable List bootstrapServers) { + super(bootstrapServers); this.consumerGroup = consumerGroup; this.clientId = clientId; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaRequest.java new file mode 100644 index 000000000000..8d9f924c2b64 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaRequest.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; + +import java.util.List; +import 
javax.annotation.Nullable; + +abstract class AbstractKafkaRequest { + + @Nullable private final List bootstrapServers; + + protected AbstractKafkaRequest(@Nullable List bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + @Nullable + public List getBootstrapServers() { + return bootstrapServers; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java index 394460ef54e1..05657e6d957d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java @@ -31,7 +31,11 @@ public void extract( singleRecordLinkExtractor.extract( spanLinks, Context.root(), - KafkaProcessRequest.create(record, request.getConsumerGroup(), request.getClientId())); + KafkaProcessRequest.create( + record, + request.getConsumerGroup(), + request.getClientId(), + request.getBootstrapServers())); } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContext.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContext.java index 79a5b25f4cd0..fb3885e11149 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContext.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContext.java @@ -7,6 +7,7 @@ import com.google.auto.value.AutoValue; import io.opentelemetry.context.Context; +import java.util.List; import javax.annotation.Nullable; /** @@ -17,8 +18,11 @@ public abstract class KafkaConsumerContext { static KafkaConsumerContext create( - @Nullable Context context, @Nullable String consumerGroup, @Nullable String clientId) { - return new AutoValue_KafkaConsumerContext(context, consumerGroup, clientId); + @Nullable Context context, + @Nullable String consumerGroup, + @Nullable String clientId, + @Nullable List bootstrapServers) { + return new AutoValue_KafkaConsumerContext(context, consumerGroup, clientId, bootstrapServers); } @Nullable @@ -29,4 +33,7 @@ static KafkaConsumerContext create( @Nullable abstract String getClientId(); + + @Nullable + abstract List getBootstrapServers(); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContextUtil.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContextUtil.java index f7b27499c797..3cee9af2e9be 100644 --- 
a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContextUtil.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContextUtil.java @@ -7,6 +7,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; +import java.util.List; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -31,40 +32,49 @@ public static KafkaConsumerContext get(ConsumerRecord records) { Context receiveContext = recordContextField.get(records); String consumerGroup = null; String clientId = null; + List bootstrapServers = null; String[] consumerInfo = recordConsumerInfoField.get(records); if (consumerInfo != null) { consumerGroup = consumerInfo[0]; clientId = consumerInfo[1]; + bootstrapServers = KafkaUtil.parseBootstrapServers(consumerInfo[2]); } - return create(receiveContext, consumerGroup, clientId); + return create(receiveContext, consumerGroup, clientId, bootstrapServers); } public static KafkaConsumerContext get(ConsumerRecords records) { Context receiveContext = recordsContextField.get(records); String consumerGroup = null; String clientId = null; + List bootstrapServers = null; String[] consumerInfo = recordsConsumerInfoField.get(records); if (consumerInfo != null) { consumerGroup = consumerInfo[0]; clientId = consumerInfo[1]; + bootstrapServers = KafkaUtil.parseBootstrapServers(consumerInfo[2]); } - return create(receiveContext, consumerGroup, clientId); + return create(receiveContext, consumerGroup, clientId, bootstrapServers); } public static KafkaConsumerContext create(Context context, Consumer consumer) { - return create(context, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer)); + return create( + context, + KafkaUtil.getConsumerGroup(consumer), + KafkaUtil.getClientId(consumer), + KafkaUtil.getBootstrapServers(consumer)); } public static KafkaConsumerContext create( - Context context, String consumerGroup, String clientId) { - return KafkaConsumerContext.create(context, consumerGroup, clientId); + Context context, String consumerGroup, String clientId, List bootstrapServers) { + return KafkaConsumerContext.create(context, consumerGroup, clientId, bootstrapServers); } public static void set(ConsumerRecord record, Context context, Consumer consumer) { recordContextField.set(record, context); + List bootstrapServers = KafkaUtil.getBootstrapServers(consumer); String consumerGroup = KafkaUtil.getConsumerGroup(consumer); String clientId = KafkaUtil.getClientId(consumer); - set(record, context, consumerGroup, clientId); + set(record, context, consumerGroup, clientId, bootstrapServers); } public static void set(ConsumerRecord record, KafkaConsumerContext consumerContext) { @@ -72,25 +82,41 @@ public static void set(ConsumerRecord record, KafkaConsumerContext consume record, consumerContext.getContext(), consumerContext.getConsumerGroup(), - consumerContext.getClientId()); + consumerContext.getClientId(), + consumerContext.getBootstrapServers()); } public static void set( - ConsumerRecord record, Context context, String consumerGroup, String clientId) { + ConsumerRecord record, + Context context, + String consumerGroup, + String clientId, + List bootstrapServers) { 
recordContextField.set(record, context); - recordConsumerInfoField.set(record, new String[] {consumerGroup, clientId}); + String bootstrapServersString = + bootstrapServers != null ? String.join(",", bootstrapServers) : null; + recordConsumerInfoField.set( + record, new String[] {consumerGroup, clientId, bootstrapServersString}); } public static void set(ConsumerRecords records, Context context, Consumer consumer) { + List bootstrapServers = KafkaUtil.getBootstrapServers(consumer); String consumerGroup = KafkaUtil.getConsumerGroup(consumer); String clientId = KafkaUtil.getClientId(consumer); - set(records, context, consumerGroup, clientId); + set(records, context, consumerGroup, clientId, bootstrapServers); } public static void set( - ConsumerRecords records, Context context, String consumerGroup, String clientId) { + ConsumerRecords records, + Context context, + String consumerGroup, + String clientId, + List bootstrapServers) { recordsContextField.set(records, context); - recordsConsumerInfoField.set(records, new String[] {consumerGroup, clientId}); + String bootstrapServersString = + bootstrapServers != null ? String.join(",", bootstrapServers) : null; + recordsConsumerInfoField.set( + records, new String[] {consumerGroup, clientId, bootstrapServersString}); } public static void copy(ConsumerRecord from, ConsumerRecord to) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerExperimentalAttributesExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerExperimentalAttributesExtractor.java index 6de256a23318..85163a68ad82 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerExperimentalAttributesExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerExperimentalAttributesExtractor.java @@ -10,13 +10,12 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.record.TimestampType; final class KafkaConsumerExperimentalAttributesExtractor - implements AttributesExtractor { + extends KafkaExperimentalAttributesExtractor { private static final AttributeKey KAFKA_RECORD_QUEUE_TIME_MS = longKey("kafka.record.queue_time_ms"); @@ -25,6 +24,8 @@ final class KafkaConsumerExperimentalAttributesExtractor public void onStart( AttributesBuilder attributes, Context parentContext, KafkaProcessRequest request) { + super.onStart(attributes, parentContext, request); + ConsumerRecord record = request.getRecord(); // don't record a duration if the message was sent from an old Kafka client if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaExperimentalAttributesExtractor.java 
b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaExperimentalAttributesExtractor.java new file mode 100644 index 000000000000..7e9e5dcc4ac2 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaExperimentalAttributesExtractor.java @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import java.util.List; +import javax.annotation.Nullable; + +class KafkaExperimentalAttributesExtractor + implements AttributesExtractor { + + private static final AttributeKey> MESSAGING_KAFKA_BOOTSTRAP_SERVERS = + AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers"); + + @Override + public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) { + + List bootstrapServers = request.getBootstrapServers(); + attributes.put(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, bootstrapServers); + } + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + REQUEST request, + @Nullable RESPONSE response, + @Nullable Throwable error) {} +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java index 989c2bd3ffe6..bb74e2677857 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java @@ -79,16 +79,20 @@ public Instrumenter createProducerInstrume KafkaProducerAttributesGetter getter = KafkaProducerAttributesGetter.INSTANCE; MessageOperation operation = MessageOperation.PUBLISH; - return Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) - .addAttributesExtractors(extractors) - .addAttributesExtractor(new KafkaProducerAttributesExtractor()) - .setErrorCauseExtractor(errorCauseExtractor) - .buildInstrumenter(SpanKindExtractor.alwaysProducer()); + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) + .addAttributesExtractors(extractors) + .addAttributesExtractor(new KafkaProducerAttributesExtractor()) + .setErrorCauseExtractor(errorCauseExtractor); + if (captureExperimentalSpanAttributes) { + builder.addAttributesExtractor(new KafkaExperimentalAttributesExtractor<>()); + } + return 
builder.buildInstrumenter(SpanKindExtractor.alwaysProducer()); } public Instrumenter createConsumerReceiveInstrumenter() { @@ -100,17 +104,21 @@ public Instrumenter createConsumerReceiveInstrumenter KafkaReceiveAttributesGetter getter = KafkaReceiveAttributesGetter.INSTANCE; MessageOperation operation = MessageOperation.RECEIVE; - return Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) - .addAttributesExtractor(KafkaReceiveAttributesExtractor.INSTANCE) - .addAttributesExtractors(extractors) - .setErrorCauseExtractor(errorCauseExtractor) - .setEnabled(messagingReceiveInstrumentationEnabled) - .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) + .addAttributesExtractor(KafkaReceiveAttributesExtractor.INSTANCE) + .addAttributesExtractors(extractors) + .setErrorCauseExtractor(errorCauseExtractor) + .setEnabled(messagingReceiveInstrumentationEnabled); + if (captureExperimentalSpanAttributes) { + builder.addAttributesExtractor(new KafkaExperimentalAttributesExtractor<>()); + } + return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } public Instrumenter createConsumerProcessInstrumenter() { @@ -151,18 +159,22 @@ public Instrumenter createBatchProcessInstrumenter() KafkaReceiveAttributesGetter getter = KafkaReceiveAttributesGetter.INSTANCE; MessageOperation operation = MessageOperation.PROCESS; - return Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) - .addAttributesExtractor(KafkaReceiveAttributesExtractor.INSTANCE) - .addSpanLinksExtractor( - new KafkaBatchProcessSpanLinksExtractor( - openTelemetry.getPropagators().getTextMapPropagator())) - .setErrorCauseExtractor(errorCauseExtractor) - .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + InstrumenterBuilder batchBuilder = + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) + .addAttributesExtractor(KafkaReceiveAttributesExtractor.INSTANCE) + .addSpanLinksExtractor( + new KafkaBatchProcessSpanLinksExtractor( + openTelemetry.getPropagators().getTextMapPropagator())) + .setErrorCauseExtractor(errorCauseExtractor); + if (captureExperimentalSpanAttributes) { + batchBuilder.addAttributesExtractor(new KafkaExperimentalAttributesExtractor<>()); + } + return batchBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } private static diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProcessRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProcessRequest.java index 5197ba5a6316..0e150e924ea7 100644 --- 
a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProcessRequest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProcessRequest.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; +import java.util.List; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -17,23 +18,36 @@ public class KafkaProcessRequest extends AbstractKafkaConsumerRequest { private final ConsumerRecord record; public static KafkaProcessRequest create(ConsumerRecord record, Consumer consumer) { - return create(record, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer)); + return create( + record, + KafkaUtil.getConsumerGroup(consumer), + KafkaUtil.getClientId(consumer), + KafkaUtil.getBootstrapServers(consumer)); } public static KafkaProcessRequest create( KafkaConsumerContext consumerContext, ConsumerRecord record) { String consumerGroup = consumerContext != null ? consumerContext.getConsumerGroup() : null; String clientId = consumerContext != null ? consumerContext.getClientId() : null; - return create(record, consumerGroup, clientId); + List bootstrapServers = + consumerContext != null ? consumerContext.getBootstrapServers() : null; + return create(record, consumerGroup, clientId, bootstrapServers); } public static KafkaProcessRequest create( - ConsumerRecord record, String consumerGroup, String clientId) { - return new KafkaProcessRequest(record, consumerGroup, clientId); + ConsumerRecord record, + String consumerGroup, + String clientId, + List bootstrapServers) { + return new KafkaProcessRequest(record, consumerGroup, clientId, bootstrapServers); } - public KafkaProcessRequest(ConsumerRecord record, String consumerGroup, String clientId) { - super(consumerGroup, clientId); + public KafkaProcessRequest( + ConsumerRecord record, + String consumerGroup, + String clientId, + List bootstrapServers) { + super(consumerGroup, clientId, bootstrapServers); this.record = record; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerRequest.java index 29828eb6d42d..67bfd2974802 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerRequest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerRequest.java @@ -6,6 +6,7 @@ package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; import java.util.Iterator; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.kafka.clients.producer.Producer; @@ -17,20 +18,24 @@ * This class is internal and is hence not for public use. Its APIs are unstable and can change at * any time. 
*/ -public final class KafkaProducerRequest { +public final class KafkaProducerRequest extends AbstractKafkaRequest { private final ProducerRecord record; @Nullable private final String clientId; public static KafkaProducerRequest create(ProducerRecord record, Producer producer) { - return create(record, extractClientId(producer)); + return new KafkaProducerRequest( + record, extractClientId(producer), extractBootstrapServers(producer)); } - public static KafkaProducerRequest create(ProducerRecord record, String clientId) { - return new KafkaProducerRequest(record, clientId); + public static KafkaProducerRequest create( + ProducerRecord record, String clientId, List bootstrapServers) { + return new KafkaProducerRequest(record, clientId, bootstrapServers); } - private KafkaProducerRequest(ProducerRecord record, String clientId) { + private KafkaProducerRequest( + ProducerRecord record, String clientId, List bootstrapServers) { + super(bootstrapServers); this.record = record; this.clientId = clientId; } @@ -53,4 +58,8 @@ private static String extractClientId(Producer producer) { return null; } } + + private static List extractBootstrapServers(Producer producer) { + return KafkaUtil.getBootstrapServers(producer); + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveRequest.java index 09583076433a..b1e7bb07b7ef 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveRequest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveRequest.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; +import java.util.List; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -19,24 +20,36 @@ public class KafkaReceiveRequest extends AbstractKafkaConsumerRequest { public static KafkaReceiveRequest create( ConsumerRecords records, @Nullable Consumer consumer) { - return create(records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer)); + return create( + records, + KafkaUtil.getConsumerGroup(consumer), + KafkaUtil.getClientId(consumer), + KafkaUtil.getBootstrapServers(consumer)); } public static KafkaReceiveRequest create( KafkaConsumerContext consumerContext, ConsumerRecords records) { String consumerGroup = consumerContext != null ? consumerContext.getConsumerGroup() : null; String clientId = consumerContext != null ? consumerContext.getClientId() : null; - return create(records, consumerGroup, clientId); + List bootstrapServers = + consumerContext != null ? 
consumerContext.getBootstrapServers() : null; + return create(records, consumerGroup, clientId, bootstrapServers); } public static KafkaReceiveRequest create( - ConsumerRecords records, String consumerGroup, String clientId) { - return new KafkaReceiveRequest(records, consumerGroup, clientId); + ConsumerRecords records, + String consumerGroup, + String clientId, + List bootstrapServers) { + return new KafkaReceiveRequest(records, consumerGroup, clientId, bootstrapServers); } private KafkaReceiveRequest( - ConsumerRecords records, String consumerGroup, String clientId) { - super(consumerGroup, clientId); + ConsumerRecords records, + String consumerGroup, + String clientId, + List bootstrapServers) { + super(consumerGroup, clientId, bootstrapServers); this.records = records; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaUtil.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaUtil.java index 3049da507d82..1258957428d5 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaUtil.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaUtil.java @@ -9,14 +9,21 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; /** * This class is internal and is hence not for public use. 
Its APIs are unstable and can change at @@ -33,6 +40,11 @@ public final class KafkaUtil { private static final MethodHandle GET_GROUP_METADATA; private static final MethodHandle GET_GROUP_ID; + private static final VirtualField, List> consumerVirtualField = + VirtualField.find(Consumer.class, List.class); + private static final VirtualField, List> producerVirtualField = + VirtualField.find(Producer.class, List.class); + static { MethodHandle getGroupMetadata; MethodHandle getGroupId; @@ -66,6 +78,275 @@ public static String getClientId(Consumer consumer) { return getConsumerInfo(consumer).get(CLIENT_ID); } + @Nullable + public static List getBootstrapServers(Consumer consumer) { + List bootstrapServers = consumerVirtualField.get(consumer); + // If bootstrap servers are not available from virtual field (library instrumentation), + // try to extract them via reflection from Kafka client's metadata + if (bootstrapServers == null) { + String bootstrapServersString = extractBootstrapServersViaReflection(consumer); + if (bootstrapServersString != null) { + bootstrapServers = parseBootstrapServers(bootstrapServersString); + consumerVirtualField.set(consumer, bootstrapServers); + } else { + bootstrapServers = Collections.emptyList(); + } + } + return bootstrapServers; + } + + public static List getBootstrapServers(Producer producer) { + List bootstrapServers = producerVirtualField.get(producer); + // If bootstrap servers are not available from virtual field (library instrumentation), + // try to extract them via reflection from Kafka client's metadata + if (bootstrapServers == null) { + String bootstrapServersString = extractBootstrapServersViaReflection(producer); + if (bootstrapServersString != null) { + bootstrapServers = parseBootstrapServers(bootstrapServersString); + producerVirtualField.set(producer, bootstrapServers); + } else { + bootstrapServers = Collections.emptyList(); + } + } + return bootstrapServers; + } + + @Nonnull + public static List parseBootstrapServers(@Nullable Object bootstrapServersConfig) { + if (bootstrapServersConfig == null) { + return Collections.emptyList(); + } + + if (bootstrapServersConfig instanceof Collection) { + @SuppressWarnings("unchecked") + Collection collection = (Collection) bootstrapServersConfig; + return new ArrayList<>(collection); + } + + if (bootstrapServersConfig instanceof String) { + String serversString = (String) bootstrapServersConfig; + return parseBootstrapServers(serversString); + } + + return parseBootstrapServers(bootstrapServersConfig.toString()); + } + + @Nonnull + public static List parseBootstrapServers(@Nullable String bootstrapServersString) { + if (bootstrapServersString == null || bootstrapServersString.trim().isEmpty()) { + return Collections.emptyList(); + } + + return Arrays.stream(bootstrapServersString.split(",")) + .map(String::trim) + .filter(server -> !server.isEmpty()) + .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); + } + + /** + * Extract bootstrap servers from Kafka client's metadata using reflection. 1.metadata -> Cluster + * -> nodes 2.metadata -> MetadataCache -> nodes 3.metadata -> MetadataSnapshot -> nodes + * 4.delegate -> metadata -> ... 
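A minimal standalone sketch of the normalization the parse helpers above apply, under the same rules as the added code (split on commas, trim whitespace, drop empty entries, never return null); the class and method names here are illustrative only:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ParseBootstrapServersSketch {
  static List<String> parse(String bootstrapServers) {
    if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
      return Collections.emptyList();
    }
    List<String> result = new ArrayList<>();
    for (String server : bootstrapServers.split(",")) {
      String trimmed = server.trim();
      if (!trimmed.isEmpty()) {
        result.add(trimmed); // keep only non-empty, trimmed host:port entries
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // prints [localhost:9092, localhost:9093]
    System.out.println(parse(" localhost:9092 ,, localhost:9093 "));
  }
}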
+ */ + @Nullable + private static String extractBootstrapServersViaReflection(Object client) { + if (client == null) { + return null; + } + try { + Object metadata = extractMetadata(client); + + if (metadata == null) { + return null; + } + + String bootstrapServers = extractFromMetadataSnapshot(metadata); + + if (bootstrapServers == null) { + bootstrapServers = extractFromMetadataCache(metadata); + } + if (bootstrapServers == null) { + bootstrapServers = extractFromCluster(metadata); + } + + return bootstrapServers; + } catch (RuntimeException e) { + return null; + } + } + + @Nullable + private static Object extractMetadata(Object client) { + try { + java.lang.reflect.Field metadataField = findField(client.getClass(), "metadata"); + if (metadataField != null) { + metadataField.setAccessible(true); + Object metadata = metadataField.get(client); + if (metadata != null) { + return metadata; + } + } + + java.lang.reflect.Field delegateField = findField(client.getClass(), "delegate"); + if (delegateField != null) { + delegateField.setAccessible(true); + Object delegate = delegateField.get(client); + if (delegate != null) { + java.lang.reflect.Field delegateMetadataField = + findField(delegate.getClass(), "metadata"); + if (delegateMetadataField != null) { + delegateMetadataField.setAccessible(true); + return delegateMetadataField.get(delegate); + } + } + } + + return null; + } catch (Exception e) { + return null; + } + } + + @Nullable + private static java.lang.reflect.Field findField(Class clazz, String fieldName) { + Class currentClass = clazz; + while (currentClass != null) { + try { + return currentClass.getDeclaredField(fieldName); + } catch (NoSuchFieldException e) { + // Field not found in current class, try parent class + currentClass = currentClass.getSuperclass(); + } + } + return null; + } + + @Nullable + private static String extractFromCluster(Object metadata) { + try { + java.lang.reflect.Field clusterField = findField(metadata.getClass(), "cluster"); + if (clusterField == null) { + return null; + } + clusterField.setAccessible(true); + Object cluster = clusterField.get(metadata); + + if (cluster == null) { + return null; + } + + java.lang.reflect.Method nodesMethod = cluster.getClass().getDeclaredMethod("nodes"); + nodesMethod.setAccessible(true); + Object nodes = nodesMethod.invoke(cluster); + + return formatNodesAsBootstrapServers(nodes); + } catch (Exception e) { + return null; + } + } + + @Nullable + private static String extractFromMetadataCache(Object metadata) { + try { + java.lang.reflect.Field cacheField = findField(metadata.getClass(), "cache"); + if (cacheField == null) { + return null; + } + cacheField.setAccessible(true); + Object cache = cacheField.get(metadata); + + if (cache == null) { + return null; + } + + java.lang.reflect.Field nodesField = cache.getClass().getDeclaredField("nodes"); + nodesField.setAccessible(true); + Object nodes = nodesField.get(cache); + + return formatNodesAsBootstrapServers(nodes); + } catch (Exception e) { + return null; + } + } + + @Nullable + private static String extractFromMetadataSnapshot(Object metadata) { + try { + java.lang.reflect.Field snapshotField = findField(metadata.getClass(), "metadataSnapshot"); + if (snapshotField == null) { + return null; + } + snapshotField.setAccessible(true); + Object snapshot = snapshotField.get(metadata); + + if (snapshot == null) { + return null; + } + + java.lang.reflect.Field nodesField = snapshot.getClass().getDeclaredField("nodes"); + nodesField.setAccessible(true); + Object nodes = 
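Putting the pieces above together, the lookup falls back in this order: the virtual field populated by the javaagent constructor advice, then the client's (or its delegate's) "metadata" field, then metadataSnapshot.nodes, cache.nodes, and finally cluster.nodes(). A hedged usage sketch, assuming the internal KafkaUtil API and that the returned list holds host:port strings:

import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

class BootstrapServersLookupSketch {
  static List<String> lookup() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // Without the javaagent the virtual field is empty, so this exercises the
      // reflective metadata path; the result is never null, at worst an empty list.
      return KafkaUtil.getBootstrapServers(consumer);
    }
  }
}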
nodesField.get(snapshot); + + return formatNodesAsBootstrapServers(nodes); + } catch (Exception e) { + return null; + } + } + + @Nullable + private static String formatNodesAsBootstrapServers(Object nodes) { + if (nodes == null) { + return null; + } + + try { + StringBuilder sb = new StringBuilder(); + + if (nodes instanceof java.util.Map) { + // nodes is Map + @SuppressWarnings("unchecked") + java.util.Map nodeMap = (java.util.Map) nodes; + + for (Object node : nodeMap.values()) { + String address = getNodeAddress(node); + if (address != null) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(address); + } + } + } else if (nodes instanceof java.util.Collection) { + // nodes is Collection + @SuppressWarnings("unchecked") + java.util.Collection nodeCollection = (java.util.Collection) nodes; + + for (Object node : nodeCollection) { + String address = getNodeAddress(node); + if (address != null) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(address); + } + } + } + + return sb.length() > 0 ? sb.toString() : null; + } catch (RuntimeException e) { + return null; + } + } + + @Nullable + private static String getNodeAddress(Object o) { + if (o == null || !(o instanceof Node)) { + return null; + } + Node node = (Node) o; + return node.host() + ":" + node.port(); + } + private static Map getConsumerInfo(Consumer consumer) { if (consumer == null) { return Collections.emptyMap(); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaUtilTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaUtilTest.java new file mode 100644 index 000000000000..152c82f61ecd --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaUtilTest.java @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.Test; + +class KafkaUtilTest { + + @Test + void parseBootstrapServers_withString() { + // Test single server + List result = KafkaUtil.parseBootstrapServers("localhost:9092"); + assertThat(result).containsExactly("localhost:9092"); + + // Test multiple servers + result = KafkaUtil.parseBootstrapServers("localhost:9092,localhost:9093,localhost:9094"); + assertThat(result).containsExactly("localhost:9092", "localhost:9093", "localhost:9094"); + + // Test with spaces + result = KafkaUtil.parseBootstrapServers(" localhost:9092 , localhost:9093 , localhost:9094 "); + assertThat(result).containsExactly("localhost:9092", "localhost:9093", "localhost:9094"); + + // Test empty string + result = KafkaUtil.parseBootstrapServers(""); + assertThat(result).isEmpty(); + + // Test null string + result = KafkaUtil.parseBootstrapServers((String) null); + assertThat(result).isEmpty(); + } + + @Test + void parseBootstrapServers_withObject() { + // Test with String object + List result = KafkaUtil.parseBootstrapServers((Object) "localhost:9092,localhost:9093"); + assertThat(result).containsExactly("localhost:9092", "localhost:9093"); + + // Test with List object + List 
inputList = Arrays.asList("localhost:9092", "localhost:9093"); + result = KafkaUtil.parseBootstrapServers(inputList); + assertThat(result).containsExactly("localhost:9092", "localhost:9093"); + + // Test with empty List + result = KafkaUtil.parseBootstrapServers(Collections.emptyList()); + assertThat(result).isEmpty(); + + // Test with null object + result = KafkaUtil.parseBootstrapServers((Object) null); + assertThat(result).isEmpty(); + + // Test with unsupported object type + result = KafkaUtil.parseBootstrapServers(123); + assertThat(result).containsExactly("123"); + } + + @Test + void parseBootstrapServers_edgeCases() { + // Test with only commas + List result = KafkaUtil.parseBootstrapServers(",,,"); + assertThat(result).isEmpty(); + + // Test with empty elements + result = KafkaUtil.parseBootstrapServers("localhost:9092,,localhost:9093"); + assertThat(result).containsExactly("localhost:9092", "localhost:9093"); + + // Test with whitespace only elements + result = KafkaUtil.parseBootstrapServers("localhost:9092, ,localhost:9093"); + assertThat(result).containsExactly("localhost:9092", "localhost:9093"); + } +} diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java index 345761431682..a806e2a8349b 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java @@ -5,8 +5,10 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static java.util.Arrays.asList; import static java.util.Collections.singleton; +import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.ImmutableMap; import io.opentelemetry.api.common.AttributeKey; @@ -15,10 +17,12 @@ import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -52,6 +56,22 @@ abstract class KafkaStreamsBaseTest { protected static final AttributeKey MESSAGING_CLIENT_ID = AttributeKey.stringKey("messaging.client_id"); + + protected static final AttributeKey> MESSAGING_KAFKA_BOOTSTRAP_SERVERS = + AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers"); + + protected static AttributeAssertion bootstrapServersAssertion() { + return satisfies( + MESSAGING_KAFKA_BOOTSTRAP_SERVERS, + listAssert -> { + if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) { + listAssert.isNotEmpty().allSatisfy(server -> assertThat(server).isNotEmpty()); + } else { + listAssert.isNullOrEmpty(); + } + }); + } + protected static final String STREAM_PENDING = "test.pending"; protected static final String 
STREAM_PROCESSED = "test.processed"; diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java index 930b639a0e18..f20f838e3f57 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java @@ -113,6 +113,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("producer")), satisfies( MESSAGING_DESTINATION_PARTITION_ID, @@ -131,6 +132,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING), equalTo(MESSAGING_OPERATION, "receive"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1))); if (Boolean.getBoolean("testLatestDeps")) { @@ -149,6 +151,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")), satisfies(MESSAGING_MESSAGE_BODY_SIZE, k -> k.isInstanceOf(Long.class)), satisfies( @@ -180,6 +183,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("producer")), satisfies( MESSAGING_DESTINATION_PARTITION_ID, @@ -198,6 +202,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED), equalTo(MESSAGING_OPERATION, "receive"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1))); if (Boolean.getBoolean("testLatestDeps")) { @@ -216,6 +221,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")), satisfies( MESSAGING_MESSAGE_BODY_SIZE, k -> k.isInstanceOf(Long.class)), diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java index 1d66b2a5abd4..d1c13aec1449 100644 --- 
a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java @@ -103,6 +103,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING), equalTo(MESSAGING_OPERATION, "publish"), equalTo(MESSAGING_CLIENT_ID, "producer-1"), + bootstrapServersAssertion(), satisfies( MESSAGING_DESTINATION_PARTITION_ID, k -> k.isInstanceOf(String.class)), @@ -116,6 +117,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")), satisfies( MESSAGING_MESSAGE_BODY_SIZE, k -> k.isInstanceOf(Long.class)), @@ -148,6 +150,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.isInstanceOf(String.class)), satisfies( MESSAGING_DESTINATION_PARTITION_ID, @@ -162,6 +165,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")), satisfies( MESSAGING_MESSAGE_BODY_SIZE, k -> k.isInstanceOf(Long.class)), diff --git a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java index e73f8d8d273f..75788ae1fe32 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java @@ -20,6 +20,7 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static java.util.Arrays.asList; import static java.util.Collections.singleton; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; @@ -71,6 +72,21 @@ public abstract class AbstractReactorKafkaTest { @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + protected static final AttributeKey> MESSAGING_KAFKA_BOOTSTRAP_SERVERS = + AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers"); + + protected static AttributeAssertion bootstrapServersAssertion() { + return satisfies( + MESSAGING_KAFKA_BOOTSTRAP_SERVERS, + listAssert -> { + if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) { + listAssert.isNotEmpty().allSatisfy(server -> assertThat(server).isNotEmpty()); + } else { + listAssert.isNullOrEmpty(); + } + }); + } + static 
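The same gated assertion helper is repeated across the test base classes: it requires the attribute only when the experimental flag is on and accepts its absence otherwise. In isolation the pattern looks like the sketch below; the attribute and property names are taken from the diff, while the class and method names are made up for illustration:

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.List;

class BootstrapServersAssertionSketch {
  static final AttributeKey<List<String>> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
      AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers");

  static void check(Attributes spanAttributes) {
    List<String> servers = spanAttributes.get(MESSAGING_KAFKA_BOOTSTRAP_SERVERS);
    if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
      // Flag on: the attribute must be present and every entry must be non-empty.
      assertThat(servers).isNotEmpty().allSatisfy(server -> assertThat(server).isNotEmpty());
    } else {
      // Flag off: the attribute must not be captured at all.
      assertThat(servers).isNullOrEmpty();
    }
  }
}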
KafkaContainer kafka; protected static KafkaSender sender; protected static KafkaReceiver receiver; @@ -192,6 +208,7 @@ protected static List sendAttributes(ProducerRecord stringAssert.startsWith("producer")), @@ -212,6 +229,7 @@ protected static List receiveAttributes(String topic) { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(MESSAGING_OPERATION, "receive"), + bootstrapServersAssertion(), satisfies( AttributeKey.stringKey("messaging.client_id"), stringAssert -> stringAssert.startsWith("consumer")), @@ -231,6 +249,7 @@ protected static List processAttributes( equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, record.topic()), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies( AttributeKey.stringKey("messaging.client_id"), stringAssert -> stringAssert.startsWith("consumer")), diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java index 80734c2f2c64..bcea9a0a8ce3 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java +++ b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java @@ -30,8 +30,14 @@ public class KafkaInstrumentationAutoConfiguration { @Bean DefaultKafkaProducerFactoryCustomizer otelKafkaProducerFactoryCustomizer( - OpenTelemetry openTelemetry) { - KafkaTelemetry kafkaTelemetry = KafkaTelemetry.create(openTelemetry); + OpenTelemetry openTelemetry, ObjectProvider configPropertiesProvider) { + KafkaTelemetry kafkaTelemetry = + KafkaTelemetry.builder(openTelemetry) + .setCaptureExperimentalSpanAttributes( + configPropertiesProvider + .getObject() + .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) + .build(); return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap); } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java index 6818653482ef..aa9e632b01c0 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java @@ -85,6 +85,7 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies( @@ -106,6 +107,7 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, 
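Outside Spring autoconfiguration, the same opt-in can presumably be reproduced directly with the library builder used in the KafkaInstrumentationAutoConfiguration hunk above; a sketch, assuming the kafka-clients-2.6 library package for the import and an OpenTelemetry instance supplied by the caller:

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
import org.apache.kafka.clients.producer.Producer;

class KafkaTelemetrySketch {
  static <K, V> Producer<K, V> traced(OpenTelemetry openTelemetry, Producer<K, V> producer) {
    KafkaTelemetry telemetry =
        KafkaTelemetry.builder(openTelemetry)
            // Opt in to experimental attributes, including messaging.kafka.bootstrap.servers.
            .setCaptureExperimentalSpanAttributes(true)
            .build();
    // wrap(...) is the same method the Spring customizer registers as a post-processor.
    return telemetry.wrap(producer);
  }
}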
"receive"), + bootstrapServersAssertion(), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"), satisfies( MESSAGING_CLIENT_ID, @@ -120,6 +122,7 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies( MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies( @@ -160,6 +163,7 @@ void shouldHandleFailureInKafkaListener() { equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "receive"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)); @@ -168,6 +172,7 @@ void shouldHandleFailureInKafkaListener() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -190,6 +195,7 @@ void shouldHandleFailureInKafkaListener() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies( @@ -260,6 +266,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies( @@ -276,6 +283,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies( @@ -299,6 +307,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "receive"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), @@ -315,6 +324,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "process"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), @@ -349,6 +359,7 @@ void shouldHandleFailureInKafkaBatchListener() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies( @@ -406,6 +417,7 @@ private static void assertReceiveSpan(SpanDataAssert span) { equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), 
equalTo(MESSAGING_OPERATION, "receive"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)); } @@ -421,6 +433,7 @@ private static void assertProcessSpan( equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "process"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)); if (failed) { diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java index bcb57ad88c69..93b6b352b8b2 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java @@ -64,6 +64,7 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -83,6 +84,7 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies( MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies( @@ -118,6 +120,7 @@ void shouldHandleFailureInKafkaListener() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "process"), + bootstrapServersAssertion(), satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -138,6 +141,7 @@ void shouldHandleFailureInKafkaListener() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -198,6 +202,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -216,6 +221,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -245,6 +251,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, 
"process"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), @@ -273,6 +280,7 @@ void shouldHandleFailureInKafkaBatchListener() { equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "process"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)); @@ -290,6 +298,7 @@ void shouldHandleFailureInKafkaBatchListener() { equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java index 3a53fcce2aaa..fd5b01ea168f 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java @@ -5,11 +5,13 @@ package io.opentelemetry.testing; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.sdk.trace.data.LinkData; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; @@ -41,6 +43,22 @@ public abstract class AbstractSpringKafkaTest { protected static final AttributeKey MESSAGING_CLIENT_ID = AttributeKey.stringKey("messaging.client_id"); + + protected static final AttributeKey> MESSAGING_KAFKA_BOOTSTRAP_SERVERS = + AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers"); + + protected static AttributeAssertion bootstrapServersAssertion() { + return satisfies( + MESSAGING_KAFKA_BOOTSTRAP_SERVERS, + listAssert -> { + if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) { + listAssert.isNotEmpty().allSatisfy(server -> assertThat(server).isNotEmpty()); + } else { + listAssert.isNullOrEmpty(); + } + }); + } + static KafkaContainer kafka; ConfigurableApplicationContext applicationContext; diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractVertxKafkaTest.java index b0cdd3070a82..07e12a955dc0 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractVertxKafkaTest.java @@ -17,6 +17,7 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import 
static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; import io.opentelemetry.api.common.AttributeKey; @@ -59,6 +60,21 @@ public abstract class AbstractVertxKafkaTest { private static final AttributeKey MESSAGING_CLIENT_ID = AttributeKey.stringKey("messaging.client_id"); + protected static final AttributeKey> MESSAGING_KAFKA_BOOTSTRAP_SERVERS = + AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers"); + + protected static AttributeAssertion bootstrapServersAssertion() { + return satisfies( + MESSAGING_KAFKA_BOOTSTRAP_SERVERS, + listAssert -> { + if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) { + listAssert.isNotEmpty().allSatisfy(server -> assertThat(server).isNotEmpty()); + } else { + listAssert.isNullOrEmpty(); + } + }); + } + KafkaContainer kafka; Vertx vertx; protected KafkaProducer kafkaProducer; @@ -179,6 +195,7 @@ protected static List sendAttributes( equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, record.topic()), equalTo(MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); @@ -205,6 +222,7 @@ private List batchConsumerAttributes(String topic, String op equalTo(MESSAGING_SYSTEM, KAFKA), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(MESSAGING_OPERATION, operation), + bootstrapServersAssertion(), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive))); if (hasConsumerGroup()) { @@ -221,6 +239,7 @@ protected List processAttributes(KafkaProducerRecord stringAssert.startsWith("consumer")), satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); diff --git a/smoke-tests-otel-starter/spring-boot-common/src/main/java/io/opentelemetry/spring/smoketest/AbstractKafkaSpringStarterSmokeTest.java b/smoke-tests-otel-starter/spring-boot-common/src/main/java/io/opentelemetry/spring/smoketest/AbstractKafkaSpringStarterSmokeTest.java index f92feb482367..91b53300edfd 100644 --- a/smoke-tests-otel-starter/spring-boot-common/src/main/java/io/opentelemetry/spring/smoketest/AbstractKafkaSpringStarterSmokeTest.java +++ b/smoke-tests-otel-starter/spring-boot-common/src/main/java/io/opentelemetry/spring/smoketest/AbstractKafkaSpringStarterSmokeTest.java @@ -7,11 +7,14 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.util.List; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.assertj.core.api.AbstractLongAssert; @@ -31,6 +34,16 @@ abstract class AbstractKafkaSpringStarterSmokeTest extends AbstractSpringStarter private static final AttributeKey MESSAGING_CLIENT_ID = AttributeKey.stringKey("messaging.client_id"); + protected static final AttributeKey> MESSAGING_KAFKA_BOOTSTRAP_SERVERS = + AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers"); + + protected static AttributeAssertion bootstrapServersAssertion() { + return satisfies( + MESSAGING_KAFKA_BOOTSTRAP_SERVERS, + listAssert -> + listAssert.isNotEmpty().allSatisfy(server -> assertThat(server).isNotEmpty())); + } + @SuppressWarnings("deprecation") // using deprecated semconv @Test void shouldInstrumentProducerAndConsumer() { @@ -65,6 +78,7 @@ void shouldInstrumentProducerAndConsumer() { MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, "testTopic"), equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), + bootstrapServersAssertion(), satisfies( MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -100,6 +114,7 @@ void shouldInstrumentProducerAndConsumer() { equalTo( MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testListener"), + bootstrapServersAssertion(), satisfies( AttributeKey.longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative), diff --git a/smoke-tests-otel-starter/spring-boot-common/src/main/resources/META-INF/native-image/reflect-config.json b/smoke-tests-otel-starter/spring-boot-common/src/main/resources/META-INF/native-image/reflect-config.json index ab4757f6961c..413d815a8c1d 100644 --- a/smoke-tests-otel-starter/spring-boot-common/src/main/resources/META-INF/native-image/reflect-config.json +++ b/smoke-tests-otel-starter/spring-boot-common/src/main/resources/META-INF/native-image/reflect-config.json @@ -1,4 +1,57 @@ [ + { + "name": "org.apache.kafka.clients.consumer.KafkaConsumer", + "fields": [ + {"name": "metadata", "allowWrite": true}, + {"name": "delegate", "allowWrite": true} + ] + }, + { + "name": "org.apache.kafka.clients.producer.KafkaProducer", + "fields": [ + {"name": "metadata", "allowWrite": true}, + {"name": "delegate", "allowWrite": true} + ] + }, + { + "name": "org.apache.kafka.clients.Metadata", + "fields": [ + {"name": "cluster", "allowWrite": true}, + {"name": "cache", "allowWrite": true}, + {"name": "metadataSnapshot", "allowWrite": true} + ] + }, + { + "name": "org.apache.kafka.common.Cluster", + "methods": [ + {"name": "nodes", "parameterTypes": []} + ] + }, + { + "name": "org.apache.kafka.common.Node", + "methods": [ + {"name": "host", "parameterTypes": []}, + {"name": "port", "parameterTypes": []} + ] + }, + { + "name": "org.apache.kafka.clients.MetadataCache", + "condition": { + "typeReachable": "org.apache.kafka.clients.Metadata" + }, + "fields": [ + {"name": "nodes", "allowWrite": true} + ] + }, + { + "name": "org.apache.kafka.clients.MetadataSnapshot", + "condition": { + "typeReachable": "org.apache.kafka.clients.Metadata" + }, + "fields": [ + {"name": "nodes", "allowWrite": true} + ] + }, { "condition": { "typeReachable": "org.apache.commons.pool2.impl.BaseGenericObjectPool"