Skip to content

Commit d87ca4c

Browse files
committed
fix ut on latestDep
1 parent ec2ff35 commit d87ca4c

File tree

3 files changed

+118
-153
lines changed

3 files changed

+118
-153
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
7+
8+
import java.lang.reflect.Constructor;
9+
import java.lang.reflect.InvocationTargetException;
10+
import java.lang.reflect.Method;
11+
import java.util.Properties;
12+
import org.apache.kafka.common.serialization.Serde;
13+
import org.apache.kafka.common.serialization.Serdes;
14+
import org.apache.kafka.streams.KafkaStreams;
15+
import org.apache.kafka.streams.kstream.KStream;
16+
17+
/**
18+
* kafka stream reflection util which is used to compatible with different versions of kafka stream
19+
*/
20+
class KafkaStreamReflectionUtil {
21+
22+
private KafkaStreamReflectionUtil() {}
23+
24+
@SuppressWarnings("ClassNewInstance")
25+
static Object createBuilder()
26+
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
27+
try {
28+
// Different class names for test and latestDepTest.
29+
return Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance();
30+
} catch (ClassNotFoundException | NoClassDefFoundError e) {
31+
return Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance();
32+
}
33+
}
34+
35+
@SuppressWarnings("unchecked")
36+
static KStream<Integer, String> stream(Object builder, String topic)
37+
throws IllegalAccessException,
38+
InvocationTargetException,
39+
NoSuchMethodException,
40+
ClassNotFoundException {
41+
// Different api for test and latestDepTest.
42+
try {
43+
// equivalent to:
44+
// ((org.apache.kafka.streams.kstream.KStreamBuilder)builder).stream(STREAM_PENDING);
45+
return (KStream<Integer, String>)
46+
Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder")
47+
.getMethod("stream", String.class)
48+
.invoke(builder, topic);
49+
} catch (ClassNotFoundException e) {
50+
// equivalent to:
51+
// ((org.apache.kafka.streams.StreamsBuilder)builder).stream(STREAM_PENDING);
52+
return (KStream<Integer, String>)
53+
Class.forName("org.apache.kafka.streams.StreamsBuilder")
54+
.getMethod("stream", String.class)
55+
.invoke(builder, topic);
56+
}
57+
}
58+
59+
static KafkaStreams createStreams(
60+
Object builder, KStream<Integer, String> values, Properties config, String topic)
61+
throws ClassNotFoundException,
62+
NoSuchMethodException,
63+
InvocationTargetException,
64+
IllegalAccessException,
65+
InstantiationException {
66+
// Different api for test and latestDepTest.
67+
try {
68+
// equivalent to:
69+
// values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED);
70+
// return new KafkaStreams(builder, config);
71+
Class<?> ksteamClass = Class.forName("org.apache.kafka.streams.kstream.KStream");
72+
ksteamClass
73+
.getMethod("to", Serde.class, Serde.class, String.class)
74+
.invoke(values, Serdes.Integer(), Serdes.String(), topic);
75+
76+
Class<?> ksteamsClass = Class.forName("org.apache.kafka.streams.KStreams");
77+
Class<?> topologyBuilderClass =
78+
Class.forName("org.apache.kafka.streams.processor.TopologyBuilder");
79+
Constructor<?> constructor =
80+
ksteamsClass.getConstructor(topologyBuilderClass, Properties.class);
81+
82+
return (KafkaStreams) constructor.newInstance(builder, config);
83+
} catch (NoSuchMethodException
84+
| IllegalAccessException
85+
| InvocationTargetException
86+
| ClassNotFoundException
87+
| InstantiationException e) {
88+
// equivalent to:
89+
// Produced<Integer, String> produced = Produced.with(Serdes.Integer(), Serdes.String());
90+
// values.to(STREAM_PROCESSED, produced);
91+
//
92+
// Topology topology = builder.build();
93+
// new KafkaStreams(topology, props);
94+
Class<?> producedClass = Class.forName("org.apache.kafka.streams.kstream.Produced");
95+
Method producedWith = producedClass.getMethod("with", Serde.class, Serde.class);
96+
Object producer = producedWith.invoke(null, Serdes.Integer(), Serdes.String());
97+
98+
Class<?> ksteamClass = Class.forName("org.apache.kafka.streams.kstream.KStream");
99+
ksteamClass.getMethod("to", String.class, producedClass).invoke(values, topic, producer);
100+
101+
Class<?> streamsBuilderClass = Class.forName("org.apache.kafka.streams.StreamsBuilder");
102+
Object topology = streamsBuilderClass.getMethod("build").invoke(builder);
103+
104+
Class<?> ksteamsClass = Class.forName("org.apache.kafka.streams.KStreams");
105+
Class<?> topologyClass = Class.forName("org.apache.kafka.streams.Topology");
106+
Constructor<?> constructor = ksteamsClass.getConstructor(topologyClass, Properties.class);
107+
return (KafkaStreams) constructor.newInstance(topology, config);
108+
}
109+
}
110+
}

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java

