diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts index 26128e9ff7bb..eb2c2dda026c 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts @@ -87,7 +87,6 @@ tasks { withType().configureEach { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) systemProperty("collectMetadata", findProperty("collectMetadata")) - jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") } val testExperimental by registering(Test::class) { @@ -99,11 +98,19 @@ tasks { systemProperty("hasConsumerGroup", testLatestDeps) } + val testReceiveSpansDisabled by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath + + systemProperty("hasConsumerGroup", testLatestDeps) + } + test { systemProperty("hasConsumerGroup", testLatestDeps) + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") } check { - dependsOn(testing.suites, testExperimental) + dependsOn(testing.suites, testExperimental, testReceiveSpansDisabled) } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java index 22392e309d21..3c6036e962a5 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java @@ -66,6 +66,9 @@ public abstract class AbstractReactorKafkaTest { private static final Logger logger = LoggerFactory.getLogger(AbstractReactorKafkaTest.class); + private static final boolean receiveTelemetryEnabled = + Boolean.getBoolean("otel.instrumentation.messaging.experimental.receive-telemetry.enabled"); + @RegisterExtension protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); @@ -153,6 +156,14 @@ protected void testSingleRecordProcess( Flux producer = sender.send(Flux.just(record)); testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30))); + if (receiveTelemetryEnabled) { + assertWithReceiveTelemetry(record); + } else { + assertWithoutReceiveTelemetry(record); + } + } + + private static void assertWithReceiveTelemetry(SenderRecord record) { AtomicReference producerSpan = new AtomicReference<>(); testing.waitAndAssertSortedTraces( @@ -184,6 +195,24 @@ protected void testSingleRecordProcess( span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } + private static void assertWithoutReceiveTelemetry(SenderRecord record) { + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record)), + span -> + span.hasName("testTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + @SuppressWarnings("deprecation") // using deprecated semconv protected static List sendAttributes(ProducerRecord record) { List assertions =