Skip to content

Commit 3fec0f6

Browse files
committed
review
1 parent 0e18b9e commit 3fec0f6

File tree

5 files changed

+168
-146
lines changed

5 files changed

+168
-146
lines changed

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamReflectionUtil.java

Lines changed: 0 additions & 112 deletions
This file was deleted.

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import com.google.common.collect.ImmutableMap;
1212
import io.opentelemetry.api.common.AttributeKey;
13-
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
1413
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1514
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1615
import java.time.Duration;
@@ -33,6 +32,7 @@
3332
import org.apache.kafka.common.serialization.IntegerSerializer;
3433
import org.apache.kafka.common.serialization.StringDeserializer;
3534
import org.apache.kafka.common.serialization.StringSerializer;
35+
import org.junit.jupiter.api.AfterAll;
3636
import org.junit.jupiter.api.BeforeAll;
3737
import org.junit.jupiter.api.extension.RegisterExtension;
3838
import org.testcontainers.containers.wait.strategy.Wait;
@@ -44,8 +44,6 @@ abstract class KafkaStreamsBaseTest {
4444
@RegisterExtension
4545
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
4646

47-
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
48-
4947
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
5048
AttributeKey.stringKey("messaging.client_id");
5149
protected static final String STREAM_PENDING = "test.pending";
@@ -64,22 +62,20 @@ static void setup() throws ExecutionException, InterruptedException, TimeoutExce
6462
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1))
6563
.withStartupTimeout(Duration.ofMinutes(1));
6664
kafka.start();
67-
cleanup.deferCleanup(kafka::stop);
6865

6966
// create test topic
70-
AdminClient adminClient =
71-
AdminClient.create(ImmutableMap.of("bootstrap.servers", kafka.getBootstrapServers()));
72-
cleanup.deferCleanup(adminClient);
73-
adminClient
74-
.createTopics(
75-
asList(
76-
new NewTopic(STREAM_PENDING, 1, (short) 1),
77-
new NewTopic(STREAM_PROCESSED, 1, (short) 1)))
78-
.all()
79-
.get(10, TimeUnit.SECONDS);
67+
try (AdminClient adminClient =
68+
AdminClient.create(ImmutableMap.of("bootstrap.servers", kafka.getBootstrapServers()))) {
69+
adminClient
70+
.createTopics(
71+
asList(
72+
new NewTopic(STREAM_PENDING, 1, (short) 1),
73+
new NewTopic(STREAM_PROCESSED, 1, (short) 1)))
74+
.all()
75+
.get(10, TimeUnit.SECONDS);
76+
}
8077

8178
producer = new KafkaProducer<>(producerProps(kafka.getBootstrapServers()));
82-
cleanup.deferCleanup(producer);
8379

8480
Map<String, Object> consumerProps =
8581
ImmutableMap.of(
@@ -98,7 +94,6 @@ static void setup() throws ExecutionException, InterruptedException, TimeoutExce
9894
"value.deserializer",
9995
StringDeserializer.class);
10096
consumer = new KafkaConsumer<>(consumerProps);
101-
cleanup.deferCleanup(consumer);
10297
consumer.subscribe(
10398
singleton(STREAM_PROCESSED),
10499
new ConsumerRebalanceListener() {
@@ -112,6 +107,13 @@ public void onPartitionsAssigned(Collection<TopicPartition> collection) {
112107
});
113108
}
114109

