
Commit abc6bbe

wip: instrumenting kafka connect.
1 parent c7dd764 commit abc6bbe

21 files changed: 3345 additions & 0 deletions
README.md: 101 additions & 0 deletions
# Kafka Connect SinkTask Integration Test

This module contains integration tests for Kafka Connect SinkTask instrumentation using Testcontainers.

## Test Overview

The `KafkaConnectSinkTaskTest` class demonstrates how to test Kafka Connect SinkTask functionality with:

- **Kafka Container**: Apache Kafka using Confluent Platform
- **PostgreSQL Container**: PostgreSQL database as the sink destination
- **Kafka Connect Container**: Confluent Kafka Connect with the JDBC connector

## Test Structure

### Test Setup

1. **Kafka Container**: Starts a Kafka broker for message streaming
2. **PostgreSQL Container**: Starts a PostgreSQL database for data storage
3. **Kafka Connect Container**: Starts Kafka Connect with the Confluent JDBC connector pre-installed
4. **Network Configuration**: All containers are connected via a Docker network for inter-container communication (a wiring sketch follows this list)
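The wiring itself lives in the test class; the following is a minimal sketch of what this setup could look like with Testcontainers. The image tags, network aliases, and connector-install command are illustrative assumptions, not the test's actual values.

```java
// Hypothetical sketch only: image tags, aliases, and the connector-install
// command below are assumptions, not values taken from the actual test.
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;

class ContainerSetupSketch {
  private static final Network NETWORK = Network.newNetwork();

  static final KafkaContainer KAFKA =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
          .withNetwork(NETWORK)
          .withNetworkAliases("kafka");

  static final PostgreSQLContainer<?> POSTGRES =
      new PostgreSQLContainer<>(DockerImageName.parse("postgres:14"))
          .withNetwork(NETWORK)
          .withNetworkAliases("postgres")
          .withDatabaseName("testdb");

  // Kafka Connect with the Confluent JDBC connector installed at startup;
  // most CONNECT_* environment variables are omitted for brevity.
  static final GenericContainer<?> CONNECT =
      new GenericContainer<>(DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.0"))
          .withNetwork(NETWORK)
          .withExposedPorts(8083)
          .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:9092")
          .withCommand(
              "bash", "-c",
              "confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.7.4"
                  + " && /etc/confluent/docker/run");

  static void startAll() {
    KAFKA.start();
    POSTGRES.start();
    CONNECT.start();
  }
}
```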
### Test Scenarios

#### 1. Basic SinkTask Test (`testKafkaConnectSinkTask`)

- Sends JSON messages to a Kafka topic
- Verifies that Kafka Connect processes the messages and inserts them into PostgreSQL
- Validates both the count and content of inserted records

#### 2. Batch Processing Test (`testKafkaConnectSinkTaskWithMultipleBatches`)

- Sends multiple batches of messages to test bulk processing
- Verifies that all messages are correctly processed and stored

### Data Flow

```
JSON Messages → Kafka Topic → Kafka Connect JDBC Sink → PostgreSQL Table
```
## Running the Tests

### Prerequisites

- Docker installed and running
- Java 11 or higher
- Gradle

### Execute Tests

```bash
# Run all tests in the module
./gradlew :instrumentation:kafka:kafka-connect-2.6:javaagent:test

# Run only the SinkTask test
./gradlew :instrumentation:kafka:kafka-connect-2.6:javaagent:test --tests "KafkaConnectSinkTaskTest"
```
### Test Configuration

The test uses the following configuration:

- **Kafka Topic**: `test-topic`
- **PostgreSQL Database**: `testdb`
- **Table Name**: `test_table`
- **Connector**: Confluent JDBC Sink Connector (version 10.7.4)
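The connector is registered through the Kafka Connect REST API; here is a hedged sketch of what that registration could look like for the values above. The connector name and connection URL are assumptions, and settings such as converters and credentials are omitted.

```java
// Hypothetical sketch: registers a JDBC sink with the values listed above.
// The connector name and connection URL are assumptions; converter and
// credential settings are omitted.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class RegisterJdbcSinkSketch {
  public static void main(String[] args) throws Exception {
    String config =
        "{"
            + "\"name\": \"test-jdbc-sink\","
            + "\"config\": {"
            + "  \"connector.class\": \"io.confluent.connect.jdbc.JdbcSinkConnector\","
            + "  \"topics\": \"test-topic\","
            + "  \"table.name.format\": \"test_table\","
            + "  \"connection.url\": \"jdbc:postgresql://postgres:5432/testdb\""
            + "}}";

    HttpRequest request =
        HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(config))
            .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```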
### Test Data Format

Messages are sent as JSON strings with the following structure:

```json
{
  "name": "John Doe",
  "value": 100
}
```
These are automatically mapped to PostgreSQL table columns:

- `id` (SERIAL PRIMARY KEY)
- `name` (VARCHAR(255))
- `value` (INTEGER)
- `created_at` (TIMESTAMP)
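For reference, a minimal sketch of producing one such message with the plain Kafka client; the bootstrap address is an assumption (the test would use the Kafka container's address):

```java
// Hypothetical sketch of sending a test message in the format above; the
// bootstrap address is an assumption, not the test's actual value.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class ProduceTestMessageSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Plain JSON string value, matching the structure shown above
      String value = "{\"name\": \"John Doe\", \"value\": 100}";
      producer.send(new ProducerRecord<>("test-topic", value));
      producer.flush();
    }
  }
}
```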
## Key Features

1. **Full Integration**: Tests the complete data pipeline from Kafka to PostgreSQL
2. **Container Orchestration**: Uses Docker networks for seamless container communication
3. **Realistic Environment**: Uses actual Confluent components, not mocks
4. **Data Validation**: Verifies both data count and content accuracy
5. **Error Handling**: Includes proper cleanup and error handling
6. **Batch Processing**: Tests both single messages and batch processing scenarios

## Dependencies

The test relies on the following key dependencies:

- `org.testcontainers:kafka` - Kafka container support
- `org.testcontainers:postgresql` - PostgreSQL container support
- `org.postgresql:postgresql` - PostgreSQL JDBC driver
- `com.fasterxml.jackson.core:jackson-databind` - JSON processing
- `org.awaitility:awaitility` - Asynchronous testing utilities

## Notes

- The test automatically installs the Confluent JDBC connector during container startup
- All containers are started with appropriate logging for debugging
- The test includes proper timeouts and retry logic for container readiness
- Network isolation ensures tests don't interfere with each other
- The test verifies data flow rather than OpenTelemetry spans
build.gradle.kts: 41 additions & 0 deletions
plugins {
  id("otel.javaagent-instrumentation")
}

muzzle {
  pass {
    group.set("org.apache.kafka")
    module.set("connect-api")
    versions.set("[2.6.0,)")
    // we use reflection to access the "pause" and "resume" methods, so we can't reference them
    // directly, and so we can't assert that they exist at muzzle-time
    skip("org.apache.kafka.connect.sink.SinkTaskContext")
  }
}

dependencies {
  implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common-0.11:library"))

  library("org.apache.kafka:connect-api:3.6.1")

  testImplementation("org.apache.kafka:connect-runtime:3.6.1")
}

tasks {
  withType<Test>().configureEach {
    usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

    systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)

    // Enable experimental span attributes and receive telemetry for comprehensive testing
    jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
    jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")

    // Set a timeout for integration tests with containers
    systemProperty("junit.jupiter.execution.timeout.default", "5m")
  }

  withType<JavaCompile>().configureEach {
    options.compilerArgs.add("-Xlint:-deprecation")
  }
}
KafkaConnectBatchProcessSpanLinksExtractor.java: 32 additions & 0 deletions
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import org.apache.kafka.connect.sink.SinkRecord;
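/**
 * Creates one span link per {@link SinkRecord} in the batch, extracting the propagated context
 * from each record's headers so the batch span stays connected to the upstream producer spans.
 */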
final class KafkaConnectBatchProcessSpanLinksExtractor
    implements SpanLinksExtractor<KafkaConnectTask> {

  private final SpanLinksExtractor<SinkRecord> singleRecordLinkExtractor;

  KafkaConnectBatchProcessSpanLinksExtractor(TextMapPropagator propagator) {
    this.singleRecordLinkExtractor =
        new PropagatorBasedSpanLinksExtractor<>(propagator, SinkRecordHeadersGetter.INSTANCE);
  }

  @Override
  public void extract(SpanLinksBuilder spanLinks, Context parentContext, KafkaConnectTask task) {
    for (SinkRecord record : task.getRecords()) {
      singleRecordLinkExtractor.extract(spanLinks, parentContext, record);
    }
  }
}
KafkaConnectInstrumentationModule.java: 31 additions & 0 deletions
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
import java.util.logging.Logger;
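/** Instrumentation module for Kafka Connect {@code SinkTask} processing. */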
@AutoService(InstrumentationModule.class)
public class KafkaConnectInstrumentationModule extends InstrumentationModule {

  private static final Logger logger =
      Logger.getLogger(KafkaConnectInstrumentationModule.class.getName());

  public KafkaConnectInstrumentationModule() {
    super("kafka-connect", "kafka-connect-2.6");
    // temporary wip logging to confirm the agent picks this module up
    logger.info("KafkaConnect: InstrumentationModule constructor called");
  }

  @Override
  public List<TypeInstrumentation> typeInstrumentations() {
    logger.info("KafkaConnect: typeInstrumentations() called");
    return singletonList(new SinkTaskInstrumentation());
  }
}
KafkaConnectSingletons.java: 33 additions & 0 deletions
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
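/**
 * Holds the {@link Instrumenter} shared by the sink-task advice: batch spans are named by
 * {@link KafkaConnectTask#getSpanName} and linked to the producer span of each record in the
 * batch.
 */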
public final class KafkaConnectSingletons {

  private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-connect-2.6";
  private static final TextMapPropagator PROPAGATOR =
      GlobalOpenTelemetry.get().getPropagators().getTextMapPropagator();

  private static final Instrumenter<KafkaConnectTask, Void> INSTRUMENTER =
      Instrumenter.<KafkaConnectTask, Void>builder(
              GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, KafkaConnectTask::getSpanName)
          .addSpanLinksExtractor(new KafkaConnectBatchProcessSpanLinksExtractor(PROPAGATOR))
          .buildInstrumenter();

  public static Instrumenter<KafkaConnectTask, Void> instrumenter() {
    return INSTRUMENTER;
  }

  public static TextMapPropagator propagator() {
    return PROPAGATOR;
  }

  private KafkaConnectSingletons() {}
}
KafkaConnectTask.java: 26 additions & 0 deletions
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;
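/** The instrumenter's request type: a batch of records handed to {@code SinkTask.put}. */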
public final class KafkaConnectTask {

  private final Collection<SinkRecord> records;

  public KafkaConnectTask(Collection<SinkRecord> records) {
    this.records = records;
  }

  public Collection<SinkRecord> getRecords() {
    return records;
  }

  public static String getSpanName(KafkaConnectTask task) {
    return "KafkaConnect.put";
  }
}
SinkRecordHeadersGetter.java: 47 additions & 0 deletions
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;
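/**
 * {@link TextMapGetter} that reads propagation headers from a {@link SinkRecord}: the last header
 * with the requested key wins, and {@code byte[]} values are decoded as UTF-8.
 */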
enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
  INSTANCE;

  @Override
  public Iterable<String> keys(SinkRecord record) {
    Headers headers = record.headers();
    List<String> keys = new ArrayList<>();
    for (Header header : headers) {
      keys.add(header.key());
    }
    return keys;
  }

  @Nullable
  @Override
  public String get(@Nullable SinkRecord record, String key) {
    if (record == null) {
      return null;
    }
    Headers headers = record.headers();
    Header header = headers.lastWithName(key);
    if (header == null) {
      return null;
    }
    Object value = header.value();
    // a header may carry a null value; avoid an NPE on toString()
    if (value == null) {
      return null;
    }
    if (value instanceof byte[]) {
      return new String((byte[]) value, StandardCharsets.UTF_8);
    }
    return value.toString();
  }
}
