Skip to content

Commit 340a942

Browse files
committed
fix reflection
1 parent 132d79b commit 340a942

File tree

1 file changed

+4
-3
lines changed
  • instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams

1 file changed

+4
-3
lines changed

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamReflectionUtil.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ static KStream<Integer, String> stream(Object builder, String topic)
4545
try {
4646
// equivalent to:
4747
// ((org.apache.kafka.streams.kstream.KStreamBuilder)builder).stream(STREAM_PENDING);
48+
String[] topics = new String[] {topic};
4849
return (KStream<Integer, String>)
4950
Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder")
5051
.getMethod("stream", String[].class)
51-
.invoke(builder, topic);
52+
.invoke(builder, (Object) topics);
5253
} catch (ClassNotFoundException
5354
| NoSuchMethodException
5455
| IllegalAccessException
@@ -79,7 +80,7 @@ static KafkaStreams createStreams(
7980
.getMethod("to", Serde.class, Serde.class, String.class)
8081
.invoke(values, Serdes.Integer(), Serdes.String(), topic);
8182

82-
Class<?> ksteamsClass = Class.forName("org.apache.kafka.streams.KStreams");
83+
Class<?> ksteamsClass = Class.forName("org.apache.kafka.streams.KafkaStreams");
8384
Class<?> topologyBuilderClass =
8485
Class.forName("org.apache.kafka.streams.processor.TopologyBuilder");
8586
Constructor<?> constructor =
@@ -107,7 +108,7 @@ static KafkaStreams createStreams(
107108
Class<?> streamsBuilderClass = Class.forName("org.apache.kafka.streams.StreamsBuilder");
108109
Object topology = streamsBuilderClass.getMethod("build").invoke(builder);
109110

110-
Class<?> ksteamsClass = Class.forName("org.apache.kafka.streams.KStreams");
111+
Class<?> ksteamsClass = Class.forName("org.apache.kafka.streams.KafkaStreams");
111112
Class<?> topologyClass = Class.forName("org.apache.kafka.streams.Topology");
112113
Constructor<?> constructor = ksteamsClass.getConstructor(topologyClass, Properties.class);
113114
return (KafkaStreams) constructor.newInstance(topology, config);

0 commit comments

Comments
 (0)