110+
@AfterAll
111+
static void cleanup() {
112+
consumer.close();
113+
producer.close();
114+
kafka.stop();
115+
}
116+
115117
static Map<String, Object> producerProps(String servers) {
116118
// values copied from spring's KafkaTestUtils
117119
return ImmutableMap.of(

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,17 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
5656
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
5757

5858
// CONFIGURE PROCESSOR
59-
Object builder = KafkaStreamReflectionUtil.createBuilder();
60-
KStream<Integer, String> textLines = KafkaStreamReflectionUtil.stream(builder, STREAM_PENDING);
59+
KafkaStreamsReflectionUtil.StreamBuilder streamBuilder =
60+
KafkaStreamsReflectionUtil.createBuilder();
61+
KStream<Integer, String> textLines = streamBuilder.stream(STREAM_PENDING);
6162
KStream<Integer, String> values =
6263
textLines.mapValues(
6364
textLine -> {
6465
Span.current().setAttribute("asdf", "testing");
6566
return textLine.toLowerCase(Locale.ROOT);
6667
});
6768

68-
KafkaStreams streams =
69-
KafkaStreamReflectionUtil.createStreams(builder, values, config, STREAM_PROCESSED);
69+
KafkaStreams streams = streamBuilder.createStreams(values, config, STREAM_PROCESSED);
7070
streams.start();
7171

7272
String greeting = "TESTING TESTING 123!";
@@ -104,7 +104,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
104104
.hasKind(SpanKind.PRODUCER)
105105
.hasNoParent()
106106
.hasAttributesSatisfyingExactly(
107-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
107+
equalTo(
108+
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
109+
MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA),
108110
equalTo(
109111
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
110112
STREAM_PENDING),
@@ -125,7 +127,10 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
125127
List<AttributeAssertion> assertions =
126128
new ArrayList<>(
127129
asList(
128-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
130+
equalTo(
131+
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
132+
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
133+
.KAFKA),
129134
equalTo(
130135
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
131136
STREAM_PENDING),
@@ -149,7 +154,10 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
149154
List<AttributeAssertion> assertions =
150155
new ArrayList<>(
151156
asList(
152-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
157+
equalTo(
158+
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
159+
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
160+
.KAFKA),
153161
equalTo(
154162
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
155163
STREAM_PENDING),
@@ -187,7 +195,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
187195
.hasKind(SpanKind.PRODUCER)
188196
.hasParent(trace.getSpan(1))
189197
.hasAttributesSatisfyingExactly(
190-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
198+
equalTo(
199+
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
200+
MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA),
191201
equalTo(
192202
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
193203
STREAM_PROCESSED),
@@ -208,7 +218,10 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
208218
List<AttributeAssertion> assertions =
209219
new ArrayList<>(
210220
asList(
211-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
221+
equalTo(
222+
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
223+
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
224+
.KAFKA),
212225
equalTo(
213226
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
214227
STREAM_PROCESSED),
@@ -231,7 +244,10 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
231244
List<AttributeAssertion> assertions =
232245
new ArrayList<>(
233246
asList(
234-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
247+
equalTo(
248+
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
249+
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
250+
.KAFKA),
235251
equalTo(
236252
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
237253
STREAM_PROCESSED),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
7+
8+
import java.lang.reflect.Constructor;
9+
import java.lang.reflect.Method;
10+
import java.util.Properties;
11+
import org.apache.kafka.common.serialization.Serde;
12+
import org.apache.kafka.common.serialization.Serdes;
13+
import org.apache.kafka.streams.KafkaStreams;
14+
import org.apache.kafka.streams.kstream.KStream;
15+
16+
/**
17+
* Kafka streams reflection util which is used to be compatible with different versions of kafka
18+
* streams.
19+
*/
20+
class KafkaStreamsReflectionUtil {
21+
22+
private KafkaStreamsReflectionUtil() {}
23+
24+
static class StreamBuilder {
25+
private final Object builder;
26+
27+
StreamBuilder(Object builder) {
28+
this.builder = builder;
29+
}
30+
31+
@SuppressWarnings("unchecked")
32+
KStream<Integer, String> stream(String topic)
33+
throws Exception { // Different api for test and latestDepTest.
34+
Method method;
35+
Object[] arguments;
36+
try {
37+
// equivalent to:
38+
// ((org.apache.kafka.streams.kstream.KStreamBuilder)builder).stream(STREAM_PENDING);
39+
method = builder.getClass().getMethod("stream", String[].class);
40+
String[] topics = new String[] {topic};
41+
arguments = new Object[] {topics};
42+
} catch (Exception exception) {
43+
// equivalent to:
44+
// ((org.apache.kafka.streams.StreamsBuilder)builder).stream(STREAM_PENDING);
45+
method = builder.getClass().getMethod("stream", String.class);
46+
arguments = new Object[] {topic};
47+
}
48+
49+
return (KStream<Integer, String>) method.invoke(builder, arguments);
50+
}
51+
52+
KafkaStreams createStreams(KStream<Integer, String> values, Properties config, String topic)
53+
throws Exception {
54+
Constructor<?> constructor;
55+
// Different api for test and latestDepTest.
56+
try {
57+
// equivalent to:
58+
// values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED);
59+
// return new KafkaStreams(builder, config);
60+
KStream.class
61+
.getMethod("to", Serde.class, Serde.class, String.class)
62+
.invoke(values, Serdes.Integer(), Serdes.String(), topic);
63+
64+
Class<?> topologyBuilderClass =
65+
Class.forName("org.apache.kafka.streams.processor.TopologyBuilder");
66+
constructor = KafkaStreams.class.getConstructor(topologyBuilderClass, Properties.class);
67+
} catch (Exception exception) {
68+
constructor = null;
69+
}
70+
if (constructor != null) {
71+
return (KafkaStreams) constructor.newInstance(builder, config);
72+
}
73+
74+
// equivalent to:
75+
// Produced<Integer, String> produced = Produced.with(Serdes.Integer(), Serdes.String());
76+
// values.to(STREAM_PROCESSED, produced);
77+
//
78+
// Topology topology = builder.build();
79+
// new KafkaStreams(topology, props);
80+
Class<?> producedClass = Class.forName("org.apache.kafka.streams.kstream.Produced");
81+
Method producedWith = producedClass.getMethod("with", Serde.class, Serde.class);
82+
Object producer = producedWith.invoke(null, Serdes.Integer(), Serdes.String());
83+
84+
KStream.class.getMethod("to", String.class, producedClass).invoke(values, topic, producer);
85+
86+
Object topology = builder.getClass().getMethod("build").invoke(builder);
87+
88+
Class<?> topologyClass = Class.forName("org.apache.kafka.streams.Topology");
89+
constructor = KafkaStreams.class.getConstructor(topologyClass, Properties.class);
90+
91+
return (KafkaStreams) constructor.newInstance(topology, config);
92+
}
93+
}
94+
95+
static StreamBuilder createBuilder() throws Exception {
96+
Class<?> builderClass;
97+
try {
98+
// Different class names for test and latestDepTest.
99+
builderClass = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder");
100+
} catch (Exception e) {
101+
builderClass = Class.forName("org.apache.kafka.streams.StreamsBuilder");
102+
}
103+
return new StreamBuilder(builderClass.getConstructor().newInstance());
104+
}
105+
}

0 commit comments

Comments
 (0)