Lines changed: 4 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
import io.opentelemetry.sdk.trace.data.LinkData;
2525
import io.opentelemetry.sdk.trace.data.SpanData;
2626
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
27-
import java.lang.reflect.Constructor;
2827
import java.lang.reflect.InvocationTargetException;
29-
import java.lang.reflect.Method;
3028
import java.nio.charset.StandardCharsets;
3129
import java.time.Duration;
3230
import java.util.Collections;
@@ -40,49 +38,15 @@
4038
import org.apache.kafka.clients.consumer.ConsumerRecords;
4139
import org.apache.kafka.clients.producer.ProducerRecord;
4240
import org.apache.kafka.common.header.Headers;
43-
import org.apache.kafka.common.serialization.Serde;
4441
import org.apache.kafka.common.serialization.Serdes;
4542
import org.apache.kafka.streams.KafkaStreams;
4643
import org.apache.kafka.streams.StreamsConfig;
4744
import org.apache.kafka.streams.kstream.KStream;
48-
import org.apache.kafka.streams.processor.TopologyBuilder;
49-
import org.jetbrains.annotations.NotNull;
5045
import org.junit.jupiter.api.DisplayName;
5146
import org.junit.jupiter.api.Test;
5247

5348
class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
5449

55-
@SuppressWarnings("ClassNewInstance")
56-
private static @NotNull Object createBuilder()
57-
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
58-
Object builder;
59-
try {
60-
// Different class names for test and latestDepTest.
61-
builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance();
62-
} catch (ClassNotFoundException | NoClassDefFoundError e) {
63-
builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance();
64-
}
65-
return builder;
66-
}
67-
68-
@SuppressWarnings("unchecked")
69-
private static KStream<Integer, String> stream(Object builder)
70-
throws IllegalAccessException,
71-
InvocationTargetException,
72-
NoSuchMethodException,
73-
ClassNotFoundException {
74-
Method streamMethod;
75-
try {
76-
Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder");
77-
return ((org.apache.kafka.streams.kstream.KStreamBuilder) builder).stream(STREAM_PENDING);
78-
} catch (ClassNotFoundException e) {
79-
streamMethod =
80-
Class.forName("org.apache.kafka.streams.StreamsBuilder")
81-
.getMethod("stream", String.class);
82-
return (KStream<Integer, String>) streamMethod.invoke(builder, STREAM_PENDING);
83-
}
84-
}
85-
8650
@DisplayName("test kafka produce and consume with streams in-between")
8751
@Test
8852
void testKafkaProduceAndConsumeWithStreamsInBetween()
@@ -102,16 +66,17 @@ void testKafkaProduceAndConsumeWithStreamsInBetween()
10266
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
10367

10468
// CONFIGURE PROCESSOR
105-
Object builder = createBuilder();
106-
KStream<Integer, String> textLines = stream(builder);
69+
Object builder = KafkaStreamReflectionUtil.createBuilder();
70+
KStream<Integer, String> textLines = KafkaStreamReflectionUtil.stream(builder, STREAM_PENDING);
10771
KStream<Integer, String> values =
10872
textLines.mapValues(
10973
textLine -> {
11074
Span.current().setAttribute("asdf", "testing");
11175
return textLine.toLowerCase(Locale.ROOT);
11276
});
11377

114-
KafkaStreams streams = createStreams(builder, values, config);
78+
KafkaStreams streams =
79+
KafkaStreamReflectionUtil.createStreams(builder, values, config, STREAM_PROCESSED);
11580
streams.start();
11681

