From f5dd853e8bed8735ceeb04875e00ccd02c97dc73 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Mon, 30 Mar 2026 14:30:43 +0300 Subject: [PATCH 1/3] Test reactor-kafka without receive telemetry --- .../javaagent/build.gradle.kts | 9 ++++-- .../v1_0/ReactorKafkaInstrumentationTest.java | 1 + .../kafka/v1_0/AbstractReactorKafkaTest.java | 29 +++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts index 26128e9ff7bb..c29800b9746f 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts @@ -87,7 +87,6 @@ tasks { withType().configureEach { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) systemProperty("collectMetadata", findProperty("collectMetadata")) - jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") } val testExperimental by registering(Test::class) { @@ -99,11 +98,17 @@ tasks { systemProperty("hasConsumerGroup", testLatestDeps) } + val testReceiveSpansDisabled by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath + } + test { systemProperty("hasConsumerGroup", testLatestDeps) + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") } check { - dependsOn(testing.suites, testExperimental) + dependsOn(testing.suites, testExperimental, testReceiveSpansDisabled) } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java index c04cf040fe42..24f2f9f0d3db 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; +// XXX ilma receive telemetryta class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest { @Test diff --git a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java index 22392e309d21..3c6036e962a5 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java @@ -66,6 +66,9 @@ public abstract class AbstractReactorKafkaTest { private static final Logger logger = LoggerFactory.getLogger(AbstractReactorKafkaTest.class); + private static final boolean receiveTelemetryEnabled = + Boolean.getBoolean("otel.instrumentation.messaging.experimental.receive-telemetry.enabled"); + @RegisterExtension protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); @@ -153,6 +156,14 @@ protected void testSingleRecordProcess( Flux producer = sender.send(Flux.just(record)); testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30))); + if (receiveTelemetryEnabled) { + assertWithReceiveTelemetry(record); + } else { + assertWithoutReceiveTelemetry(record); + } + } + + private static void assertWithReceiveTelemetry(SenderRecord record) { AtomicReference producerSpan = new AtomicReference<>(); testing.waitAndAssertSortedTraces( @@ -184,6 +195,24 @@ protected void testSingleRecordProcess( span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } + private static void assertWithoutReceiveTelemetry(SenderRecord record) { + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record)), + span -> + span.hasName("testTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + @SuppressWarnings("deprecation") // using deprecated semconv protected static List sendAttributes(ProducerRecord record) { List assertions = From b712f4f324f2fb46cfd4e7dbcd567e5d34a8fb13 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Mon, 30 Mar 2026 15:40:10 +0300 Subject: [PATCH 2/3] Update instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java Co-authored-by: Jay DeLuca --- .../reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java index 24f2f9f0d3db..c04cf040fe42 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java @@ -7,7 +7,6 @@ import org.junit.jupiter.api.Test; -// XXX ilma receive telemetryta class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest { @Test From c5eb744c8a66aac33be1cfdaeb87db2b441784ea Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Mon, 30 Mar 2026 16:30:59 +0300 Subject: [PATCH 3/3] fix --- .../reactor/reactor-kafka-1.0/javaagent/build.gradle.kts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts index c29800b9746f..eb2c2dda026c 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts @@ -101,6 +101,8 @@ tasks { val testReceiveSpansDisabled by registering(Test::class) { testClassesDirs = sourceSets.test.get().output.classesDirs classpath = sourceSets.test.get().runtimeClasspath + + systemProperty("hasConsumerGroup", testLatestDeps) } test {