Skip to content

Commit f9466ed

Browse files
committed
./gradlew spotlessApply
1 parent 3bbaa07 commit f9466ed

File tree

2 files changed

+32
-23
lines changed

2 files changed

+32
-23
lines changed

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectTask.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,17 @@ public Collection<SinkRecord> getRecords() {
2424
}
2525

2626
/**
27-
* Returns the first record in the batch, used for extracting destination information.
28-
* Note: Records in a batch can come from multiple topics, so this should only be used
29-
* when you need a representative record, not for definitive topic information.
27+
* Returns the first record in the batch, used for extracting destination information. Note:
28+
* Records in a batch can come from multiple topics, so this should only be used when you need a
29+
* representative record, not for definitive topic information.
3030
*/
3131
public SinkRecord getFirstRecord() {
3232
return records.isEmpty() ? null : records.iterator().next();
3333
}
3434

3535
/**
36-
* Returns all unique topic names present in this batch of records.
37-
* This provides accurate information about all topics being processed.
36+
* Returns all unique topic names present in this batch of records. This provides accurate
37+
* information about all topics being processed.
3838
*/
3939
public Set<String> getTopics() {
4040
return records.stream()
@@ -43,9 +43,9 @@ public Set<String> getTopics() {
4343
}
4444

4545
/**
46-
* Returns a single topic name if all records are from the same topic,
47-
* or a formatted string representing multiple topics if they differ.
48-
* This is useful for span naming that needs to be concise but informative.
46+
* Returns a single topic name if all records are from the same topic, or a formatted string
47+
* representing multiple topics if they differ. This is useful for span naming that needs to be
48+
* concise but informative.
4949
*/
5050
public String getDestinationName() {
5151
Set<String> topics = getTopics();

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
417417
String topicName1 = TOPIC_NAME + "-1";
418418
String topicName2 = TOPIC_NAME + "-2";
419419
String topicName3 = TOPIC_NAME + "-3";
420-
420+
421421
// Setup Kafka Connect MongoDB Sink connector with multiple topics
422422
setupMongoSinkConnectorMultiTopic(topicName1, topicName2, topicName3);
423423

@@ -437,9 +437,15 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
437437

438438
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
439439
// Send messages to different topics
440-
producer.send(new ProducerRecord<>(topicName1, "key1", "{\"id\":1,\"name\":\"User1\",\"source\":\"topic1\"}"));
441-
producer.send(new ProducerRecord<>(topicName2, "key2", "{\"id\":2,\"name\":\"User2\",\"source\":\"topic2\"}"));
442-
producer.send(new ProducerRecord<>(topicName3, "key3", "{\"id\":3,\"name\":\"User3\",\"source\":\"topic3\"}"));
440+
producer.send(
441+
new ProducerRecord<>(
442+
topicName1, "key1", "{\"id\":1,\"name\":\"User1\",\"source\":\"topic1\"}"));
443+
producer.send(
444+
new ProducerRecord<>(
445+
topicName2, "key2", "{\"id\":2,\"name\":\"User2\",\"source\":\"topic2\"}"));
446+
producer.send(
447+
new ProducerRecord<>(
448+
topicName3, "key3", "{\"id\":3,\"name\":\"User3\",\"source\":\"topic3\"}"));
443449
producer.flush();
444450
}
445451

@@ -474,8 +480,11 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
474480

475481
// Write resourceSpans to desktop file for inspection
476482
try {
477-
java.nio.file.Path desktopPath = java.nio.file.Paths.get(System.getProperty("user.home"), "Desktop", "kafka-connect-multi-topic-spans.json");
478-
java.nio.file.Files.write(desktopPath, tracesJson.getBytes(java.nio.charset.StandardCharsets.UTF_8));
483+
java.nio.file.Path desktopPath =
484+
java.nio.file.Paths.get(
485+
System.getProperty("user.home"), "Desktop", "kafka-connect-multi-topic-spans.json");
486+
java.nio.file.Files.write(
487+
desktopPath, tracesJson.getBytes(java.nio.charset.StandardCharsets.UTF_8));
479488
System.out.println("✅ Wrote resourceSpans to: " + desktopPath.toString());
480489
} catch (Exception e) {
481490
System.err.println("❌ Failed to write spans to desktop: " + e.getMessage());
@@ -484,7 +493,8 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
484493
// Extract spans and verify multi-topic span naming
485494
MultiTopicTracingData tracingData;
486495
try {
487-
tracingData = deserializeAndExtractMultiTopicSpans(tracesJson, topicName1, topicName2, topicName3);
496+
tracingData =
497+
deserializeAndExtractMultiTopicSpans(tracesJson, topicName1, topicName2, topicName3);
488498
} catch (Exception e) {
489499
logger.error("Failed to deserialize and extract spans: {}", e.getMessage(), e);
490500
throw new AssertionError("Span deserialization failed", e);
@@ -495,7 +505,7 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
495505
assertThat(tracingData.kafkaConnectConsumerSpan)
496506
.as("Kafka Connect Consumer span should be found for multi-topic processing")
497507
.isNotNull();
498-
508+
499509
// Assertion 2: Verify span name contains all topics in bracket format (order may vary)
500510
assertThat(tracingData.kafkaConnectConsumerSpan.name)
501511
.as("Span name should contain all topics in bracket format")
@@ -578,7 +588,8 @@ private static void setupMongoSinkConnectorMultiTopic(String... topicNames) thro
578588
"com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy");
579589

580590
String payload =
581-
MAPPER.writeValueAsString(ImmutableMap.of("name", CONNECTOR_NAME + "-multi", "config", configMap));
591+
MAPPER.writeValueAsString(
592+
ImmutableMap.of("name", CONNECTOR_NAME + "-multi", "config", configMap));
582593
given()
583594
.log()
584595
.headers()
@@ -816,8 +827,8 @@ private static TracingData deserializeAndExtractSpans(String tracesJson, String
816827
}
817828

818829
/** Deserialize traces JSON and extract span information for multi-topic scenarios */
819-
private static MultiTopicTracingData deserializeAndExtractMultiTopicSpans(String tracesJson, String... expectedTopicNames)
820-
throws IOException {
830+
private static MultiTopicTracingData deserializeAndExtractMultiTopicSpans(
831+
String tracesJson, String... expectedTopicNames) throws IOException {
821832
ObjectMapper objectMapper = new ObjectMapper();
822833
JsonNode rootNode = objectMapper.readTree(tracesJson);
823834

@@ -875,7 +886,7 @@ private static MultiTopicTracingData deserializeAndExtractMultiTopicSpans(String
875886
break;
876887
}
877888
}
878-
889+
879890
if (containsExpectedTopics) {
880891
kafkaConnectConsumerSpan =
881892
new SpanInfo(spanName, traceId, spanId, parentSpanId, spanKind, scopeName);
@@ -912,9 +923,7 @@ private static class MultiTopicTracingData {
912923
final SpanLinkInfo extractedSpanLink;
913924

914925
MultiTopicTracingData(
915-
SpanInfo kafkaConnectConsumerSpan,
916-
SpanInfo databaseSpan,
917-
SpanLinkInfo extractedSpanLink) {
926+
SpanInfo kafkaConnectConsumerSpan, SpanInfo databaseSpan, SpanLinkInfo extractedSpanLink) {
918927
this.kafkaConnectConsumerSpan = kafkaConnectConsumerSpan;
919928
this.databaseSpan = databaseSpan;
920929
this.extractedSpanLink = extractedSpanLink;

0 commit comments

Comments
 (0)