Skip to content

Commit 15518bb

Browse files
committed
wip: removing the parent child relationship between kafka producer and kafka connect spans.
1 parent e4dc787 commit 15518bb

File tree

3 files changed

+44
-13
lines changed

3 files changed

+44
-13
lines changed

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,11 @@ public static void onEnter(
4646
@Advice.Local("otelContext") Context context,
4747
@Advice.Local("otelScope") Scope scope) {
4848

49+
// For batch processing from multiple traces, use root context instead of extracting
50+
// from first record to avoid creating incorrect parent-child relationships.
51+
// Relationships are maintained through span links (see KafkaConnectBatchProcessSpanLinksExtractor)
4952
Context parentContext = Java8BytecodeBridge.currentContext();
5053

51-
// Extract context from first record if available
52-
if (!records.isEmpty()) {
53-
SinkRecord firstRecord = records.iterator().next();
54-
Context extractedContext =
55-
KafkaConnectSingletons.propagator()
56-
.extract(
57-
parentContext, firstRecord, KafkaConnectSingletons.sinkRecordHeaderGetter());
58-
parentContext = extractedContext;
59-
}
60-
6154
task = new KafkaConnectTask(records);
6255
if (!instrumenter().shouldStart(parentContext, task)) {
6356
return;

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,18 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
360360
String tracesJson =
361361
given().when().get(backendUrl + "/get-traces").then().statusCode(200).extract().asString();
362362

363+
// Write resourceSpans to desktop file for inspection
364+
try {
365+
java.nio.file.Path desktopPath =
366+
java.nio.file.Paths.get(
367+
System.getProperty("user.home"), "Desktop", "kafka-connect-single-topic-spans.json");
368+
java.nio.file.Files.write(
369+
desktopPath, tracesJson.getBytes(java.nio.charset.StandardCharsets.UTF_8));
370+
System.out.println("✅ Wrote resourceSpans to: " + desktopPath.toString());
371+
} catch (Exception e) {
372+
System.err.println("❌ Failed to write spans to desktop: " + e.getMessage());
373+
}
374+
363375
// Extract spans and links using separate deserialization method
364376
TracingData tracingData;
365377
try {
@@ -405,6 +417,11 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
405417
.isNotNull()
406418
.as("Database span parent should be Kafka Connect Consumer span")
407419
.isEqualTo(tracingData.kafkaConnectConsumerSpan.spanId);
420+
421+
// Assertion 5: Verify Kafka Connect Consumer span has no parent (batch processing pattern)
422+
assertThat(tracingData.kafkaConnectConsumerSpan.parentSpanId)
423+
.as("Kafka Connect Consumer span should have no parent for batch processing from multiple traces")
424+
.isNull();
408425
}
409426

410427
@Test
@@ -534,6 +551,11 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
534551
.isNotNull()
535552
.as("Database span parent should be Kafka Connect Consumer span")
536553
.isEqualTo(tracingData.kafkaConnectConsumerSpan.spanId);
554+
555+
// Assertion 7: Verify Kafka Connect Consumer span has no parent (batch processing pattern)
556+
assertThat(tracingData.kafkaConnectConsumerSpan.parentSpanId)
557+
.as("Kafka Connect Consumer span should have no parent for batch processing from multiple traces")
558+
.isNull();
537559
}
538560

539561
// Private methods

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,18 @@ public void testKafkaConnectPostgresSinkTaskInstrumentation()
386386
String tracesJson =
387387
given().when().get(backendUrl + "/get-traces").then().statusCode(200).extract().asString();
388388

389+
// Write resourceSpans to desktop file for inspection
390+
try {
391+
java.nio.file.Path desktopPath =
392+
java.nio.file.Paths.get(
393+
System.getProperty("user.home"), "Desktop", "kafka-connect-postgres-spans.json");
394+
java.nio.file.Files.write(
395+
desktopPath, tracesJson.getBytes(java.nio.charset.StandardCharsets.UTF_8));
396+
System.out.println("✅ Wrote resourceSpans to: " + desktopPath.toString());
397+
} catch (Exception e) {
398+
System.err.println("❌ Failed to write spans to desktop: " + e.getMessage());
399+
}
400+
389401
// Extract spans and links using separate deserialization method
390402
TracingData tracingData;
391403
try {
@@ -431,6 +443,11 @@ public void testKafkaConnectPostgresSinkTaskInstrumentation()
431443
.isNotNull()
432444
.as("Database span parent should be Kafka Connect Consumer span")
433445
.isEqualTo(tracingData.kafkaConnectConsumerSpan.spanId);
446+
447+
// Assertion 5: Verify Kafka Connect Consumer span has no parent (batch processing pattern)
448+
assertThat(tracingData.kafkaConnectConsumerSpan.parentSpanId)
449+
.as("Kafka Connect Consumer span should have no parent for batch processing from multiple traces")
450+
.isNull();
434451
}
435452

436453
@AfterAll
@@ -661,7 +678,7 @@ private static TracingData deserializeAndExtractSpans(String tracesJson, String
661678
&& spanName.contains(expectedTopicName)
662679
&& spanKind.equals("SPAN_KIND_CONSUMER")) {
663680
kafkaConnectConsumerSpan =
664-
new SpanInfo(spanName, traceId, spanId, parentSpanId, spanKind);
681+
new SpanInfo(traceId, spanId, parentSpanId, spanKind);
665682

666683
// Extract span link information for verification
667684
JsonNode linksArray = spanNode.get("links");
@@ -681,7 +698,7 @@ private static TracingData deserializeAndExtractSpans(String tracesJson, String
681698
|| spanName.contains("SELECT")
682699
|| spanName.contains(DB_TABLE_PERSON))) {
683700
databaseSpan =
684-
new SpanInfo(spanName, traceId, spanId, parentSpanId, spanKind);
701+
new SpanInfo(traceId, spanId, parentSpanId, spanKind);
685702
}
686703
}
687704
}
@@ -716,7 +733,6 @@ private static class SpanInfo {
716733
final String kind;
717734

718735
SpanInfo(
719-
String name,
720736
String traceId,
721737
String spanId,
722738
String parentSpanId,

0 commit comments

Comments
 (0)