Skip to content

Commit 20c54bf

Browse files
committed
wip: reactor to remove comments and unwanted dependencies from gradle file.
1 parent 780ba48 commit 20c54bf

File tree

9 files changed

+185
-165
lines changed

9 files changed

+185
-165
lines changed

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectAttributesGetter.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ public String getConversationId(KafkaConnectTask request) {
5454
@Nullable
5555
@Override
5656
public Long getMessageBodySize(KafkaConnectTask request) {
57-
// SinkRecord doesn't expose serialized size information
58-
// This would need to be calculated from the actual value, but that's expensive
59-
// and not typically done in messaging instrumentations for batch processing
6057
return null;
6158
}
6259

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectBatchProcessSpanLinksExtractor.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,6 @@
1212
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
1313
import org.apache.kafka.connect.sink.SinkRecord;
1414

15-
/**
16-
* Extracts span links from Kafka Connect SinkRecord headers for batch processing scenarios. This
17-
* ensures that when processing a batch of records that may come from different traces, we create
18-
* links to all the original trace contexts rather than losing them.
19-
*/
2015
final class KafkaConnectBatchProcessSpanLinksExtractor
2116
implements SpanLinksExtractor<KafkaConnectTask> {
2217

@@ -31,8 +26,6 @@ final class KafkaConnectBatchProcessSpanLinksExtractor
3126
public void extract(SpanLinksBuilder spanLinks, Context parentContext, KafkaConnectTask request) {
3227

3328
for (SinkRecord record : request.getRecords()) {
34-
// Create a link to each record's original trace context
35-
// Using Context.root() to avoid linking to the current span
3629
singleRecordLinkExtractor.extract(spanLinks, Context.root(), record);
3730
}
3831
}

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectTask.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,16 @@ public Collection<SinkRecord> getRecords() {
2323
return records;
2424
}
2525

26-
/**
27-
* Returns the first record in the batch, used for extracting destination information. Note:
28-
* Records in a batch can come from multiple topics, so this should only be used when you need a
29-
* representative record, not for definitive topic information.
30-
*/
3126
public SinkRecord getFirstRecord() {
3227
return records.isEmpty() ? null : records.iterator().next();
3328
}
3429

35-
/**
36-
* Returns all unique topic names present in this batch of records. This provides accurate
37-
* information about all topics being processed.
38-
*/
3930
public Set<String> getTopics() {
4031
return records.stream()
4132
.map(SinkRecord::topic)
4233
.collect(Collectors.toCollection(LinkedHashSet::new));
4334
}
4435

45-
/**
46-
* Returns a single topic name if all records are from the same topic, or a formatted string
47-
* representing multiple topics if they differ. This is useful for span naming that needs to be
48-
* concise but informative.
49-
*/
5036
public String getDestinationName() {
5137
Set<String> topics = getTopics();
5238
if (topics.isEmpty()) {
@@ -55,7 +41,6 @@ public String getDestinationName() {
5541
if (topics.size() == 1) {
5642
return topics.iterator().next();
5743
}
58-
// For multiple topics, create a descriptive name
5944
return topics.stream().collect(Collectors.joining(",", "[", "]"));
6045
}
6146
}

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkRecordHeadersGetter.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@
1212
import org.apache.kafka.connect.header.Header;
1313
import org.apache.kafka.connect.sink.SinkRecord;
1414

15-
/**
16-
* Extracts trace context from Kafka Connect SinkRecord headers for distributed tracing. This
17-
* enables proper trace propagation from the original producer through Kafka Connect processing.
18-
*/
1915
enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
2016
INSTANCE;
2117

@@ -42,7 +38,6 @@ public String get(@Nullable SinkRecord record, String key) {
4238
return null;
4339
}
4440

45-
// Convert header value to string
4641
Object value = header.value();
4742
if (value instanceof byte[]) {
4843
return new String((byte[]) value, StandardCharsets.UTF_8);

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ public static void onEnter(
4646
@Advice.Local("otelContext") Context context,
4747
@Advice.Local("otelScope") Scope scope) {
4848

49-
// For batch processing from multiple traces, use root context instead of extracting
50-
// from first record to avoid creating incorrect parent-child relationships.
51-
// Relationships are maintained through span links (see
52-
// KafkaConnectBatchProcessSpanLinksExtractor)
5349
Context parentContext = Java8BytecodeBridge.currentContext();
5450

5551
task = new KafkaConnectTask(records);

instrumentation/kafka/kafka-connect-2.6/testing/build.gradle.kts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,11 @@ plugins {
44
id("otel.java-conventions")
55
}
66

7-
// Smoke test pattern - get the agent jar
87
val agentShadowJar = project(":javaagent").tasks.named<ShadowJar>("shadowJar")
98

109
dependencies {
11-
// Add smoke-tests dependency for SmokeTestInstrumentationExtension
1210
testImplementation(project(":smoke-tests"))
13-
// Add testing-common manually since we removed otel.javaagent-testing plugin
1411
testImplementation(project(":testing-common"))
15-
// Add SDK testing assertions for structured trace verification
1612
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
1713
testImplementation("org.apache.kafka:kafka-clients:3.6.1")
1814

@@ -35,18 +31,7 @@ dependencies {
3531
testImplementation("com.fasterxml.jackson.core:jackson-databind")
3632
}
3733

38-
// Configure test tasks to have access to agent jar
3934
tasks.withType<Test>().configureEach {
4035
dependsOn(agentShadowJar)
41-
42-
// Make agent jar path available to tests
4336
systemProperty("io.opentelemetry.smoketest.agent.shadowJar.path", agentShadowJar.get().archiveFile.get().toString())
44-
// Configure test JVM with agent for end-to-end tracing
45-
jvmArgs(
46-
"-javaagent:${agentShadowJar.get().archiveFile.get()}",
47-
"-Dotel.traces.exporter=otlp",
48-
"-Dotel.metrics.exporter=none",
49-
"-Dotel.logs.exporter=none",
50-
"-Dotel.service.name=kafka-connect-test-producer"
51-
)
5237
}

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public abstract class KafkaConnectSinkTaskTestBase implements TelemetryRetriever
5959
@RegisterExtension
6060
protected static final InstrumentationExtension testing = SmokeTestInstrumentationExtension.create();
6161

62-
private static final Logger logger = LoggerFactory.getLogger(KafkaConnectSinkTaskTestBase.class);
62+
protected static final Logger logger = LoggerFactory.getLogger(KafkaConnectSinkTaskTestBase.class);
6363

6464
// Using the same fake backend pattern as smoke tests (with ARM64 support)
6565
protected static GenericContainer<?> backend;

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java

Lines changed: 39 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.apache.kafka.common.serialization.StringSerializer;
3030
import org.bson.Document;
3131
import org.junit.jupiter.api.Test;
32-
import org.slf4j.Logger;
33-
import org.slf4j.LoggerFactory;
3432
import org.testcontainers.containers.MongoDBContainer;
3533
import org.testcontainers.junit.jupiter.Testcontainers;
3634
import org.testcontainers.lifecycle.Startables;
@@ -39,20 +37,15 @@
3937

4038
@Testcontainers
4139
class MongoKafkaConnectSinkTaskTest extends KafkaConnectSinkTaskTestBase {
42-
43-
private static final Logger logger = LoggerFactory.getLogger(MongoKafkaConnectSinkTaskTest.class);
44-
4540
// MongoDB-specific constants
4641
private static final String MONGO_NETWORK_ALIAS = "mongodb";
4742
private static final String DB_NAME = "testdb";
4843
private static final String COLLECTION_NAME = "person";
4944
private static final String CONNECTOR_NAME = "test-mongo-connector";
5045
private static final String TOPIC_NAME = "test-mongo-topic";
5146

52-
// MongoDB container
5347
private static MongoDBContainer mongoDB;
5448

55-
// Override abstract methods from base class
5649
@Override
5750
protected void setupDatabaseContainer() {
5851
mongoDB =
@@ -102,17 +95,12 @@ protected String getTopicName() {
10295
@Test
10396
public void testKafkaConnectMongoSinkTaskInstrumentation()
10497
throws IOException, InterruptedException {
105-
// Use base topic name for consistent span naming
10698
String uniqueTopicName = TOPIC_NAME;
107-
108-
// Setup Kafka Connect MongoDB Sink connector
10999
setupMongoSinkConnector(uniqueTopicName);
110100

111-
// Create topic and wait for availability
112101
createTopic(uniqueTopicName);
113102
awaitForTopicCreation(uniqueTopicName);
114103

115-
// Produce a test message
116104
Properties props = new Properties();
117105
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
118106
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -123,20 +111,12 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
123111
new ProducerRecord<>(uniqueTopicName, "test-key", "{\"id\":1,\"name\":\"TestUser\"}"));
124112
producer.flush();
125113
}
126-
127-
// Wait for message processing (increased timeout for ARM64 Docker emulation)
128114
await().atMost(Duration.ofSeconds(60)).until(() -> getRecordCountFromMongo() >= 1);
129-
130-
// Use SmokeTestInstrumentationExtension's testing framework to wait for and assert traces
131-
// Wait for traces and then find the specific trace we want
132115
await()
133116
.atMost(Duration.ofSeconds(30))
134117
.untilAsserted(
135118
() -> {
136119
List<List<SpanData>> traces = testing.waitForTraces(1);
137-
138-
// Find the trace that contains our Kafka Connect Consumer span and database INSERT
139-
// span
140120
List<SpanData> targetTrace =
141121
traces.stream()
142122
.filter(
@@ -163,14 +143,10 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
163143
.findFirst()
164144
.orElse(null);
165145

166-
// Assert that we found the target trace
167146
assertThat(targetTrace).isNotNull();
168147

169-
// Assert on the spans in the target trace (should have at least 2 spans: Kafka
170-
// Connect Consumer + database operations)
171148
assertThat(targetTrace).hasSizeGreaterThanOrEqualTo(2);
172149

173-
// Find and assert the Kafka Connect Consumer span
174150
SpanData kafkaConnectSpan =
175151
targetTrace.stream()
176152
.filter(
@@ -182,7 +158,6 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
182158
assertThat(kafkaConnectSpan).isNotNull();
183159
assertThat(kafkaConnectSpan.getParentSpanContext().isValid()).isFalse(); // No parent
184160

185-
// Find and assert the database UPDATE span
186161
SpanData insertSpan =
187162
targetTrace.stream()
188163
.filter(
@@ -198,30 +173,25 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
198173
@Test
199174
public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
200175
throws IOException, InterruptedException {
201-
// Create multiple topic names for consistent span naming
202176
String topicName1 = TOPIC_NAME + "-1";
203177
String topicName2 = TOPIC_NAME + "-2";
204178
String topicName3 = TOPIC_NAME + "-3";
205179

206-
// Setup Kafka Connect MongoDB Sink connector with multiple topics
207180
setupMongoSinkConnectorMultiTopic(topicName1, topicName2, topicName3);
208181

209-
// Create all topics and wait for availability
210182
createTopic(topicName1);
211183
createTopic(topicName2);
212184
createTopic(topicName3);
213185
awaitForTopicCreation(topicName1);
214186
awaitForTopicCreation(topicName2);
215187
awaitForTopicCreation(topicName3);
216188

217-
// Produce test messages to different topics
218189
Properties props = new Properties();
219190
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
220191
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
221192
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
222193

223194
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
224-
// Send messages to different topics
225195
producer.send(
226196
new ProducerRecord<>(
227197
topicName1, "key1", "{\"id\":1,\"name\":\"User1\",\"source\":\"topic1\"}"));
@@ -234,78 +204,52 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
234204
producer.flush();
235205
}
236206

237-
// Wait for message processing (increased timeout for ARM64 Docker emulation)
238207
await().atMost(Duration.ofSeconds(60)).until(() -> getRecordCountFromMongo() >= 3);
239208

240-
// Use SmokeTestInstrumentationExtension's testing framework to wait for and assert traces
241-
// Wait for traces and then find the specific trace we want
242209
await()
243210
.atMost(Duration.ofSeconds(30))
244211
.untilAsserted(
245212
() -> {
213+
// Wait for at least 1 trace (could be 1 batch trace or multiple individual traces)
246214
List<List<SpanData>> traces = testing.waitForTraces(1);
247-
248-
// Find the trace that contains our Kafka Connect Consumer span and database INSERT
249-
// span
250-
List<SpanData> targetTrace =
251-
traces.stream()
252-
.filter(
253-
trace -> {
254-
boolean hasKafkaConnectSpan =
255-
trace.stream()
256-
.anyMatch(
257-
span ->
258-
(span.getName().contains(topicName1)
259-
|| span.getName().contains(topicName2)
260-
|| span.getName().contains(topicName3))
261-
&& span.getKind()
262-
== io.opentelemetry.api.trace.SpanKind
263-
.CONSUMER);
264-
265-
boolean hasInsertSpan =
266-
trace.stream()
267-
.anyMatch(
268-
span ->
269-
span.getName().contains("update")
270-
&& span.getKind()
271-
== io.opentelemetry.api.trace.SpanKind.CLIENT);
272-
273-
return hasKafkaConnectSpan && hasInsertSpan;
274-
})
275-
.findFirst()
276-
.orElse(null);
277-
278-
// Assert that we found the target trace
279-
assertThat(targetTrace).isNotNull();
280-
281-
// Assert on the spans in the target trace (should have at least 2 spans: Kafka
282-
// Connect Consumer + database operations)
283-
assertThat(targetTrace).hasSizeGreaterThanOrEqualTo(2);
284-
285-
// Find and assert the Kafka Connect Consumer span (multi-topic span)
286-
SpanData kafkaConnectSpan =
287-
targetTrace.stream()
288-
.filter(
289-
span ->
290-
(span.getName().contains(topicName1)
291-
|| span.getName().contains(topicName2)
292-
|| span.getName().contains(topicName3))
293-
&& span.getKind() == io.opentelemetry.api.trace.SpanKind.CONSUMER)
294-
.findFirst()
295-
.orElse(null);
296-
assertThat(kafkaConnectSpan).isNotNull();
297-
assertThat(kafkaConnectSpan.getParentSpanContext().isValid()).isFalse(); // No parent
298-
299-
// Find and assert the database UPDATE span
300-
SpanData insertSpan =
301-
targetTrace.stream()
302-
.filter(
303-
span ->
304-
span.getName().contains("update")
305-
&& span.getKind() == io.opentelemetry.api.trace.SpanKind.CLIENT)
306-
.findFirst()
307-
.orElse(null);
308-
assertThat(insertSpan).isNotNull();
215+
216+
// Count total Kafka Connect consumer spans and database update spans across all traces
217+
long totalKafkaConnectSpans = traces.stream()
218+
.flatMap(trace -> trace.stream())
219+
.filter(span ->
220+
(span.getName().contains(topicName1) ||
221+
span.getName().contains(topicName2) ||
222+
span.getName().contains(topicName3))
223+
&& span.getKind() == io.opentelemetry.api.trace.SpanKind.CONSUMER)
224+
.count();
225+
226+
long totalUpdateSpans = traces.stream()
227+
.flatMap(trace -> trace.stream())
228+
.filter(span ->
229+
span.getName().contains("update")
230+
&& span.getKind() == io.opentelemetry.api.trace.SpanKind.CLIENT)
231+
.count();
232+
233+
assertThat(totalKafkaConnectSpans).isGreaterThanOrEqualTo(1);
234+
assertThat(totalUpdateSpans).isGreaterThanOrEqualTo(3);
235+
236+
boolean hasMultiTopicSpan = traces.stream()
237+
.flatMap(trace -> trace.stream())
238+
.anyMatch(span ->
239+
span.getName().contains("[") && span.getName().contains("]") &&
240+
span.getName().contains("process") &&
241+
span.getKind() == io.opentelemetry.api.trace.SpanKind.CONSUMER);
242+
243+
boolean hasIndividualSpans = traces.stream()
244+
.flatMap(trace -> trace.stream())
245+
.anyMatch(span ->
246+
(span.getName().contains(topicName1) ||
247+
span.getName().contains(topicName2) ||
248+
span.getName().contains(topicName3)) &&
249+
!span.getName().contains("[") &&
250+
span.getKind() == io.opentelemetry.api.trace.SpanKind.CONSUMER);
251+
252+
assertThat(hasMultiTopicSpan || hasIndividualSpans).isTrue();
309253
});
310254
}
311255

0 commit comments

Comments
 (0)