Skip to content

Commit 2f95738

Browse files
committed
polish: refactor span naming for multi-topic consumers and remove redundant code in test cases.
1 parent 73fc079 commit 2f95738

File tree

7 files changed

+54
-115
lines changed

7 files changed

+54
-115
lines changed

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectAttributesGetter.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,11 @@
77

88
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
99
import java.nio.charset.StandardCharsets;
10-
import java.util.Collections;
1110
import java.util.List;
1211
import java.util.stream.Collectors;
1312
import java.util.stream.StreamSupport;
1413
import javax.annotation.Nullable;
1514
import org.apache.kafka.connect.header.Header;
16-
import org.apache.kafka.connect.sink.SinkRecord;
1715

1816
enum KafkaConnectAttributesGetter implements MessagingAttributesGetter<KafkaConnectTask, Void> {
1917
INSTANCE;
@@ -83,12 +81,9 @@ public Long getBatchMessageCount(KafkaConnectTask request, @Nullable Void unused
8381

8482
@Override
8583
public List<String> getMessageHeader(KafkaConnectTask request, String name) {
86-
SinkRecord firstRecord = request.getFirstRecord();
87-
if (firstRecord == null || firstRecord.headers() == null) {
88-
return Collections.emptyList();
89-
}
90-
91-
return StreamSupport.stream(firstRecord.headers().spliterator(), false)
84+
return request.getRecords().stream()
85+
.filter(record -> record.headers() != null)
86+
.flatMap(record -> StreamSupport.stream(record.headers().spliterator(), false))
9287
.filter(header -> name.equals(header.key()) && header.value() != null)
9388
.map(header -> convertHeaderValue(header))
9489
.collect(Collectors.toList());

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,19 @@
66
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
77

88
import io.opentelemetry.api.GlobalOpenTelemetry;
9-
import io.opentelemetry.context.propagation.TextMapGetter;
109
import io.opentelemetry.context.propagation.TextMapPropagator;
1110
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
1211
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
1312
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
1413
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
1514
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
16-
import org.apache.kafka.connect.sink.SinkRecord;
1715

1816
public final class KafkaConnectSingletons {
1917

2018
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-connect-2.6";
2119
private static final TextMapPropagator PROPAGATOR =
2220
GlobalOpenTelemetry.get().getPropagators().getTextMapPropagator();
2321

24-
private static final TextMapGetter<SinkRecord> SINK_RECORD_HEADER_GETTER =
25-
SinkRecordHeadersGetter.INSTANCE;
26-
2722
private static final Instrumenter<KafkaConnectTask, Void> INSTRUMENTER;
2823

2924
static {
@@ -48,13 +43,5 @@ public static Instrumenter<KafkaConnectTask, Void> instrumenter() {
4843
return INSTRUMENTER;
4944
}
5045

51-
public static TextMapPropagator propagator() {
52-
return PROPAGATOR;
53-
}
54-
55-
public static TextMapGetter<SinkRecord> sinkRecordHeaderGetter() {
56-
return SINK_RECORD_HEADER_GETTER;
57-
}
58-
5946
private KafkaConnectSingletons() {}
6047
}

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectTask.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,7 @@ public Collection<SinkRecord> getRecords() {
2323
return records;
2424
}
2525

26-
public SinkRecord getFirstRecord() {
27-
return records.isEmpty() ? null : records.iterator().next();
28-
}
29-
30-
public Set<String> getTopics() {
26+
private Set<String> getTopics() {
3127
return records.stream()
3228
.map(SinkRecord::topic)
3329
.collect(Collectors.toCollection(LinkedHashSet::new));
@@ -38,9 +34,12 @@ public String getDestinationName() {
3834
if (topics.isEmpty()) {
3935
return null;
4036
}
37+
// Return the topic name only if all records are from the same topic.
38+
// When records are from multiple topics, return null as there is no standard way
39+
// to represent multiple destination names in messaging.destination.name attribute.
4140
if (topics.size() == 1) {
4241
return topics.iterator().next();
4342
}
44-
return topics.stream().collect(Collectors.joining(",", "[", "]"));
43+
return null;
4544
}
4645
}

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkRecordHeadersGetter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,21 @@
1212
import org.apache.kafka.connect.header.Header;
1313
import org.apache.kafka.connect.sink.SinkRecord;
1414

15+
import static java.util.Collections.emptyList;
16+
import static java.util.stream.Collectors.toList;
17+
1518
enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
1619
INSTANCE;
1720

1821
@Override
1922
public Iterable<String> keys(SinkRecord record) {
2023
if (record.headers() == null) {
21-
return java.util.Collections.emptyList();
24+
return emptyList();
2225
}
2326

2427
return StreamSupport.stream(record.headers().spliterator(), false)
2528
.map(Header::key)
26-
.collect(java.util.stream.Collectors.toList());
29+
.collect(toList());
2730
}
2831

2932
@Override

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.testcontainers.containers.output.Slf4jLogConsumer;
4545
import org.testcontainers.containers.wait.strategy.Wait;
4646
import org.testcontainers.lifecycle.Startables;
47-
import org.testcontainers.shaded.com.google.common.base.VerifyException;
4847
import org.testcontainers.utility.DockerImageName;
4948
import org.testcontainers.utility.MountableFile;
5049

@@ -105,8 +104,6 @@ public abstract class KafkaConnectSinkTaskBaseTest implements TelemetryRetriever
105104

106105
protected abstract String getConnectorName();
107106

108-
protected abstract String getTopicName();
109-
110107
// Static methods
111108
protected static String getKafkaConnectUrl() {
112109
return format(
@@ -150,7 +147,7 @@ protected static void clearTelemetryGracefully() {
150147
}
151148

152149
@BeforeAll
153-
public void setupBase() throws IOException {
150+
public void setupBase() {
154151
network = Network.newNetwork();
155152

156153
// Start backend container first (like smoke tests)
@@ -333,15 +330,14 @@ protected void createTopic(String topicName) {
333330
try (AdminClient adminClient = createAdminClient()) {
334331
NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
335332
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
333+
} catch (InterruptedException e) {
334+
Thread.currentThread().interrupt();
335+
throw new RuntimeException("Failed to create topic: " + topicName, e);
336336
} catch (Exception e) {
337-
if (e instanceof InterruptedException) {
338-
Thread.currentThread().interrupt();
339-
throw new VerifyException("Failed to create topic: " + topicName, e);
340-
} else if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
337+
if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
341338
// Topic already exists, continue
342339
} else {
343-
logger.error("Error creating topic: {}", e.getMessage());
344-
throw new VerifyException("Failed to create topic: " + topicName, e);
340+
throw new RuntimeException("Failed to create topic: " + topicName, e);
345341
}
346342
}
347343
}
@@ -352,6 +348,8 @@ protected void awaitForTopicCreation(String topicName) {
352348
.atMost(Duration.ofSeconds(60))
353349
.pollInterval(Duration.ofMillis(500))
354350
.until(() -> adminClient.listTopics().names().get().contains(topicName));
351+
} catch (RuntimeException e) {
352+
throw new RuntimeException("Failed to await topic creation: " + topicName, e);
355353
}
356354
}
357355

@@ -381,45 +379,25 @@ public void cleanupBase() {
381379

382380
// Stop all containers in reverse order of startup to ensure clean shutdown
383381
if (kafkaConnect != null) {
384-
try {
385-
kafkaConnect.stop();
386-
} catch (RuntimeException e) {
387-
logger.error("Error stopping Kafka Connect: " + e.getMessage());
388-
}
382+
kafkaConnect.stop();
389383
}
390384

391385
stopDatabaseContainer();
392386

393387
if (kafka != null) {
394-
try {
395-
kafka.stop();
396-
} catch (RuntimeException e) {
397-
logger.error("Error stopping Kafka: " + e.getMessage());
398-
}
388+
kafka.stop();
399389
}
400390

401391
if (zookeeper != null) {
402-
try {
403-
zookeeper.stop();
404-
} catch (RuntimeException e) {
405-
logger.error("Error stopping Zookeeper: " + e.getMessage());
406-
}
392+
zookeeper.stop();
407393
}
408394

409395
if (backend != null) {
410-
try {
411-
backend.stop();
412-
} catch (RuntimeException e) {
413-
logger.error("Error stopping backend: " + e.getMessage());
414-
}
396+
backend.stop();
415397
}
416398

417399
if (network != null) {
418-
try {
419-
network.close();
420-
} catch (RuntimeException e) {
421-
logger.error("Error closing network: " + e.getMessage());
422-
}
400+
network.close();
423401
}
424402
}
425403
}

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.instrumentation.kafkaconnect.v2_6;
77

8+
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
89
import static io.restassured.RestAssured.given;
910
import static java.lang.String.format;
1011
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,11 +64,7 @@ protected void startDatabaseContainer() {
6364
@Override
6465
protected void stopDatabaseContainer() {
6566
if (mongoDB != null) {
66-
try {
67-
mongoDB.stop();
68-
} catch (RuntimeException e) {
69-
logger.error("Error stopping MongoDB: " + e.getMessage());
70-
}
67+
mongoDB.stop();
7168
}
7269
}
7370

@@ -87,10 +84,6 @@ protected String getConnectorName() {
8784
return CONNECTOR_NAME;
8885
}
8986

90-
@Override
91-
protected String getTopicName() {
92-
return TOPIC_NAME;
93-
}
9487

9588
@Test
9689
public void testKafkaConnectMongoSinkTaskInstrumentation()
@@ -127,8 +120,7 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
127120
span ->
128121
span.getName().contains(uniqueTopicName)
129122
&& span.getKind()
130-
== io.opentelemetry.api.trace.SpanKind
131-
.CONSUMER);
123+
== CONSUMER);
132124

133125
boolean hasInsertSpan =
134126
trace.stream()
@@ -152,7 +144,7 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
152144
.filter(
153145
span ->
154146
span.getName().contains(uniqueTopicName)
155-
&& span.getKind() == io.opentelemetry.api.trace.SpanKind.CONSUMER)
147+
&& span.getKind() == CONSUMER)
156148
.findFirst()
157149
.orElse(null);
158150
assertThat(kafkaConnectSpan).isNotNull();
@@ -223,7 +215,7 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
223215
(span.getName().contains(topicName1)
224216
|| span.getName().contains(topicName2)
225217
|| span.getName().contains(topicName3))
226-
&& span.getKind() == io.opentelemetry.api.trace.SpanKind.CONSUMER)
218+
&& span.getKind() == CONSUMER)
227219
.count();
228220

229221
long totalUpdateSpans =
@@ -238,16 +230,16 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
238230
assertThat(totalKafkaConnectSpans).isGreaterThanOrEqualTo(1);
239231
assertThat(totalUpdateSpans).isGreaterThanOrEqualTo(3);
240232

241-
boolean hasMultiTopicSpan =
233+
// When records are from multiple topics, the destination name is null,
234+
// resulting in a span name like "kafka process" without topic names.
235+
// Individual topic spans may still be created if processed separately.
236+
boolean hasGenericProcessSpan =
242237
traces.stream()
243238
.flatMap(trace -> trace.stream())
244239
.anyMatch(
245240
span ->
246-
span.getName().contains("[")
247-
&& span.getName().contains("]")
248-
&& span.getName().contains("process")
249-
&& span.getKind()
250-
== io.opentelemetry.api.trace.SpanKind.CONSUMER);
241+
span.getName().equals("kafka process")
242+
&& span.getKind() == CONSUMER);
251243

252244
boolean hasIndividualSpans =
253245
traces.stream()
@@ -257,11 +249,9 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
257249
(span.getName().contains(topicName1)
258250
|| span.getName().contains(topicName2)
259251
|| span.getName().contains(topicName3))
260-
&& !span.getName().contains("[")
261-
&& span.getKind()
262-
== io.opentelemetry.api.trace.SpanKind.CONSUMER);
252+
&& span.getKind() == CONSUMER);
263253

264-
assertThat(hasMultiTopicSpan || hasIndividualSpans).isTrue();
254+
assertThat(hasGenericProcessSpan || hasIndividualSpans).isTrue();
265255
});
266256
}
267257

@@ -335,8 +325,6 @@ private static long getRecordCountFromMongo() {
335325
MongoDatabase database = mongoClient.getDatabase(DB_NAME);
336326
MongoCollection<Document> collection = database.getCollection(COLLECTION_NAME);
337327
return collection.countDocuments();
338-
} catch (RuntimeException e) {
339-
return 0;
340328
}
341329
}
342330

@@ -345,8 +333,6 @@ private static void clearMongoCollection() {
345333
MongoDatabase database = mongoClient.getDatabase(DB_NAME);
346334
MongoCollection<Document> collection = database.getCollection(COLLECTION_NAME);
347335
collection.drop();
348-
} catch (RuntimeException e) {
349-
// Ignore cleanup failures
350336
}
351337
}
352338
}

0 commit comments

Comments
 (0)