Skip to content

Commit ee88ff0

Browse files
committed
wip: addressing the feedback and removing unwanted dependencies.
1 parent 1a4ff92 commit ee88ff0

File tree

9 files changed

+209
-58
lines changed

9 files changed

+209
-58
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
7+
8+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
9+
import java.nio.charset.StandardCharsets;
10+
import java.util.Collections;
11+
import java.util.List;
12+
import java.util.stream.Collectors;
13+
import java.util.stream.StreamSupport;
14+
import javax.annotation.Nullable;
15+
import org.apache.kafka.connect.header.Header;
16+
import org.apache.kafka.connect.sink.SinkRecord;
17+
18+
enum KafkaConnectAttributesGetter implements MessagingAttributesGetter<KafkaConnectTask, Void> {
19+
INSTANCE;
20+
21+
@Override
22+
public String getSystem(KafkaConnectTask request) {
23+
return "kafka";
24+
}
25+
26+
@Override
27+
@Nullable
28+
public String getDestination(KafkaConnectTask request) {
29+
SinkRecord firstRecord = request.getFirstRecord();
30+
return firstRecord != null ? firstRecord.topic() : null;
31+
}
32+
33+
@Nullable
34+
@Override
35+
public String getDestinationTemplate(KafkaConnectTask request) {
36+
return null;
37+
}
38+
39+
@Override
40+
public boolean isTemporaryDestination(KafkaConnectTask request) {
41+
return false;
42+
}
43+
44+
@Override
45+
public boolean isAnonymousDestination(KafkaConnectTask request) {
46+
return false;
47+
}
48+
49+
@Override
50+
@Nullable
51+
public String getConversationId(KafkaConnectTask request) {
52+
return null;
53+
}
54+
55+
@Nullable
56+
@Override
57+
public Long getMessageBodySize(KafkaConnectTask request) {
58+
// SinkRecord doesn't expose serialized size information
59+
// This would need to be calculated from the actual value, but that's expensive
60+
// and not typically done in messaging instrumentations for batch processing
61+
return null;
62+
}
63+
64+
@Nullable
65+
@Override
66+
public Long getMessageEnvelopeSize(KafkaConnectTask request) {
67+
return null;
68+
}
69+
70+
@Override
71+
@Nullable
72+
public String getMessageId(KafkaConnectTask request, @Nullable Void unused) {
73+
return null;
74+
}
75+
76+
@Nullable
77+
@Override
78+
public String getClientId(KafkaConnectTask request) {
79+
return null;
80+
}
81+
82+
@Nullable
83+
@Override
84+
public Long getBatchMessageCount(KafkaConnectTask request, @Nullable Void unused) {
85+
return (long) request.getRecords().size();
86+
}
87+
88+
@Override
89+
public List<String> getMessageHeader(KafkaConnectTask request, String name) {
90+
SinkRecord firstRecord = request.getFirstRecord();
91+
if (firstRecord == null || firstRecord.headers() == null) {
92+
return Collections.emptyList();
93+
}
94+
95+
return StreamSupport.stream(firstRecord.headers().spliterator(), false)
96+
.filter(header -> name.equals(header.key()) && header.value() != null)
97+
.map(header -> convertHeaderValue(header))
98+
.collect(Collectors.toList());
99+
}
100+
101+
private static String convertHeaderValue(Header header) {
102+
Object value = header.value();
103+
if (value instanceof byte[]) {
104+
return new String((byte[]) value, StandardCharsets.UTF_8);
105+
}
106+
return value.toString();
107+
}
108+
}

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectBatchProcessSpanLinksExtractor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
1313
import org.apache.kafka.connect.sink.SinkRecord;
1414