11782
String greeting = "TESTING TESTING 123!";
@@ -342,41 +307,4 @@ public Iterable<String> keys(String carrier) {
342307
assertThat(spanContext.getTraceId()).isEqualTo(streamSendSpan.getTraceId());
343308
assertThat(spanContext.getSpanId()).isEqualTo(streamSendSpan.getSpanId());
344309
}
345-
346-
private static KafkaStreams createStreams(
347-
Object builder, KStream<Integer, String> values, Properties config)
348-
throws ClassNotFoundException,
349-
NoSuchMethodException,
350-
InvocationTargetException,
351-
IllegalAccessException,
352-
InstantiationException {
353-
try {
354-
// Different api for test and latestDepTest.
355-
values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED);
356-
return new KafkaStreams((TopologyBuilder) builder, config);
357-
} catch (NoSuchMethodError e) {
358-
// equivalent to:
359-
// Produced<Integer, String> produced = Produced.with(Serdes.Integer(), Serdes.String());
360-
// values.to(STREAM_PROCESSED, produced);
361-
//
362-
// Topology topology = builder.build();
363-
// new KafkaStreams(topology, props);
364-
Class<?> producedClass = Class.forName("org.apache.kafka.streams.kstream.Produced");
365-
Method producedWith = producedClass.getMethod("with", Serde.class, Serde.class);
366-
Object producer = producedWith.invoke(null, Serdes.Integer(), Serdes.String());
367-
368-
Class<?> ksteamClass = Class.forName("org.apache.kafka.streams.kstream.KStream");
369-
ksteamClass
370-
.getMethod("to", String.class, producedClass)
371-
.invoke(values, STREAM_PROCESSED, producer);
372-
373-
Class<?> streamsBuilderClass = Class.forName("org.apache.kafka.streams.StreamsBuilder");
374-
Object topology = streamsBuilderClass.getMethod("build").invoke(builder);
375-
376-
Class<?> ksteamsClass = Class.forName("org.apache.kafka.streams.KStreams");
377-
Class<?> topologyClass = Class.forName("org.apache.kafka.streams.Topology");
378-
Constructor<?> constructor = ksteamsClass.getConstructor(topologyClass, Properties.class);
379-
return (KafkaStreams) constructor.newInstance(topology, config);
380-
}
381-
}
382310
}

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java

Lines changed: 4 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import static java.util.Arrays.asList;
1313
import static org.assertj.core.api.Assertions.assertThat;
1414

15-
import groovy.lang.MissingMethodException;
1615
import io.opentelemetry.api.trace.Span;
1716
import io.opentelemetry.api.trace.SpanContext;
1817
import io.opentelemetry.api.trace.SpanKind;
@@ -22,9 +21,7 @@
2221
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
2322
import io.opentelemetry.sdk.trace.data.SpanData;
2423
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
25-
import java.lang.reflect.Constructor;
2624
import java.lang.reflect.InvocationTargetException;
27-
import java.lang.reflect.Method;
2825
import java.nio.charset.StandardCharsets;
2926
import java.time.Duration;
3027
import java.util.Collections;
@@ -38,49 +35,15 @@
3835
import org.apache.kafka.clients.consumer.ConsumerRecords;
3936
import org.apache.kafka.clients.producer.ProducerRecord;
4037
import org.apache.kafka.common.header.Headers;
41-
import org.apache.kafka.common.serialization.Serde;
4238
import org.apache.kafka.common.serialization.Serdes;
4339
import org.apache.kafka.streams.KafkaStreams;
4440
import org.apache.kafka.streams.StreamsConfig;
4541
import org.apache.kafka.streams.kstream.KStream;
46-
import org.apache.kafka.streams.processor.TopologyBuilder;
47-
import org.jetbrains.annotations.NotNull;
4842
import org.junit.jupiter.api.DisplayName;
4943
import org.junit.jupiter.api.Test;
5044

5145
class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
5246

