diff --git a/.fossa.yml b/.fossa.yml
index 04356f396aa3..8e2e16ca330b 100644
--- a/.fossa.yml
+++ b/.fossa.yml
@@ -619,6 +619,9 @@ targets:
- type: gradle
path: ./
target: ':instrumentation:jsf:jsf-myfaces-3.0:javaagent'
+ - type: gradle
+ path: ./
+ target: ':instrumentation:kafka:kafka-connect-2.6:javaagent'
- type: gradle
path: ./
target: ':instrumentation:kafka:kafka-streams-0.11:javaagent'
diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md
index 641580e5bc71..c965c0daae45 100644
--- a/docs/supported-libraries.md
+++ b/docs/supported-libraries.md
@@ -32,6 +32,7 @@ These are the supported libraries and frameworks:
| [Apache HttpAsyncClient](https://hc.apache.org/index.html) | 4.1+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] |
| [Apache HttpClient](https://hc.apache.org/index.html) | 2.0+ | [opentelemetry-apache-httpclient-4.3](../instrumentation/apache-httpclient/apache-httpclient-4.3/library),<br>[opentelemetry-apache-httpclient-5.2](../instrumentation/apache-httpclient/apache-httpclient-5.2/library) | [HTTP Client Spans], [HTTP Client Metrics] |
| [Apache ShenYu](https://shenyu.apache.org/) | 2.4+ | N/A | Provides `http.route` [2] |
+| [Apache Kafka Connect API](https://kafka.apache.org/documentation/#connect) | 2.6+ | N/A | [Messaging Spans] |
| [Apache Kafka Producer/Consumer API](https://kafka.apache.org/documentation/#producerapi) | 0.11+ | [opentelemetry-kafka-clients-2.6](../instrumentation/kafka/kafka-clients/kafka-clients-2.6/library) | [Messaging Spans] |
| [Apache Kafka Streams API](https://kafka.apache.org/documentation/streams/) | 0.11+ | N/A | [Messaging Spans] |
| [Apache MyFaces](https://myfaces.apache.org/) | 1.2+ (not including 4.0+ yet) | N/A | Provides `http.route` [2], Controller Spans [3] |
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/build.gradle.kts b/instrumentation/kafka/kafka-connect-2.6/javaagent/build.gradle.kts
new file mode 100644
index 000000000000..037951e97d5d
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/build.gradle.kts
@@ -0,0 +1,18 @@
+plugins {
+ id("otel.javaagent-instrumentation")
+}
+
+muzzle {
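+ // Muzzle verifies at build time that this instrumentation's references resolve
+ // against every connect-api version in the range below; assertInverse also
+ // checks that it does NOT apply to versions outside that range.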
+ pass {
+ group.set("org.apache.kafka")
+ module.set("connect-api")
+ versions.set("[2.6.0,)")
+ assertInverse.set(true)
+ }
+}
+
+dependencies {
+ bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap"))
+ implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common-0.11:library"))
+ library("org.apache.kafka:connect-api:2.6.0")
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectAttributesGetter.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectAttributesGetter.java
new file mode 100644
index 000000000000..b42d52b73e6b
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectAttributesGetter.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
+
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA;
+
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.kafka.connect.header.Header;
+
+enum KafkaConnectAttributesGetter implements MessagingAttributesGetter<KafkaConnectTask, Void> {
+ INSTANCE;
+
+ @Override
+ public String getSystem(KafkaConnectTask request) {
+ return KAFKA;
+ }
+
+ @Override
+ @Nullable
+ public String getDestination(KafkaConnectTask request) {
+ return request.getDestinationName();
+ }
+
+ @Nullable
+ @Override
+ public String getDestinationTemplate(KafkaConnectTask request) {
+ return null;
+ }
+
+ @Override
+ public boolean isTemporaryDestination(KafkaConnectTask request) {
+ return false;
+ }
+
+ @Override
+ public boolean isAnonymousDestination(KafkaConnectTask request) {
+ return false;
+ }
+
+ @Override
+ @Nullable
+ public String getConversationId(KafkaConnectTask request) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageBodySize(KafkaConnectTask request) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageEnvelopeSize(KafkaConnectTask request) {
+ return null;
+ }
+
+ @Override
+ @Nullable
+ public String getMessageId(KafkaConnectTask request, @Nullable Void unused) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getClientId(KafkaConnectTask request) {
+ return null;
+ }
+
+ @Override
+ public Long getBatchMessageCount(KafkaConnectTask request, @Nullable Void unused) {
+ return (long) request.getRecords().size();
+ }
+
+ @Override
+ public List<String> getMessageHeader(KafkaConnectTask request, String name) {
+ return request.getRecords().stream()
+ .filter(record -> record.headers() != null)
+ .flatMap(record -> StreamSupport.stream(record.headers().spliterator(), false))
+ .filter(header -> name.equals(header.key()) && header.value() != null)
+ .map(header -> convertHeaderValue(header))
+ .collect(Collectors.toList());
+ }
+
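+ // Connect header values are typed Objects rather than raw bytes: decode byte[]
+ // values as UTF-8 and fall back to toString() for everything else.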
+ private static String convertHeaderValue(Header header) {
+ Object value = header.value();
+ if (value instanceof byte[]) {
+ return new String((byte[]) value, StandardCharsets.UTF_8);
+ }
+ return value.toString();
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectBatchProcessSpanLinksExtractor.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectBatchProcessSpanLinksExtractor.java
new file mode 100644
index 000000000000..85f1a815c749
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectBatchProcessSpanLinksExtractor.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
+import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
+import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+final class KafkaConnectBatchProcessSpanLinksExtractor
+ implements SpanLinksExtractor<KafkaConnectTask> {
+
+ private final SpanLinksExtractor<SinkRecord> singleRecordLinkExtractor;
+
+ KafkaConnectBatchProcessSpanLinksExtractor(TextMapPropagator propagator) {
+ this.singleRecordLinkExtractor =
+ new PropagatorBasedSpanLinksExtractor<>(propagator, SinkRecordHeadersGetter.INSTANCE);
+ }
+
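+ // Each record in a batch may carry a different upstream trace context, so the
+ // batch span gets one span link per record instead of a single parent.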
+ @Override
+ public void extract(SpanLinksBuilder spanLinks, Context parentContext, KafkaConnectTask request) {
+ for (SinkRecord record : request.getRecords()) {
+ singleRecordLinkExtractor.extract(spanLinks, parentContext, record);
+ }
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectInstrumentationModule.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectInstrumentationModule.java
new file mode 100644
index 000000000000..65a8204f29a0
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectInstrumentationModule.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
+
+import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
+import static java.util.Arrays.asList;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import java.util.List;
+import net.bytebuddy.matcher.ElementMatcher;
+
+@AutoService(InstrumentationModule.class)
+public class KafkaConnectInstrumentationModule extends InstrumentationModule {
+
+ public KafkaConnectInstrumentationModule() {
+ super("kafka-connect", "kafka-connect-2.6");
+ }
+
+ @Override
+ public List<TypeInstrumentation> typeInstrumentations() {
+ return asList(new SinkTaskInstrumentation(), new WorkerSinkTaskInstrumentation());
+ }
+
+ @Override
+ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
+ // class added in 2.6.0
+ return hasClassesNamed("org.apache.kafka.connect.sink.SinkConnectorContext");
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java
new file mode 100644
index 000000000000..6dac80ccec7d
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
+
+public final class KafkaConnectSingletons {
+
+ private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-connect-2.6";
+ private static final TextMapPropagator PROPAGATOR =
+ GlobalOpenTelemetry.get().getPropagators().getTextMapPropagator();
+
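+ // Built once against the agent's GlobalOpenTelemetry; KafkaConnectTask is the
+ // request type, and Void the response type, since put() returns nothing useful.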
+ private static final Instrumenter<KafkaConnectTask, Void> INSTRUMENTER;
+
+ static {
+ KafkaConnectBatchProcessSpanLinksExtractor spanLinksExtractor =
+ new KafkaConnectBatchProcessSpanLinksExtractor(PROPAGATOR);
+
+ INSTRUMENTER =
+ Instrumenter.<KafkaConnectTask, Void>builder(
+ GlobalOpenTelemetry.get(),
+ INSTRUMENTATION_NAME,
+ MessagingSpanNameExtractor.create(
+ KafkaConnectAttributesGetter.INSTANCE, MessageOperation.PROCESS))
+ .addAttributesExtractor(
+ MessagingAttributesExtractor.builder(
+ KafkaConnectAttributesGetter.INSTANCE, MessageOperation.PROCESS)
+ .build())
+ .addSpanLinksExtractor(spanLinksExtractor)
+ .buildInstrumenter(SpanKindExtractor.alwaysConsumer());
+ }
+
+ public static Instrumenter<KafkaConnectTask, Void> instrumenter() {
+ return INSTRUMENTER;
+ }
+
+ private KafkaConnectSingletons() {}
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectTask.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectTask.java
new file mode 100644
index 000000000000..d61656391627
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectTask.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
+
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public final class KafkaConnectTask {
+
+ private final Collection<SinkRecord> records;
+
+ public KafkaConnectTask(Collection<SinkRecord> records) {
+ this.records = records;
+ }
+
+ public Collection<SinkRecord> getRecords() {
+ return records;
+ }
+
+ private Set<String> getTopics() {
+ return records.stream()
+ .map(SinkRecord::topic)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ public String getDestinationName() {
+ Set<String> topics = getTopics();
+ if (topics.isEmpty()) {
+ return null;
+ }
+ // Return the topic name only if all records are from the same topic.
+ // When records are from multiple topics, return null as there is no standard way
+ // to represent multiple destination names in messaging.destination.name attribute.
+ if (topics.size() == 1) {
+ return topics.iterator().next();
+ }
+ return null;
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkRecordHeadersGetter.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkRecordHeadersGetter.java
new file mode 100644
index 000000000000..577bfc8b0fd3
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkRecordHeadersGetter.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
+
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
+
+import io.opentelemetry.context.propagation.TextMapGetter;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
+ INSTANCE;
+
+ @Override
+ public Iterable<String> keys(SinkRecord record) {
+ if (record.headers() == null) {
+ return emptyList();
+ }
+
+ return StreamSupport.stream(record.headers().spliterator(), false)
+ .map(Header::key)
+ .collect(toList());
+ }
+
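+ // Connect headers may repeat a key; lastWithName returns the most recently
+ // added value, mirroring the kafka-clients propagation getter.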
+ @Override
+ @Nullable
+ public String get(@Nullable SinkRecord record, String key) {
+ if (record == null || record.headers() == null) {
+ return null;
+ }
+
+ Header header = record.headers().lastWithName(key);
+ if (header == null || header.value() == null) {
+ return null;
+ }
+
+ Object value = header.value();
+ if (value instanceof byte[]) {
+ return new String((byte[]) value, StandardCharsets.UTF_8);
+ }
+ return value.toString();
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java
new file mode 100644
index 000000000000..05ad924e7730
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
+
+import static io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6.KafkaConnectSingletons.instrumenter;
+import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import java.util.Collection;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkTaskInstrumentation implements TypeInstrumentation {
+
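+ // Matches every SinkTask implementation (each sink connector ships one);
+ // put(Collection<SinkRecord>) is the entry point for processing a record batch.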
+ @Override
+ public ElementMatcher<TypeDescription> typeMatcher() {
+ return hasSuperType(named("org.apache.kafka.connect.sink.SinkTask"));
+ }
+
+ @Override
+ public void transform(TypeTransformer transformer) {
+ transformer.applyAdviceToMethod(
+ named("put").and(takesArgument(0, Collection.class)).and(isPublic()),
+ SinkTaskInstrumentation.class.getName() + "$SinkTaskPutAdvice");
+ }
+
+ @SuppressWarnings("unused")
+ public static class SinkTaskPutAdvice {
+
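+ // @Advice.Local variables are shared between the enter and exit advice of a
+ // single put() invocation, carrying the task, context and scope across.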
+ @Advice.OnMethodEnter(suppress = Throwable.class)
+ public static void onEnter(
+ @Advice.Argument(0) Collection<SinkRecord> records,
+ @Advice.Local("otelTask") KafkaConnectTask task,
+ @Advice.Local("otelContext") Context context,
+ @Advice.Local("otelScope") Scope scope) {
+
+ Context parentContext = Java8BytecodeBridge.currentContext();
+
+ task = new KafkaConnectTask(records);
+ if (!instrumenter().shouldStart(parentContext, task)) {
+ return;
+ }
+
+ context = instrumenter().start(parentContext, task);
+ scope = context.makeCurrent();
+ }
+
+ @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+ public static void onExit(
+ @Advice.Thrown Throwable throwable,
+ @Advice.Local("otelTask") KafkaConnectTask task,
+ @Advice.Local("otelContext") Context context,
+ @Advice.Local("otelScope") Scope scope) {
+
+ if (scope == null) {
+ return;
+ }
+ scope.close();
+ instrumenter().end(context, task, null, throwable);
+ }
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/WorkerSinkTaskInstrumentation.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/WorkerSinkTaskInstrumentation.java
new file mode 100644
index 000000000000..a07203d96768
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/WorkerSinkTaskInstrumentation.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+
+/**
+ * This instrumentation is responsible for suppressing the underlying Kafka client consumer spans to
+ * avoid duplicate telemetry. Without this suppression, both high-level Kafka Connect spans (from
+ * {@link SinkTaskInstrumentation}) and low-level kafka-clients spans would be created for the same
+ * consumer operation. This ensures only the meaningful Kafka Connect spans are generated.
+ */
+public class WorkerSinkTaskInstrumentation implements TypeInstrumentation {
+
+ @Override
+ public ElementMatcher<TypeDescription> typeMatcher() {
+ return named("org.apache.kafka.connect.runtime.WorkerSinkTask");
+ }
+
+ @Override
+ public void transform(TypeTransformer transformer) {
+ // Instrument the execute method, which contains the main polling loop
+ transformer.applyAdviceToMethod(named("execute"), this.getClass().getName() + "$ExecuteAdvice");
+ }
+
+ // This advice suppresses the CONSUMER spans created by the kafka-clients instrumentation
+ @SuppressWarnings("unused")
+ public static class ExecuteAdvice {
+
+ @Advice.OnMethodEnter(suppress = Throwable.class)
+ public static boolean onEnter() {
+ return KafkaClientsConsumerProcessTracing.setEnabled(false);
+ }
+
+ @Advice.OnMethodExit(suppress = Throwable.class)
+ public static void onExit(@Advice.Enter boolean previousValue) {
+ KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
+ }
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/testing/build.gradle.kts b/instrumentation/kafka/kafka-connect-2.6/testing/build.gradle.kts
new file mode 100644
index 000000000000..1c6ce0e18ceb
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/testing/build.gradle.kts
@@ -0,0 +1,38 @@
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+plugins {
+ id("otel.java-conventions")
+}
+
+otelJava {
+ minJavaVersionSupported.set(JavaVersion.VERSION_11)
+}
+
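+// The shaded agent jar is copied into the Kafka Connect container and attached
+// via -javaagent; its path is handed to the tests as a system property below.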
+val agentShadowJar = project(":javaagent").tasks.named<ShadowJar>("shadowJar")
+
+dependencies {
+ testImplementation(project(":smoke-tests"))
+ testImplementation(project(":testing-common"))
+ testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
+ testImplementation("org.apache.kafka:kafka-clients:3.6.1")
+ testImplementation("io.opentelemetry:opentelemetry-exporter-logging")
+ testImplementation("io.opentelemetry:opentelemetry-exporter-otlp")
+ testImplementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library"))
+
+ testImplementation("org.testcontainers:testcontainers-postgresql") // For PostgreSQLContainer
+ testImplementation("org.postgresql:postgresql:42.7.2") // PostgreSQL JDBC driver
+ testImplementation("org.testcontainers:testcontainers-mongodb") // For MongoDBContainer
+ testImplementation("org.mongodb:mongodb-driver-sync:4.11.0") // MongoDB Java driver
+
+ // Testcontainers dependencies for integration testing
+ testImplementation("org.testcontainers:testcontainers-junit-jupiter")
+ testImplementation("org.testcontainers:testcontainers")
+ testImplementation("org.testcontainers:testcontainers-kafka")
+ testImplementation("io.rest-assured:rest-assured:5.5.5")
+ testImplementation("com.fasterxml.jackson.core:jackson-databind")
+}
+
+tasks.withType<Test>().configureEach {
+ dependsOn(agentShadowJar)
+ systemProperty("io.opentelemetry.smoketest.agent.shadowJar.path", agentShadowJar.get().archiveFile.get().toString())
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java
new file mode 100644
index 000000000000..05450782843d
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java
@@ -0,0 +1,374 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaconnect.v2_6;
+
+import static io.restassured.RestAssured.given;
+import static java.lang.String.format;
+import static java.time.temporal.ChronoUnit.MINUTES;
+import static org.awaitility.Awaitility.await;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.exporter.logging.LoggingSpanExporter;
+import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
+import io.opentelemetry.instrumentation.test.utils.PortUtils;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import io.opentelemetry.smoketest.SmokeTestInstrumentationExtension;
+import io.opentelemetry.smoketest.TelemetryRetriever;
+import io.opentelemetry.smoketest.TelemetryRetrieverProvider;
+import io.restassured.http.ContentType;
+import java.time.Duration;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.stream.Stream;
+import org.apache.http.HttpStatus;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.Producer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.DisabledIf;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+// Suppressing deprecation warnings from the Testcontainers FixedHostPortGenericContainer API
+@SuppressWarnings("deprecation")
+@DisabledIf("io.opentelemetry.smoketest.TestContainerManager#useWindowsContainers")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class KafkaConnectSinkTaskBaseTest implements TelemetryRetrieverProvider {
+
+ @RegisterExtension
+ protected static final InstrumentationExtension testing =
+ SmokeTestInstrumentationExtension.create();
+
+ // Using the same fake backend pattern as smoke tests (with ARM64 support)
+ protected static GenericContainer<?> backend;
+ protected static TelemetryRetriever telemetryRetriever;
+
+ protected static final String CONFLUENT_VERSION = "7.5.9";
+
+ // Ports
+ protected static final int KAFKA_INTERNAL_PORT = 9092;
+ protected static final int ZOOKEEPER_INTERNAL_PORT = 2181;
+ protected static final int KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT = 29092;
+ protected static final int CONNECT_REST_PORT_INTERNAL = 8083;
+
+ // Network Aliases
+ protected static final String KAFKA_NETWORK_ALIAS = "kafka";
+ protected static final String ZOOKEEPER_NETWORK_ALIAS = "zookeeper";
+ protected static final String KAFKA_CONNECT_NETWORK_ALIAS = "kafka-connect";
+ protected static final String BACKEND_ALIAS = "backend";
+ protected static final int BACKEND_PORT = 8080;
+
+ // Other constants
+ protected static final String PLUGIN_PATH_CONTAINER = "/usr/share/java";
+ protected static final ObjectMapper MAPPER =
+ new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+
+ // Docker network / containers
+ protected static Network network;
+ protected static FixedHostPortGenericContainer<?> kafka;
+ protected static GenericContainer<?> zookeeper;
+ protected static GenericContainer<?> kafkaConnect;
+ protected static int kafkaExposedPort;
+
+ protected static OpenTelemetrySdk openTelemetry;
+
+ // Abstract methods for database-specific setup
+ protected abstract void setupDatabaseContainer();
+
+ protected abstract void startDatabaseContainer();
+
+ protected abstract void stopDatabaseContainer();
+
+ protected abstract void clearDatabaseData() throws Exception;
+
+ protected abstract String getConnectorInstallCommand();
+
+ protected abstract String getConnectorName();
+
+ // Static methods
+ protected static String getKafkaConnectUrl() {
+ return format(
+ Locale.ROOT,
+ "http://%s:%s",
+ kafkaConnect.getHost(),
+ kafkaConnect.getMappedPort(CONNECT_REST_PORT_INTERNAL));
+ }
+
+ protected static String getInternalKafkaBoostrapServers() {
+ return KAFKA_NETWORK_ALIAS + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT;
+ }
+
+ protected static String getKafkaBoostrapServers() {
+ return kafka.getHost() + ":" + kafkaExposedPort;
+ }
+
+ @Override
+ public TelemetryRetriever getTelemetryRetriever() {
+ return telemetryRetriever;
+ }
+
+ @BeforeAll
+ public void setupBase() {
+ network = Network.newNetwork();
+
+ // Start backend container first (like smoke tests)
+ backend =
+ new GenericContainer<>(
+ DockerImageName.parse(
+ "ghcr.io/open-telemetry/opentelemetry-java-instrumentation/smoke-test-fake-backend:20250811.16876216352"))
+ .withExposedPorts(BACKEND_PORT)
+ .withNetwork(network)
+ .withNetworkAliases(BACKEND_ALIAS)
+ .waitingFor(
+ Wait.forHttp("/health")
+ .forPort(BACKEND_PORT)
+ .withStartupTimeout(Duration.of(5, MINUTES)))
+ .withStartupTimeout(Duration.of(5, MINUTES));
+ backend.start();
+
+ telemetryRetriever =
+ new TelemetryRetriever(backend.getMappedPort(BACKEND_PORT), Duration.ofSeconds(30));
+
+ openTelemetry =
+ OpenTelemetrySdk.builder()
+ .setTracerProvider(
+ SdkTracerProvider.builder()
+ .addSpanProcessor(SimpleSpanProcessor.create(LoggingSpanExporter.create()))
+ .addSpanProcessor(
+ SimpleSpanProcessor.create(
+ OtlpGrpcSpanExporter.builder()
+ .setEndpoint(
+ "http://localhost:" + backend.getMappedPort(BACKEND_PORT))
+ .build()))
+ .build())
+ .setPropagators(
+ ContextPropagators.create(
+ TextMapPropagator.composite(W3CTraceContextPropagator.getInstance())))
+ .build();
+
+ setupZookeeper();
+ setupKafka();
+ setupDatabaseContainer();
+ setupKafkaConnect();
+
+ // Start containers (backend already started)
+ startDatabaseContainer();
+ Startables.deepStart(Stream.of(zookeeper, kafka, kafkaConnect)).join();
+
+ // Wait until Kafka Connect container is ready
+ given()
+ .contentType(ContentType.JSON)
+ .when()
+ .get(getKafkaConnectUrl())
+ .then()
+ .statusCode(HttpStatus.SC_OK);
+ }
+
+ private static void setupZookeeper() {
+ zookeeper =
+ new GenericContainer<>("confluentinc/cp-zookeeper:" + CONFLUENT_VERSION)
+ .withNetwork(network)
+ .withNetworkAliases(ZOOKEEPER_NETWORK_ALIAS)
+ .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_INTERNAL_PORT))
+ .withEnv("ZOOKEEPER_TICK_TIME", "2000")
+ .withExposedPorts(ZOOKEEPER_INTERNAL_PORT)
+ .withStartupTimeout(Duration.of(5, MINUTES));
+ }
+
+ private static void setupKafka() {
+ String zookeeperInternalUrl = ZOOKEEPER_NETWORK_ALIAS + ":" + ZOOKEEPER_INTERNAL_PORT;
+
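+ // Two listeners: PLAINTEXT for other containers on the Docker network and
+ // PLAINTEXT_HOST for the test JVM connecting from the host via the fixed port.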
+ kafkaExposedPort = PortUtils.findOpenPort();
+ kafka =
+ new FixedHostPortGenericContainer<>("confluentinc/cp-kafka:" + CONFLUENT_VERSION)
+ .withFixedExposedPort(kafkaExposedPort, KAFKA_INTERNAL_PORT)
+ .withNetwork(network)
+ .withNetworkAliases(KAFKA_NETWORK_ALIAS)
+ .withEnv("KAFKA_BROKER_ID", "1")
+ .withEnv("KAFKA_ZOOKEEPER_CONNECT", zookeeperInternalUrl)
+ .withEnv("ZOOKEEPER_SASL_ENABLED", "false")
+ .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+ .withEnv(
+ "KAFKA_LISTENERS",
+ "PLAINTEXT://0.0.0.0:"
+ + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT
+ + ",PLAINTEXT_HOST://0.0.0.0:"
+ + KAFKA_INTERNAL_PORT)
+ .withEnv(
+ "KAFKA_ADVERTISED_LISTENERS",
+ "PLAINTEXT://"
+ + KAFKA_NETWORK_ALIAS
+ + ":"
+ + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT
+ + ",PLAINTEXT_HOST://localhost:"
+ + kafkaExposedPort)
+ .withEnv(
+ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
+ "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT")
+ .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAINTEXT")
+ .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
+ .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAINTEXT")
+ .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+ .withEnv("KAFKA_OPTS", "-Djava.net.preferIPv4Stack=True")
+ .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "100")
+ .withStartupTimeout(Duration.of(5, MINUTES));
+ }
+
+ private void setupKafkaConnect() {
+ // Get the agent path from system properties (smoke test pattern)
+ String agentPath = System.getProperty("io.opentelemetry.smoketest.agent.shadowJar.path");
+ if (agentPath == null) {
+ throw new IllegalStateException(
+ "Agent path not found. Make sure the shadowJar task is configured correctly.");
+ }
+
+ kafkaConnect =
+ new GenericContainer<>("confluentinc/cp-kafka-connect:" + CONFLUENT_VERSION)
+ .withNetwork(network)
+ .withNetworkAliases(KAFKA_CONNECT_NETWORK_ALIAS)
+ .withExposedPorts(CONNECT_REST_PORT_INTERNAL)
+ .withLogConsumer(
+ new Slf4jLogConsumer(LoggerFactory.getLogger("kafka-connect-container")))
+ // Save logs to desktop
+ .withFileSystemBind(
+ System.getProperty("user.home") + "/Desktop/kafka-connect-logs",
+ "/var/log/kafka-connect",
+ BindMode.READ_WRITE)
+ // Copy the agent jar to the container
+ .withCopyFileToContainer(
+ MountableFile.forHostPath(agentPath), "/opentelemetry-javaagent.jar")
+ // Configure the agent to export spans to backend (like smoke tests)
+ .withEnv(
+ "JAVA_TOOL_OPTIONS",
+ "-javaagent:/opentelemetry-javaagent.jar " + "-Dotel.javaagent.debug=true")
+ // Disable test exporter and force OTLP exporter
+ .withEnv("OTEL_TESTING_EXPORTER_ENABLED", "false")
+ .withEnv("OTEL_TRACES_EXPORTER", "otlp")
+ .withEnv("OTEL_METRICS_EXPORTER", "none")
+ .withEnv("OTEL_LOGS_EXPORTER", "none")
+ .withEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://" + BACKEND_ALIAS + ":" + BACKEND_PORT)
+ .withEnv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc")
+ .withEnv("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "1")
+ .withEnv("OTEL_BSP_SCHEDULE_DELAY", "10ms")
+ .withEnv("OTEL_METRIC_EXPORT_INTERVAL", "1000")
+ .withEnv("CONNECT_BOOTSTRAP_SERVERS", getInternalKafkaBoostrapServers())
+ .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", KAFKA_CONNECT_NETWORK_ALIAS)
+ .withEnv("CONNECT_PLUGIN_PATH", PLUGIN_PATH_CONTAINER)
+ .withEnv(
+ "CONNECT_LOG4J_LOGGERS", "org.reflections=ERROR,org.apache.kafka.connect=DEBUG")
+ .withEnv("CONNECT_REST_PORT", String.valueOf(CONNECT_REST_PORT_INTERNAL))
+ .withEnv("CONNECT_GROUP_ID", "kafka-connect-group")
+ .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "kafka-connect-configs")
+ .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "kafka-connect-offsets")
+ .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "kafka-connect-status")
+ .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter")
+ .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter")
+ .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
+ .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
+ .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
+ .waitingFor(
+ Wait.forHttp("/")
+ .forPort(CONNECT_REST_PORT_INTERNAL)
+ .withStartupTimeout(Duration.of(5, MINUTES)))
+ .withStartupTimeout(Duration.of(5, MINUTES))
+ .withCommand(
+ "bash",
+ "-c",
+ "mkdir -p /var/log/kafka-connect && "
+ + getConnectorInstallCommand()
+ + " && "
+ + "echo 'Starting Kafka Connect with logging to /var/log/kafka-connect/' && "
+ + "/etc/confluent/docker/run 2>&1 | tee /var/log/kafka-connect/kafka-connect.log");
+ }
+
+ @BeforeEach
+ public void resetBase() throws Exception {
+ deleteConnectorIfExists();
+ clearDatabaseData();
+ }
+
+ protected void awaitForTopicCreation(String topicName) {
+ try (AdminClient adminClient = createAdminClient()) {
+ await()
+ .atMost(Duration.ofSeconds(60))
+ .pollInterval(Duration.ofMillis(500))
+ .until(() -> adminClient.listTopics().names().get().contains(topicName));
+ }
+ }
+
+ protected AdminClient createAdminClient() {
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
+ return KafkaAdminClient.create(properties);
+ }
+
+ protected void deleteConnectorIfExists() {
+ given()
+ .log()
+ .headers()
+ .contentType(ContentType.JSON)
+ .when()
+ .delete(getKafkaConnectUrl() + "/connectors/" + getConnectorName())
+ .andReturn()
+ .then()
+ .log()
+ .all();
+ }
+
+ @AfterAll
+ public void cleanupBase() {
+ telemetryRetriever.close();
+ openTelemetry.close();
+
+ // Stop all containers in reverse order of startup to ensure clean shutdown
+ if (kafkaConnect != null) {
+ kafkaConnect.stop();
+ }
+
+ stopDatabaseContainer();
+
+ if (kafka != null) {
+ kafka.stop();
+ }
+
+ if (zookeeper != null) {
+ zookeeper.stop();
+ }
+
+ if (backend != null) {
+ backend.stop();
+ }
+
+ if (network != null) {
+ network.close();
+ }
+ }
+
+ protected static Producer<String, String> instrument(Producer<String, String> producer) {
+ return KafkaTelemetry.create(openTelemetry).wrap(producer);
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java
new file mode 100644
index 000000000000..c49a60b952e6
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaconnect.v2_6;
+
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingOperationTypeIncubatingValues.PROCESS;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA;
+import static io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes.THREAD_ID;
+import static io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes.THREAD_NAME;
+import static io.restassured.RestAssured.given;
+import static java.lang.String.format;
+import static org.awaitility.Awaitility.await;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.testing.assertj.TraceAssert;
+import io.opentelemetry.sdk.trace.data.LinkData;
+import io.restassured.http.ContentType;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.bson.Document;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.utility.DockerImageName;
+
+@SuppressWarnings("deprecation") // using deprecated semconv
+@Testcontainers
+class MongoKafkaConnectSinkTaskTest extends KafkaConnectSinkTaskBaseTest {
+ // MongoDB-specific constants
+ private static final String MONGO_NETWORK_ALIAS = "mongodb";
+ private static final String DB_NAME = "testdb";
+ private static final String COLLECTION_NAME = "person";
+ private static final String CONNECTOR_NAME = "test-mongo-connector";
+ private static final String TOPIC_NAME = "test-mongo-topic";
+
+ private static MongoDBContainer mongoDB;
+
+ @Override
+ protected void setupDatabaseContainer() {
+ mongoDB =
+ new MongoDBContainer(DockerImageName.parse("mongo:4.4"))
+ .withNetwork(network)
+ .withNetworkAliases(MONGO_NETWORK_ALIAS)
+ .withStartupTimeout(Duration.ofMinutes(5));
+ }
+
+ @Override
+ protected void startDatabaseContainer() {
+ Startables.deepStart(mongoDB).join();
+ }
+
+ @Override
+ protected void stopDatabaseContainer() {
+ if (mongoDB != null) {
+ mongoDB.stop();
+ }
+ }
+
+ @Override
+ protected void clearDatabaseData() {
+ clearMongoCollection();
+ }
+
+ @Override
+ protected String getConnectorInstallCommand() {
+ return "confluent-hub install --no-prompt --component-dir /usr/share/java "
+ + "mongodb/kafka-connect-mongodb:1.11.0";
+ }
+
+ @Override
+ protected String getConnectorName() {
+ return CONNECTOR_NAME;
+ }
+
+ @Test
+ void testSingleMessage() throws Exception {
+ String testTopicName = TOPIC_NAME;
+ setupMongoSinkConnector(testTopicName);
+ awaitForTopicCreation(testTopicName);
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ testing.waitForTraces(9); // Skip initial traces from Kafka Connect startup
+ testing.clearData();
+
+ try (Producer<String, String> producer = instrument(new KafkaProducer<>(props))) {
+ producer.send(
+ new ProducerRecord<>(testTopicName, "test-key", "{\"id\":1,\"name\":\"TestUser\"}"));
+ producer.flush();
+ }
+
+ await().atMost(Duration.ofSeconds(60)).until(() -> getRecordCountFromMongo() >= 1);
+
+ AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
+ testing.waitAndAssertTraces(
+ trace ->
+ // producer is in a separate trace, linked to consumer with a span link
+ trace.hasSpansSatisfyingExactly(
+ span -> {
+ span.hasName(testTopicName + " publish").hasKind(SpanKind.PRODUCER).hasNoParent();
+ producerSpanContext.set(span.actual().getSpanContext());
+ }),
+ trace ->
+ // kafka connect sends message to status topic while processing our message
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ span.hasName("kafka-connect-status publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent(),
+ span ->
+ span.hasName("kafka-connect-status process")
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(trace.getSpan(0))),
+ trace ->
+ // kafka connect consumer trace, linked to producer span via a span link
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ span.hasName(testTopicName + " process")
+ .hasKind(CONSUMER)
+ .hasNoParent()
+ .hasLinks(LinkData.create(producerSpanContext.get()))
+ .hasAttributesSatisfyingExactly(
+ equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1),
+ equalTo(MESSAGING_DESTINATION_NAME, testTopicName),
+ equalTo(MESSAGING_OPERATION, PROCESS),
+ equalTo(MESSAGING_SYSTEM, KAFKA),
+ satisfies(THREAD_ID, val -> val.isNotZero()),
+ satisfies(THREAD_NAME, val -> val.isNotBlank())),
+ span ->
+ span.hasName("update " + DB_NAME + "." + COLLECTION_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasParent(trace.getSpan(0))),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("GET /connectors").hasKind(SpanKind.SERVER).hasNoParent()),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("GET /connectors").hasKind(SpanKind.SERVER).hasNoParent()));
+ }
+
+ @Test
+ void testMultiTopic() throws Exception {
+ String topicName1 = TOPIC_NAME + "-1";
+ String topicName2 = TOPIC_NAME + "-2";
+ String topicName3 = TOPIC_NAME + "-3";
+
+ setupMongoSinkConnectorMultiTopic(topicName1, topicName2, topicName3);
+ awaitForTopicCreation(topicName1);
+ awaitForTopicCreation(topicName2);
+ awaitForTopicCreation(topicName3);
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10); // to send messages in one batch
+ props.put(ProducerConfig.LINGER_MS_CONFIG, 10_000); // 10 seconds
+
+ testing.waitForTraces(9); // Skip initial traces from Kafka Connect startup
+ testing.clearData();
+
+ Span parentSpan = openTelemetry.getTracer("test").spanBuilder("parent").startSpan();
+ try (Producer<String, String> producer = instrument(new KafkaProducer<>(props));
+ Scope ignore = parentSpan.makeCurrent()) {
+ producer.send(
+ new ProducerRecord<>(
+ topicName1, "key1", "{\"id\":1,\"name\":\"User1\",\"source\":\"topic1\"}"));
+ producer.send(
+ new ProducerRecord<>(
+ topicName2, "key2", "{\"id\":2,\"name\":\"User2\",\"source\":\"topic2\"}"));
+ producer.send(
+ new ProducerRecord<>(
+ topicName3, "key3", "{\"id\":3,\"name\":\"User3\",\"source\":\"topic3\"}"));
+ producer.flush();
+ } finally {
+ parentSpan.end();
+ }
+
+ await().atMost(Duration.ofSeconds(60)).until(() -> getRecordCountFromMongo() >= 3);
+
+ Consumer<TraceAssert> kafkaStatusAssertion =
+ trace ->
+ // kafka connect sends message to status topic while processing our message
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ span.hasName("kafka-connect-status publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent(),
+ span ->
+ span.hasName("kafka-connect-status process")
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(trace.getSpan(0)));
+
+ AtomicReference<SpanContext> producerSpanContext1 = new AtomicReference<>();
+ AtomicReference<SpanContext> producerSpanContext2 = new AtomicReference<>();
+ AtomicReference<SpanContext> producerSpanContext3 = new AtomicReference<>();
+ testing.waitAndAssertTraces(
+ trace ->
+ // producer is in a separate trace, linked to consumer with a span link
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("parent").hasNoParent(),
+ span -> {
+ span.hasName(topicName1 + " publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasParent(trace.getSpan(0));
+ producerSpanContext1.set(span.actual().getSpanContext());
+ },
+ span -> {
+ span.hasName(topicName2 + " publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasParent(trace.getSpan(0));
+ producerSpanContext2.set(span.actual().getSpanContext());
+ },
+ span -> {
+ span.hasName(topicName3 + " publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasParent(trace.getSpan(0));
+ producerSpanContext3.set(span.actual().getSpanContext());
+ }),
+ kafkaStatusAssertion,
+ kafkaStatusAssertion,
+ kafkaStatusAssertion,
+ trace ->
+ // kafka connect consumer trace, linked to producer span via a span link
+ trace.hasSpansSatisfyingExactly(
+ span ->
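+ // the batch contains records from three topics, so there is no
+ // single destination name and the span name falls back to "unknown process"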
+ span.hasName("unknown process")
+ .hasKind(CONSUMER)
+ .hasNoParent()
+ .hasLinks(
+ LinkData.create(producerSpanContext1.get()),
+ LinkData.create(producerSpanContext2.get()),
+ LinkData.create(producerSpanContext3.get()))
+ .hasAttributesSatisfyingExactly(
+ equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 3),
+ equalTo(MESSAGING_OPERATION, PROCESS),
+ equalTo(MESSAGING_SYSTEM, KAFKA),
+ satisfies(THREAD_ID, val -> val.isNotZero()),
+ satisfies(THREAD_NAME, val -> val.isNotBlank())),
+ span ->
+ span.hasName("update " + DB_NAME + "." + COLLECTION_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasParent(trace.getSpan(0)),
+ span ->
+ span.hasName("update " + DB_NAME + "." + COLLECTION_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasParent(trace.getSpan(0)),
+ span ->
+ span.hasName("update " + DB_NAME + "." + COLLECTION_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasParent(trace.getSpan(0))),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("GET /connectors").hasKind(SpanKind.SERVER).hasNoParent()),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("GET /connectors").hasKind(SpanKind.SERVER).hasNoParent()));
+ }
+
+ // MongoDB-specific helper methods
+ private static void setupMongoSinkConnector(String topicName) throws IOException {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("connector.class", "com.mongodb.kafka.connect.MongoSinkConnector");
+ configMap.put("tasks.max", "1");
+ configMap.put("connection.uri", format(Locale.ROOT, "mongodb://%s:27017", MONGO_NETWORK_ALIAS));
+ configMap.put("database", DB_NAME);
+ configMap.put("collection", COLLECTION_NAME);
+ configMap.put("topics", topicName);
+ configMap.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
+ configMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ configMap.put("value.converter.schemas.enable", "false");
+ configMap.put(
+ "document.id.strategy",
+ "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy");
+
+ String payload =
+ MAPPER.writeValueAsString(ImmutableMap.of("name", CONNECTOR_NAME, "config", configMap));
+ given()
+ .log()
+ .headers()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .body(payload)
+ .when()
+ .post(getKafkaConnectUrl() + "/connectors")
+ .andReturn()
+ .then()
+ .log()
+ .all();
+ }
+
+ private static void setupMongoSinkConnectorMultiTopic(String... topicNames) throws IOException {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("connector.class", "com.mongodb.kafka.connect.MongoSinkConnector");
+ configMap.put("tasks.max", "1");
+ configMap.put("connection.uri", format(Locale.ROOT, "mongodb://%s:27017", MONGO_NETWORK_ALIAS));
+ configMap.put("database", DB_NAME);
+ configMap.put("collection", COLLECTION_NAME);
+ // Configure multiple topics separated by commas
+ configMap.put("topics", String.join(",", topicNames));
+ configMap.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
+ configMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ configMap.put("value.converter.schemas.enable", "false");
+ configMap.put(
+ "document.id.strategy",
+ "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy");
+
+ String payload =
+ MAPPER.writeValueAsString(
+ ImmutableMap.of("name", CONNECTOR_NAME + "-multi", "config", configMap));
+ given()
+ .log()
+ .headers()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .body(payload)
+ .when()
+ .post(getKafkaConnectUrl() + "/connectors")
+ .andReturn()
+ .then()
+ .log()
+ .all();
+ }
+
+ private static long getRecordCountFromMongo() {
+ try (MongoClient mongoClient = MongoClients.create(mongoDB.getConnectionString())) {
+ MongoDatabase database = mongoClient.getDatabase(DB_NAME);
+ MongoCollection<Document> collection = database.getCollection(COLLECTION_NAME);
+ return collection.countDocuments();
+ }
+ }
+
+ private static void clearMongoCollection() {
+ try (MongoClient mongoClient = MongoClients.create(mongoDB.getConnectionString())) {
+ MongoDatabase database = mongoClient.getDatabase(DB_NAME);
+ MongoCollection<Document> collection = database.getCollection(COLLECTION_NAME);
+ collection.drop();
+ }
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java
new file mode 100644
index 000000000000..34a07b3597e9
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java
@@ -0,0 +1,429 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaconnect.v2_6;
+
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingOperationTypeIncubatingValues.PROCESS;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA;
+import static io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes.THREAD_ID;
+import static io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes.THREAD_NAME;
+import static io.restassured.RestAssured.given;
+import static java.lang.String.format;
+import static org.awaitility.Awaitility.await;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
+import io.opentelemetry.sdk.testing.assertj.TraceAssert;
+import io.opentelemetry.sdk.trace.data.LinkData;
+import io.restassured.http.ContentType;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.utility.DockerImageName;
+
+@SuppressWarnings("deprecation") // using deprecated semconv
+@Testcontainers
+class PostgresKafkaConnectSinkTaskTest extends KafkaConnectSinkTaskBaseTest {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(PostgresKafkaConnectSinkTaskTest.class);
+
+ private static final String POSTGRES_NETWORK_ALIAS = "postgres";
+ private static final String DB_NAME = "test";
+ private static final String DB_USERNAME = "postgres";
+ private static final String DB_PASSWORD = "password";
+ private static final String DB_TABLE_PERSON = "person";
+ private static final String CONNECTOR_NAME = "test-postgres-connector";
+ private static final String TOPIC_NAME = "test-postgres-topic";
+
+ private static PostgreSQLContainer<?> postgreSql;
+
+ @Override
+ protected void setupDatabaseContainer() {
+ postgreSql =
+ new PostgreSQLContainer<>(
+ DockerImageName.parse("postgres:11").asCompatibleSubstituteFor("postgres"))
+ .withNetwork(network)
+ .withNetworkAliases(POSTGRES_NETWORK_ALIAS)
+ .withInitScript("postgres-setup.sql")
+ .withDatabaseName(DB_NAME)
+ .withUsername(DB_USERNAME)
+ .withPassword(DB_PASSWORD)
+ .withStartupTimeout(Duration.ofMinutes(5));
+ }
+
+ @Override
+ protected void startDatabaseContainer() {
+ Startables.deepStart(postgreSql).join();
+ }
+
+ @Override
+ protected void stopDatabaseContainer() {
+ if (postgreSql != null) {
+ postgreSql.stop();
+ }
+ }
+
+ @Override
+ protected void clearDatabaseData() throws Exception {
+ clearPostgresTable();
+ }
+
+ @Override
+ protected String getConnectorInstallCommand() {
+ return "confluent-hub install --no-prompt --component-dir /usr/share/java "
+ + "confluentinc/kafka-connect-jdbc:10.7.4";
+ }
+
+ @Override
+ protected String getConnectorName() {
+ return CONNECTOR_NAME;
+ }
+
+ @Test
+ void testSingleMessage() throws Exception {
+ String testTopicName = TOPIC_NAME;
+ setupPostgresSinkConnector(testTopicName);
+ awaitForTopicCreation(testTopicName);
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ testing.waitForTraces(9); // Skip initial traces from Kafka Connect startup
+ testing.clearData();
+
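+ // JsonConverter with value.converter.schemas.enable=true expects each record value to carry both "schema" and "payload" fields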
+ try (Producer<String, String> producer = instrument(new KafkaProducer<>(props))) {
+ producer.send(
+ new ProducerRecord<>(
+ testTopicName,
+ "test-key",
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"id\",\"type\":\"int32\"},{\"field\":\"name\",\"type\":\"string\"}]},\"payload\":{\"id\":1,\"name\":\"TestUser\"}}"));
+ producer.flush();
+ }
+
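+ // wait until the sink has written the row before asserting traces, so all spans have been emitted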
+ await().atMost(Duration.ofSeconds(60)).until(() -> getRecordCountFromPostgres() >= 1);
+
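+ // capture the producer span's context so the consumer trace below can be asserted to link back to it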
+ AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
+ testing.waitAndAssertTraces(
+ trace ->
+ // the producer is in a separate trace; the consumer trace links back to it via a span link
+ trace.hasSpansSatisfyingExactly(
+ span -> {
+ span.hasName(testTopicName + " publish").hasKind(SpanKind.PRODUCER).hasNoParent();
+ producerSpanContext.set(span.actual().getSpanContext());
+ }),
+ trace ->
+ // Kafka Connect sends a message to its status topic while processing our message
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ span.hasName("kafka-connect-status publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent(),
+ span ->
+ span.hasName("kafka-connect-status process")
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(trace.getSpan(0))),
+ trace -> {
+ // kafka connect consumer trace, linked to producer span via a span link
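+ // the JDBC sink runs a series of metadata SELECT queries before the INSERT; the test asserts five of them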
+ Consumer<SpanDataAssert> selectAssertion =
+ span ->
+ span.hasName("SELECT test").hasKind(SpanKind.CLIENT).hasParent(trace.getSpan(0));
+
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ span.hasName(testTopicName + " process")
+ .hasKind(SpanKind.CONSUMER)
+ .hasNoParent()
+ .hasLinks(LinkData.create(producerSpanContext.get()))
+ .hasAttributesSatisfyingExactly(
+ equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1),
+ equalTo(MESSAGING_DESTINATION_NAME, testTopicName),
+ equalTo(MESSAGING_OPERATION, PROCESS),
+ equalTo(MESSAGING_SYSTEM, KAFKA),
+ satisfies(THREAD_ID, val -> val.isNotZero()),
+ satisfies(THREAD_NAME, val -> val.isNotBlank())),
+ selectAssertion,
+ selectAssertion,
+ selectAssertion,
+ selectAssertion,
+ selectAssertion,
+ span ->
+ span.hasName("INSERT test." + DB_TABLE_PERSON)
+ .hasKind(SpanKind.CLIENT)
+ .hasParent(trace.getSpan(0)));
+ },
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("GET /connectors").hasKind(SpanKind.SERVER).hasNoParent()),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("GET /connectors").hasKind(SpanKind.SERVER).hasNoParent()));
+ }
+
+ @Test
+ void testMultiTopic() throws Exception {
+ String topicName1 = TOPIC_NAME + "-1";
+ String topicName2 = TOPIC_NAME + "-2";
+ String topicName3 = TOPIC_NAME + "-3";
+
+ setupPostgresSinkConnectorMultiTopic(topicName1, topicName2, topicName3);
+ awaitForTopicCreation(topicName1);
+ awaitForTopicCreation(topicName2);
+ awaitForTopicCreation(topicName3);
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10); // to send messages in one batch
+ props.put(ProducerConfig.LINGER_MS_CONFIG, 10_000); // 10 seconds
+
+ testing.waitForTraces(9); // Skip initial traces from Kafka Connect startup
+ testing.clearData();
+
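+ // start a parent span so that all three publish spans end up in a single trace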
+ Span parentSpan = openTelemetry.getTracer("test").spanBuilder("parent").startSpan();
+ try (Producer<String, String> producer = instrument(new KafkaProducer<>(props));
+ Scope ignore = parentSpan.makeCurrent()) {
+ producer.send(
+ new ProducerRecord<>(
+ topicName1,
+ "key1",
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"id\",\"type\":\"int32\"},{\"field\":\"name\",\"type\":\"string\"}]},\"payload\":{\"id\":1,\"name\":\"User1\"}}"));
+ producer.send(
+ new ProducerRecord<>(
+ topicName2,
+ "key2",
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"id\",\"type\":\"int32\"},{\"field\":\"name\",\"type\":\"string\"}]},\"payload\":{\"id\":2,\"name\":\"User2\"}}"));
+ producer.send(
+ new ProducerRecord<>(
+ topicName3,
+ "key3",
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"id\",\"type\":\"int32\"},{\"field\":\"name\",\"type\":\"string\"}]},\"payload\":{\"id\":3,\"name\":\"User3\"}}"));
+ producer.flush();
+ } finally {
+ parentSpan.end();
+ }
+
+ await().atMost(Duration.ofSeconds(60)).until(() -> getRecordCountFromPostgres() >= 3);
+
+ Consumer<TraceAssert> kafkaStatusAssertion =
+ trace ->
+ // Kafka Connect sends a message to its status topic while processing our message
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ span.hasName("kafka-connect-status publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent(),
+ span ->
+ span.hasName("kafka-connect-status process")
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(trace.getSpan(0)));
+
+ AtomicReference<SpanContext> producerSpanContext1 = new AtomicReference<>();
+ AtomicReference<SpanContext> producerSpanContext2 = new AtomicReference<>();
+ AtomicReference<SpanContext> producerSpanContext3 = new AtomicReference<>();
+ testing.waitAndAssertTraces(
+ trace ->
+ // the producer spans are in a separate trace; the consumer trace links back to them via span links
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("parent").hasNoParent(),
+ span -> {
+ span.hasName(topicName1 + " publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasParent(trace.getSpan(0));
+ producerSpanContext1.set(span.actual().getSpanContext());
+ },
+ span -> {
+ span.hasName(topicName2 + " publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasParent(trace.getSpan(0));
+ producerSpanContext2.set(span.actual().getSpanContext());
+ },
+ span -> {
+ span.hasName(topicName3 + " publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasParent(trace.getSpan(0));
+ producerSpanContext3.set(span.actual().getSpanContext());
+ }),
+ kafkaStatusAssertion,
+ kafkaStatusAssertion,
+ kafkaStatusAssertion,
+ trace -> {
+ // kafka connect consumer trace, linked to producer span via a span link
+ Consumer<SpanDataAssert> selectAssertion =
+ span ->
+ span.hasName("SELECT test").hasKind(SpanKind.CLIENT).hasParent(trace.getSpan(0));
+
+ trace.hasSpansSatisfyingExactly(
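+ // records from all three topics arrive in one batch, so there is no single destination; the span name falls back to "unknown" and no destination-name attribute is set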
+ span ->
+ span.hasName("unknown process")
+ .hasKind(SpanKind.CONSUMER)
+ .hasNoParent()
+ .hasLinks(
+ LinkData.create(producerSpanContext1.get()),
+ LinkData.create(producerSpanContext2.get()),
+ LinkData.create(producerSpanContext3.get()))
+ .hasAttributesSatisfyingExactly(
+ equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 3),
+ equalTo(MESSAGING_OPERATION, PROCESS),
+ equalTo(MESSAGING_SYSTEM, KAFKA),
+ satisfies(THREAD_ID, val -> val.isNotZero()),
+ satisfies(THREAD_NAME, val -> val.isNotBlank())),
+ selectAssertion,
+ selectAssertion,
+ selectAssertion,
+ selectAssertion,
+ selectAssertion,
+ span ->
+ span.hasName("INSERT test." + DB_TABLE_PERSON)
+ .hasKind(SpanKind.CLIENT)
+ .hasParent(trace.getSpan(0)));
+ },
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("GET /connectors").hasKind(SpanKind.SERVER).hasNoParent()),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("GET /connectors").hasKind(SpanKind.SERVER).hasNoParent()));
+ }
+
+ private static void setupPostgresSinkConnector(String topicName) throws IOException {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector");
+ configMap.put("tasks.max", "1");
+ configMap.put(
+ "connection.url",
+ format(
+ Locale.ROOT,
+ "jdbc:postgresql://%s:5432/%s?loggerLevel=OFF",
+ POSTGRES_NETWORK_ALIAS,
+ DB_NAME));
+ configMap.put("connection.user", DB_USERNAME);
+ configMap.put("connection.password", DB_PASSWORD);
+ configMap.put("topics", topicName);
+ configMap.put("auto.create", "false");
+ configMap.put("auto.evolve", "false");
+ configMap.put("insert.mode", "insert");
+ configMap.put("delete.enabled", "false");
+ configMap.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
+ configMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ configMap.put("value.converter.schemas.enable", "true");
+ configMap.put("table.name.format", DB_TABLE_PERSON);
+ configMap.put("pk.mode", "none");
+
+ String payload =
+ MAPPER.writeValueAsString(ImmutableMap.of("name", CONNECTOR_NAME, "config", configMap));
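+ // register the sink connector through the Kafka Connect REST API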
+ given()
+ .log()
+ .headers()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .body(payload)
+ .when()
+ .post(getKafkaConnectUrl() + "/connectors")
+ .andReturn()
+ .then()
+ .log()
+ .all();
+ }
+
+ private static void setupPostgresSinkConnectorMultiTopic(String... topicNames)
+ throws IOException {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector");
+ configMap.put("tasks.max", "1");
+ configMap.put(
+ "connection.url",
+ format(
+ Locale.ROOT,
+ "jdbc:postgresql://%s:5432/%s?loggerLevel=OFF",
+ POSTGRES_NETWORK_ALIAS,
+ DB_NAME));
+ configMap.put("connection.user", DB_USERNAME);
+ configMap.put("connection.password", DB_PASSWORD);
+ // Configure multiple topics separated by commas
+ configMap.put("topics", String.join(",", topicNames));
+ configMap.put("auto.create", "false");
+ configMap.put("auto.evolve", "false");
+ configMap.put("insert.mode", "insert");
+ configMap.put("delete.enabled", "false");
+ configMap.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
+ configMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ configMap.put("value.converter.schemas.enable", "true");
+ configMap.put("table.name.format", DB_TABLE_PERSON);
+ configMap.put("pk.mode", "none");
+
+ String payload =
+ MAPPER.writeValueAsString(
+ ImmutableMap.of("name", CONNECTOR_NAME + "-multi", "config", configMap));
+ given()
+ .log()
+ .headers()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .body(payload)
+ .when()
+ .post(getKafkaConnectUrl() + "/connectors")
+ .andReturn()
+ .then()
+ .log()
+ .all();
+ }
+
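+ // polled via awaitility until the sink task has written the expected number of rows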
+ private static long getRecordCountFromPostgres() throws SQLException {
+ try (Connection conn =
+ DriverManager.getConnection(postgreSql.getJdbcUrl(), DB_USERNAME, DB_PASSWORD);
+ Statement st = conn.createStatement();
+ ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM " + DB_TABLE_PERSON)) {
+ if (rs.next()) {
+ return rs.getLong(1);
+ }
+ }
+ return 0;
+ }
+
+ private static void clearPostgresTable() throws SQLException {
+ try (Connection conn =
+ DriverManager.getConnection(postgreSql.getJdbcUrl(), DB_USERNAME, DB_PASSWORD);
+ Statement st = conn.createStatement()) {
+ st.executeUpdate("DELETE FROM " + DB_TABLE_PERSON);
+ logger.info("Cleared PostgreSQL table: {}", DB_TABLE_PERSON);
+ }
+ }
+}
diff --git a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/resources/postgres-setup.sql b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/resources/postgres-setup.sql
new file mode 100644
index 000000000000..f83390a2c6e1
--- /dev/null
+++ b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/resources/postgres-setup.sql
@@ -0,0 +1,5 @@
+CREATE TABLE IF NOT EXISTS person (
+ id INT NOT NULL,
+ name VARCHAR(255) NOT NULL,
+ PRIMARY KEY (id)
+);
\ No newline at end of file
diff --git a/settings.gradle.kts b/settings.gradle.kts
index a8aa7a7a51fd..cd8bc47ef729 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -368,6 +368,8 @@ include(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")
include(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:testing")
include(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library")
include(":instrumentation:kafka:kafka-clients:kafka-clients-common-0.11:library")
+include(":instrumentation:kafka:kafka-connect-2.6:javaagent")
+include(":instrumentation:kafka:kafka-connect-2.6:testing")
include(":instrumentation:kafka:kafka-streams-0.11:javaagent")
include(":instrumentation:kotlinx-coroutines:kotlinx-coroutines-1.0:javaagent")
include(":instrumentation:kotlinx-coroutines:kotlinx-coroutines-flow-1.3:javaagent")
diff --git a/smoke-tests/src/main/java/io/opentelemetry/smoketest/TelemetryRetriever.java b/smoke-tests/src/main/java/io/opentelemetry/smoketest/TelemetryRetriever.java
index aff06b8817b1..2b0268c9bf5d 100644
--- a/smoke-tests/src/main/java/io/opentelemetry/smoketest/TelemetryRetriever.java
+++ b/smoke-tests/src/main/java/io/opentelemetry/smoketest/TelemetryRetriever.java
@@ -5,6 +5,8 @@
package io.opentelemetry.smoketest;
+import static java.util.Collections.emptyList;
+
import io.opentelemetry.instrumentation.testing.internal.TelemetryConverter;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.metrics.data.MetricData;
@@ -26,10 +28,11 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
-public class TelemetryRetriever {
+public class TelemetryRetriever implements AutoCloseable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final WebClient client;
private final Duration telemetryTimeout;
+ private boolean closed;
public TelemetryRetriever(int backendPort, Duration telemetryTimeout) {
client = WebClient.of("http://localhost:" + backendPort);
@@ -37,6 +40,10 @@ public TelemetryRetriever(int backendPort, Duration telemetryTimeout) {
}
public void clearTelemetry() {
+ if (closed) {
+ return;
+ }
+
client.get("/clear").aggregate().join();
}
@@ -70,6 +77,10 @@
@SuppressWarnings({"unchecked", "rawtypes"})
private <T, B>
Collection<T> waitForTelemetry(String path, Supplier<B> builderConstructor) {
+ if (closed) {
+ return emptyList();
+ }
+
try {
return OBJECT_MAPPER
.readTree(waitForContent(path))
@@ -117,4 +128,9 @@ private String waitForContent(String path) throws InterruptedException {
public final WebClient getClient() {
return client;
}
+
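+ // once closed, clearTelemetry() and waitForTelemetry() short-circuit instead of calling the stopped backend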
+ @Override
+ public void close() {
+ closed = true;
+ }
}