Commit 9bc7847 (parent: e16a085)

wip: adding a static SinkRecordHeadersGetter in KafkaConnectSingletons.
4 files changed (+166, -64 lines)

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java

Lines changed: 8 additions & 0 deletions

@@ -6,15 +6,19 @@
 package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

 import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.context.propagation.TextMapGetter;
 import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import org.apache.kafka.connect.sink.SinkRecord;

 public final class KafkaConnectSingletons {

   private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-connect-2.6";
   private static final TextMapPropagator PROPAGATOR =
       GlobalOpenTelemetry.get().getPropagators().getTextMapPropagator();

+  private static final TextMapGetter<SinkRecord> SINK_RECORD_HEADER_GETTER = SinkRecordHeadersGetter.INSTANCE;
+
   private static final Instrumenter<KafkaConnectTask, Void> INSTRUMENTER =
       Instrumenter.<KafkaConnectTask, Void>builder(
           GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, KafkaConnectTask::getSpanName)

@@ -29,5 +33,9 @@ public static TextMapPropagator propagator() {
     return PROPAGATOR;
   }

+  public static TextMapGetter<SinkRecord> sinkRecordHeaderGetter() {
+    return SINK_RECORD_HEADER_GETTER;
+  }
+
   private KafkaConnectSingletons() {}
 }
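The SinkRecordHeadersGetter referenced by the new SINK_RECORD_HEADER_GETTER constant is added in one of the commit's other files, which is not excerpted here. As a rough sketch of what such an enum-singleton TextMapGetter could look like, inferred from the anonymous getter removed from SinkTaskInstrumentation below; beyond the class name and the INSTANCE constant, everything in it is an assumption:

// Hypothetical sketch only; the actual SinkRecordHeadersGetter added by this commit
// lives in a file not shown in this excerpt.
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
  INSTANCE;

  @Override
  public Iterable<String> keys(SinkRecord record) {
    // Collect all header keys carried by the record.
    List<String> keys = new ArrayList<>();
    for (Header header : record.headers()) {
      keys.add(header.key());
    }
    return keys;
  }

  @Override
  public String get(SinkRecord record, String key) {
    if (record == null) {
      return null;
    }
    // Kafka Connect headers may hold byte[] or String values; decode bytes as UTF-8.
    Header header = record.headers().lastWithName(key);
    if (header == null) {
      return null;
    }
    Object value = header.value();
    if (value instanceof byte[]) {
      return new String((byte[]) value, StandardCharsets.UTF_8);
    }
    return value != null ? value.toString() : null;
  }
}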

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java

Lines changed: 3 additions & 36 deletions

@@ -12,7 +12,6 @@

 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
-import io.opentelemetry.context.propagation.TextMapGetter;
 import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;

@@ -21,9 +20,6 @@
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.header.Header;
-import org.apache.kafka.connect.header.Headers;
-import java.nio.charset.StandardCharsets;
 import java.util.logging.Logger;

 public class SinkTaskInstrumentation implements TypeInstrumentation {

@@ -81,7 +77,8 @@ public static void onEnter(
       // Extract context from first record if available
       if (!records.isEmpty()) {
         SinkRecord firstRecord = records.iterator().next();
-        Context extractedContext = extractContextFromRecord(parentContext, firstRecord);
+        Context extractedContext = KafkaConnectSingletons.propagator()
+            .extract(parentContext, firstRecord, KafkaConnectSingletons.sinkRecordHeaderGetter());
         parentContext = extractedContext;
       }

@@ -112,36 +109,6 @@ public static void onExit(
       logger.info("KafkaConnect: Span ended");
     }

-    public static Context extractContextFromRecord(Context parentContext, SinkRecord record) {
-      // Create a simple TextMapGetter for SinkRecord headers
-      return KafkaConnectSingletons.propagator().extract(parentContext, record, new TextMapGetter<SinkRecord>() {
-        @Override
-        public Iterable<String> keys(SinkRecord sinkRecord) {
-          Headers headers = sinkRecord.headers();
-          java.util.List<String> keys = new java.util.ArrayList<>();
-          for (Header header : headers) {
-            keys.add(header.key());
-          }
-          return keys;
-        }
-
-        @Override
-        public String get(SinkRecord sinkRecord, String key) {
-          if (sinkRecord == null) {
-            return null;
-          }
-          Headers headers = sinkRecord.headers();
-          Header header = headers.lastWithName(key);
-          if (header == null) {
-            return null;
-          }
-          Object value = header.value();
-          if (value instanceof byte[]) {
-            return new String((byte[]) value, StandardCharsets.UTF_8);
-          }
-          return value != null ? value.toString() : null;
-        }
-      });
-    }
+
   }
 }
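With the shared getter, the advice no longer defines an anonymous TextMapGetter inline, a pattern that tends to be fragile in javaagent advice since advice bytecode is inlined into the instrumented class and is generally expected to only call out to helper classes. The extraction step now reduces to a single propagator call; a minimal sketch of how it reads in context follows (only the extract call itself appears in the diff above; the surrounding variable names and the use of Java8BytecodeBridge.currentContext() for the parent context are assumptions):

// Sketch only: context extraction from the first SinkRecord using the shared getter.
Context parentContext = Java8BytecodeBridge.currentContext(); // assumed source of the parent context
if (!records.isEmpty()) {
  SinkRecord firstRecord = records.iterator().next();
  parentContext =
      KafkaConnectSingletons.propagator()
          .extract(parentContext, firstRecord, KafkaConnectSingletons.sinkRecordHeaderGetter());
}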

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java

Lines changed: 81 additions & 15 deletions

@@ -135,8 +135,9 @@ public static void setup() throws IOException {
         .withExposedPorts(BACKEND_PORT)
         .withNetwork(network)
         .withNetworkAliases(BACKEND_ALIAS)
-        .waitingFor(Wait.forHttp("/health").forPort(BACKEND_PORT))
-        .withStartupTimeout(Duration.of(2, MINUTES));
+        .waitingFor(Wait.forHttp("/health").forPort(BACKEND_PORT).withStartupTimeout(Duration.of(5, MINUTES)))
+        .withStartupTimeout(Duration.of(5, MINUTES))
+        .withEnv("DOCKER_DEFAULT_PLATFORM", "linux/amd64"); // Force AMD64 for stability on ARM64 hosts

     logger.info("Starting backend container...");
     backend.start();

@@ -150,7 +151,7 @@ public static void setup() throws IOException {
         .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_INTERNAL_PORT))
         .withEnv("ZOOKEEPER_TICK_TIME", "2000")
         .withExposedPorts(ZOOKEEPER_INTERNAL_PORT)
-        .withStartupTimeout(Duration.of(3, MINUTES));
+        .withStartupTimeout(Duration.of(5, MINUTES));
     String zookeeperInternalUrl = ZOOKEEPER_NETWORK_ALIAS + ":" + ZOOKEEPER_INTERNAL_PORT;

     kafkaExposedPort = getRandomFreePort();

@@ -186,7 +187,7 @@ public static void setup() throws IOException {
         .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
         .withEnv("KAFKA_OPTS", "-Djava.net.preferIPv4Stack=True")
         .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "100")
-        .withStartupTimeout(Duration.of(3, MINUTES));
+        .withStartupTimeout(Duration.of(5, MINUTES));

     // Get the agent path from system properties
     String agentPath = System.getProperty("otel.javaagent.testing.javaagent-jar-path");

@@ -237,7 +238,8 @@ public static void setup() throws IOException {
         .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
         .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
         .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
-        .withStartupTimeout(Duration.of(3, MINUTES))
+        .waitingFor(Wait.forHttp("/").forPort(CONNECT_REST_PORT_INTERNAL).withStartupTimeout(Duration.of(5, MINUTES)))
+        .withStartupTimeout(Duration.of(5, MINUTES))
         .withCommand(
             "bash",
             "-c",

@@ -248,7 +250,7 @@ public static void setup() throws IOException {
         new MongoDBContainer(DockerImageName.parse("mongo:4.4"))
             .withNetwork(network)
             .withNetworkAliases(MONGO_NETWORK_ALIAS)
-            .withStartupTimeout(Duration.of(3, MINUTES));
+            .withStartupTimeout(Duration.of(5, MINUTES));

     // Start containers (backend already started)
     Startables.deepStart(Stream.of(zookeeper, kafka, kafkaConnect, mongoDB)).join();

@@ -314,12 +316,12 @@ public void testKafkaConnectMongoSinkTaskInstrumentation() throws IOException, I
       producer.flush();
     }

-    // Wait for message processing
-    await().atMost(Duration.ofSeconds(30)).until(() -> getRecordCountFromMongo() >= 1);
+    // Wait for message processing (increased timeout for ARM64 Docker emulation)
+    await().atMost(Duration.ofSeconds(60)).until(() -> getRecordCountFromMongo() >= 1);

-    // Wait for spans to arrive at backend
+    // Wait for spans to arrive at backend (increased timeout for ARM64 Docker emulation)
     String backendUrl = getBackendUrl();
-    await().atMost(Duration.ofSeconds(15)).until(() -> {
+    await().atMost(Duration.ofSeconds(30)).until(() -> {
       try {
         String traces = given()
             .when()

@@ -444,7 +446,7 @@ private static void createTopic(String topicName) {
   private static void awaitForTopicCreation(String topicName) {
     try (AdminClient adminClient = createAdminClient()) {
       await()
-          .atMost(Duration.ofSeconds(30))
+          .atMost(Duration.ofSeconds(60))
           .pollInterval(Duration.ofMillis(500))
           .until(() -> adminClient.listTopics().names().get().contains(topicName));
     }

@@ -503,14 +505,78 @@ private static void deleteConnectorIfExists() {

   @AfterAll
   public static void cleanup() {
+    // Close AdminClient first to release Kafka connections
+    if (adminClient != null) {
+      try {
+        adminClient.close();
+        logger.info("AdminClient closed");
+      } catch (RuntimeException e) {
+        logger.error("Error closing AdminClient: " + e.getMessage());
+      }
+    }
+
+    // Stop all containers in reverse order of startup to ensure clean shutdown
+    if (kafkaConnect != null) {
+      try {
+        kafkaConnect.stop();
+        logger.info("Kafka Connect container stopped");
+      } catch (RuntimeException e) {
+        logger.error("Error stopping Kafka Connect: " + e.getMessage());
+      }
+    }
+
+    if (mongoDB != null) {
+      try {
+        mongoDB.stop();
+        logger.info("MongoDB container stopped");
+      } catch (RuntimeException e) {
+        logger.error("Error stopping MongoDB: " + e.getMessage());
+      }
+    }
+
+    if (kafka != null) {
+      try {
+        kafka.stop();
+        logger.info("Kafka container stopped");
+      } catch (RuntimeException e) {
+        logger.error("Error stopping Kafka: " + e.getMessage());
+      }
+    }
+
+    if (zookeeper != null) {
+      try {
+        zookeeper.stop();
+        logger.info("Zookeeper container stopped");
+      } catch (RuntimeException e) {
+        logger.error("Error stopping Zookeeper: " + e.getMessage());
+      }
+    }
+
     if (backend != null) {
-      backend.stop();
-      logger.info("Backend container stopped");
+      try {
+        backend.stop();
+        logger.info("Backend container stopped");
+      } catch (RuntimeException e) {
+        logger.error("Error stopping backend: " + e.getMessage());
+      }
     }
+
     if (network != null) {
-      network.close();
-      logger.info("Network closed");
+      try {
+        network.close();
+        logger.info("Network closed");
+      } catch (RuntimeException e) {
+        logger.error("Error closing network: " + e.getMessage());
+      }
     }
+
+    // Small delay to ensure containers are fully stopped before next test
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
     logger.info("Test cleanup complete");
   }
 }
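The new cleanup logic repeats the same stop/log/catch block for each container. Purely as an illustration (not part of this commit), a small helper could express the same shutdown order more compactly, assuming the containers are Testcontainers GenericContainer instances and logger is the test's existing logger:

// Illustrative helper, not in this commit: stop a container without letting a
// failed stop abort the rest of the cleanup.
private static void stopQuietly(org.testcontainers.containers.GenericContainer<?> container, String name) {
  if (container == null) {
    return;
  }
  try {
    container.stop();
    logger.info(name + " container stopped");
  } catch (RuntimeException e) {
    logger.error("Error stopping " + name + ": " + e.getMessage());
  }
}

// Usage inside cleanup(), preserving the reverse-of-startup order:
// stopQuietly(kafkaConnect, "Kafka Connect");
// stopQuietly(mongoDB, "MongoDB");
// stopQuietly(kafka, "Kafka");
// stopQuietly(zookeeper, "Zookeeper");
// stopQuietly(backend, "Backend");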
