Skip to content

Commit a615051

Browse files
lauritjaydeluca
andauthored
Test reactor-kafka without receive telemetry (#17225)
Co-authored-by: Jay DeLuca <jaydeluca4@gmail.com>
1 parent a81f45e commit a615051

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ tasks {
8787
withType<Test>().configureEach {
8888
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
8989
systemProperty("collectMetadata", findProperty("collectMetadata"))
90-
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
9190
}
9291

9392
val testExperimental by registering(Test::class) {
@@ -99,11 +98,19 @@ tasks {
9998
systemProperty("hasConsumerGroup", testLatestDeps)
10099
}
101100

101+
val testReceiveSpansDisabled by registering(Test::class) {
102+
testClassesDirs = sourceSets.test.get().output.classesDirs
103+
classpath = sourceSets.test.get().runtimeClasspath
104+
105+
systemProperty("hasConsumerGroup", testLatestDeps)
106+
}
107+
102108
test {
103109
systemProperty("hasConsumerGroup", testLatestDeps)
110+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
104111
}
105112

106113
check {
107-
dependsOn(testing.suites, testExperimental)
114+
dependsOn(testing.suites, testExperimental, testReceiveSpansDisabled)
108115
}
109116
}

instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ public abstract class AbstractReactorKafkaTest {
6666

6767
private static final Logger logger = LoggerFactory.getLogger(AbstractReactorKafkaTest.class);
6868

69+
private static final boolean receiveTelemetryEnabled =
70+
Boolean.getBoolean("otel.instrumentation.messaging.experimental.receive-telemetry.enabled");
71+
6972
@RegisterExtension
7073
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
7174

@@ -153,6 +156,14 @@ protected void testSingleRecordProcess(
153156
Flux<?> producer = sender.send(Flux.just(record));
154157
testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30)));
155158

159+
if (receiveTelemetryEnabled) {
160+
assertWithReceiveTelemetry(record);
161+
} else {
162+
assertWithoutReceiveTelemetry(record);
163+
}
164+
}
165+
166+
private static void assertWithReceiveTelemetry(SenderRecord<String, String, Object> record) {
156167
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
157168

158169
testing.waitAndAssertSortedTraces(
@@ -184,6 +195,24 @@ protected void testSingleRecordProcess(
184195
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
185196
}
186197

198+
private static void assertWithoutReceiveTelemetry(SenderRecord<String, String, Object> record) {
199+
testing.waitAndAssertTraces(
200+
trace ->
201+
trace.hasSpansSatisfyingExactly(
202+
span -> span.hasName("producer"),
203+
span ->
204+
span.hasName("testTopic publish")
205+
.hasKind(SpanKind.PRODUCER)
206+
.hasParent(trace.getSpan(0))
207+
.hasAttributesSatisfyingExactly(sendAttributes(record)),
208+
span ->
209+
span.hasName("testTopic process")
210+
.hasKind(SpanKind.CONSUMER)
211+
.hasParent(trace.getSpan(1))
212+
.hasAttributesSatisfyingExactly(processAttributes(record)),
213+
span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
214+
}
215+
187216
@SuppressWarnings("deprecation") // using deprecated semconv
188217
protected static List<AttributeAssertion> sendAttributes(ProducerRecord<String, String> record) {
189218
List<AttributeAssertion> assertions =

0 commit comments

Comments
 (0)