15-
final class KafkaConnectBatchProcessSpanLinksExtractor
16-
implements SpanLinksExtractor<KafkaConnectTask> {
15+
/**
16+
* Extracts span links from Kafka Connect SinkRecord headers for batch processing scenarios.
17+
* This ensures that when processing a batch of records that may come from different traces,
18+
* we create links to all the original trace contexts rather than losing them.
19+
*/
20+
final class KafkaConnectBatchProcessSpanLinksExtractor implements SpanLinksExtractor<KafkaConnectTask> {
1721

1822
private final SpanLinksExtractor<SinkRecord> singleRecordLinkExtractor;
1923

@@ -23,10 +27,13 @@ final class KafkaConnectBatchProcessSpanLinksExtractor
2327
}
2428

2529
@Override
26-
public void extract(SpanLinksBuilder spanLinks, Context parentContext, KafkaConnectTask task) {
30+
public void extract(
31+
SpanLinksBuilder spanLinks, Context parentContext, KafkaConnectTask request) {
2732

28-
for (SinkRecord record : task.getRecords()) {
29-
singleRecordLinkExtractor.extract(spanLinks, parentContext, record);
33+
for (SinkRecord record : request.getRecords()) {
34+
// Create a link to each record's original trace context
35+
// Using Context.root() to avoid linking to the current span
36+
singleRecordLinkExtractor.extract(spanLinks, Context.root(), record);
3037
}
3138
}
3239
}

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import io.opentelemetry.api.GlobalOpenTelemetry;
99
import io.opentelemetry.context.propagation.TextMapGetter;
1010
import io.opentelemetry.context.propagation.TextMapPropagator;
11+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
12+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
13+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
1114
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
1215
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
1316
import org.apache.kafka.connect.sink.SinkRecord;
@@ -21,11 +24,22 @@ public final class KafkaConnectSingletons {
2124
private static final TextMapGetter<SinkRecord> SINK_RECORD_HEADER_GETTER =
2225
SinkRecordHeadersGetter.INSTANCE;
2326

24-
private static final Instrumenter<KafkaConnectTask, Void> INSTRUMENTER =
25-
Instrumenter.<KafkaConnectTask, Void>builder(
26-
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, KafkaConnectTask::getSpanName)
27-
.addSpanLinksExtractor(new KafkaConnectBatchProcessSpanLinksExtractor(PROPAGATOR))
28-
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
27+
private static final Instrumenter<KafkaConnectTask, Void> INSTRUMENTER;
28+
29+
static {
30+
KafkaConnectBatchProcessSpanLinksExtractor spanLinksExtractor = new KafkaConnectBatchProcessSpanLinksExtractor(PROPAGATOR);
31+
32+
INSTRUMENTER =
33+
Instrumenter.<KafkaConnectTask, Void>builder(
34+
GlobalOpenTelemetry.get(),
35+
INSTRUMENTATION_NAME,
36+
MessagingSpanNameExtractor.create(KafkaConnectAttributesGetter.INSTANCE, MessageOperation.PROCESS))
37+
.addAttributesExtractor(
38+
MessagingAttributesExtractor.builder(KafkaConnectAttributesGetter.INSTANCE, MessageOperation.PROCESS)
39+
.build())
40+
.addSpanLinksExtractor(spanLinksExtractor)
41+
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
42+
}
2943

3044
public static Instrumenter<KafkaConnectTask, Void> instrumenter() {
3145
return INSTRUMENTER;

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectTask.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ public Collection<SinkRecord> getRecords() {
2020
return records;
2121
}
2222

23-
public static String getSpanName(KafkaConnectTask task) {
24-
return "KafkaConnect.put";
23+
/**
24+
* Returns the first record in the batch, used for extracting destination information.
25+
* Kafka Connect processes records in batches, but all records in a batch typically
26+
* come from the same topic, so we use the first record for span naming.
27+
*/
28+
public SinkRecord getFirstRecord() {
29+
return records.isEmpty() ? null : records.iterator().next();
2530
}
2631
}

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkRecordHeadersGetter.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,42 @@
77

88
import io.opentelemetry.context.propagation.TextMapGetter;
99
import java.nio.charset.StandardCharsets;
10-
import java.util.ArrayList;
11-
import java.util.List;
10+
import java.util.stream.StreamSupport;
1211
import javax.annotation.Nullable;
1312
import org.apache.kafka.connect.header.Header;
14-
import org.apache.kafka.connect.header.Headers;
1513
import org.apache.kafka.connect.sink.SinkRecord;
1614

15+
/**
16+
* Extracts trace context from Kafka Connect SinkRecord headers for distributed tracing.
17+
* This enables proper trace propagation from the original producer through Kafka Connect processing.
18+
*/
1719
enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
1820
INSTANCE;
1921

2022
@Override
2123
public Iterable<String> keys(SinkRecord record) {
22-
Headers headers = record.headers();
23-
List<String> keys = new ArrayList<>();
24-
for (Header header : headers) {
25-
keys.add(header.key());
24+
if (record.headers() == null) {
25+
return java.util.Collections.emptyList();
2626
}
27-
return keys;
27+
28+
return StreamSupport.stream(record.headers().spliterator(), false)
29+
.map(Header::key)
30+
.collect(java.util.stream.Collectors.toList());
2831
}
2932

30-
@Nullable
3133
@Override
34+
@Nullable
3235
public String get(@Nullable SinkRecord record, String key) {
33-
if (record == null) {
36+
if (record == null || record.headers() == null) {
3437
return null;
3538
}
36-
Headers headers = record.headers();
37-
Header header = headers.lastWithName(key);
38-
if (header == null) {
39+
40+
Header header = record.headers().lastWithName(key);
41+
if (header == null || header.value() == null) {
3942
return null;
4043
}
44+
45+
// Convert header value to string
4146
Object value = header.value();
4247
if (value instanceof byte[]) {
4348
return new String((byte[]) value, StandardCharsets.UTF_8);

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
77

8+
import static io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6.KafkaConnectSingletons.instrumenter;
89
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
910
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1011
import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -58,11 +59,11 @@ public static void onEnter(
5859
}
5960

6061
task = new KafkaConnectTask(records);
61-
if (!KafkaConnectSingletons.instrumenter().shouldStart(parentContext, task)) {
62+
if (!instrumenter().shouldStart(parentContext, task)) {
6263
return;
6364
}
6465

65-
context = KafkaConnectSingletons.instrumenter().start(parentContext, task);
66+
context = instrumenter().start(parentContext, task);
6667
scope = context.makeCurrent();
6768
}
6869

@@ -77,7 +78,7 @@ public static void onExit(
7778
return;
7879
}
7980
scope.close();
80-
KafkaConnectSingletons.instrumenter().end(context, task, null, throwable);
81+
instrumenter().end(context, task, null, throwable);
8182
}
8283
}
8384
}

instrumentation/kafka/kafka-connect-2.6/testing/build.gradle.kts

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,36 +6,23 @@ plugins {
66
dependencies {
77
api(project(":testing-common"))
88

9-
implementation("org.apache.kafka:kafka-clients:0.11.0.0")
9+
testImplementation("org.apache.kafka:kafka-clients:3.6.1")
1010

11-
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common-0.11:library"))
12-
implementation(project(":instrumentation:kafka:kafka-connect-2.6:javaagent"))
13-
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
14-
15-
testImplementation("org.apache.kafka:connect-runtime:3.6.1")
16-
testImplementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:testing"))
17-
18-
implementation("org.testcontainers:postgresql:1.21.3") // For PostgreSQLContainer
11+
testImplementation("org.testcontainers:postgresql:1.21.3") // For PostgreSQLContainer
1912
testImplementation("org.postgresql:postgresql:42.7.2") // PostgreSQL JDBC driver
20-
implementation("org.testcontainers:mongodb:1.21.3") // For MongoDBContainer
13+
testImplementation("org.testcontainers:mongodb:1.21.3") // For MongoDBContainer
2114
testImplementation("org.mongodb:mongodb-driver-sync:4.11.0") // MongoDB Java driver
2215
testImplementation("org.apache.httpcomponents:httpclient") // For HttpStatus (not httpcore)
2316

2417
testImplementation("org.mockito:mockito-junit-jupiter:4.11.0")
2518

2619
// Testcontainers dependencies for integration testing
27-
testImplementation("io.strimzi:strimzi-test-container:0.111.0")
28-
implementation("org.testcontainers:junit-jupiter")
20+
testImplementation("org.testcontainers:junit-jupiter")
2921
testImplementation("org.awaitility:awaitility")
3022
testImplementation("com.squareup.okhttp3:okhttp:4.12.0")
3123
testImplementation("org.testcontainers:testcontainers:1.19.7")
3224
testImplementation("org.testcontainers:kafka:1.19.7")
33-
implementation("io.rest-assured:rest-assured:5.5.5")
25+
testImplementation("io.rest-assured:rest-assured:5.5.5")
3426
testImplementation("org.junit.jupiter:junit-jupiter:5.10.2")
35-
testImplementation("org.testcontainers:junit-jupiter")
36-
implementation("com.fasterxml.jackson.core:jackson-databind")
37-
testImplementation("com.squareup.okhttp3:okhttp:4.12.0")
38-
39-
compileOnly("com.google.auto.value:auto-value-annotations")
40-
annotationProcessor("com.google.auto.value:auto-value")
27+
testImplementation("com.fasterxml.jackson.core:jackson-databind")
4128
}

0 commit comments

Comments
 (0)