53-
@SuppressWarnings("ClassNewInstance")
54-
private static @NotNull Object createBuilder()
55-
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
56-
Object builder;
57-
try {
58-
// Different class names for test and latestDepTest.
59-
builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance();
60-
} catch (ClassNotFoundException | NoClassDefFoundError e) {
61-
builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance();
62-
}
63-
return builder;
64-
}
65-
66-
@SuppressWarnings("unchecked")
67-
private static KStream<Integer, String> stream(Object builder)
68-
throws IllegalAccessException,
69-
InvocationTargetException,
70-
NoSuchMethodException,
71-
ClassNotFoundException {
72-
Method streamMethod;
73-
try {
74-
Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder");
75-
return ((org.apache.kafka.streams.kstream.KStreamBuilder) builder).stream(STREAM_PENDING);
76-
} catch (ClassNotFoundException e) {
77-
streamMethod =
78-
Class.forName("org.apache.kafka.streams.StreamsBuilder")
79-
.getMethod("stream", String.class);
80-
return (KStream<Integer, String>) streamMethod.invoke(builder, STREAM_PENDING);
81-
}
82-
}
83-
8447
@DisplayName("test kafka produce and consume with streams in-between")
8548
@Test
8649
void testKafkaProduceAndConsumeWithStreamsInBetween()
@@ -100,16 +63,17 @@ void testKafkaProduceAndConsumeWithStreamsInBetween()
10063
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
10164

10265
// CONFIGURE PROCESSOR
103-
Object builder = createBuilder();
104-
KStream<Integer, String> textLines = stream(builder);
66+
Object builder = KafkaStreamReflectionUtil.createBuilder();
67+
KStream<Integer, String> textLines = KafkaStreamReflectionUtil.stream(builder, STREAM_PENDING);
10568
KStream<Integer, String> values =
10669
textLines.mapValues(
10770
textLine -> {
10871
Span.current().setAttribute("asdf", "testing");
10972
return textLine.toLowerCase(Locale.ROOT);
11073
});
11174

112-
KafkaStreams streams = createStreams(builder, values, config);
75+
KafkaStreams streams =
76+
KafkaStreamReflectionUtil.createStreams(builder, values, config, STREAM_PROCESSED);
11377
streams.start();
11478

11579
String greeting = "TESTING TESTING 123!";
@@ -275,41 +239,4 @@ public Iterable<String> keys(String carrier) {
275239
assertThat(spanContext.getTraceId()).isEqualTo(streamSendSpan.getTraceId());
276240
assertThat(spanContext.getSpanId()).isEqualTo(streamSendSpan.getSpanId());
277241
}
278-
279-
private static KafkaStreams createStreams(
280-
Object builder, KStream<Integer, String> values, Properties config)
281-
throws ClassNotFoundException,
282-
NoSuchMethodException,
283-
InvocationTargetException,
284-
IllegalAccessException,
285-
InstantiationException {
286-
try {
287-
// Different api for test and latestDepTest.
288-
values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED);
289-
return new KafkaStreams((TopologyBuilder) builder, config);
290-
} catch (MissingMethodException e) {
291-
// equivalent to:
292-
// Produced<Integer, String> produced = Produced.with(Serdes.Integer(), Serdes.String());
293-
// values.to(STREAM_PROCESSED, produced);
294-
//
295-
// Topology topology = builder.build();
296-
// new KafkaStreams(topology, props);
297-
Class<?> producedClass = Class.forName("org.apache.kafka.streams.kstream.Produced");
298-
Method producedWith = producedClass.getMethod("with", Serde.class, Serde.class);
299-
Object producer = producedWith.invoke(null, Serdes.Integer(), Serdes.String());
300-
301-
Class<?> ksteamClass = Class.forName("org.apache.kafka.streams.kstream.KStream");
302-
ksteamClass
303-
.getMethod("to", String.class, producedClass)
304-
.invoke(values, STREAM_PROCESSED, producer);
305-
306-
Class<?> streamsBuilderClass = Class.forName("org.apache.kafka.streams.StreamsBuilder");
307-
Object topology = streamsBuilderClass.getMethod("build").invoke(builder);
308-
309-
Class<?> ksteamsClass = Class.forName("org.apache.kafka.streams.KStreams");
310-
Class<?> topologyClass = Class.forName("org.apache.kafka.streams.Topology");
311-
Constructor<?> constructor = ksteamsClass.getConstructor(topologyClass, Properties.class);
312-
return (KafkaStreams) constructor.newInstance(topology, config);
313-
}
314-
}
315242
}

0 commit comments

Comments
 (0)