Skip to content

Commit 55e84f9

Browse files
authored
Fix kafka streams latest dep test (#13546)
1 parent e733836 commit 55e84f9

File tree

4 files changed

+41
-10
lines changed

4 files changed

+41
-10
lines changed

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.clients.admin.NewTopic;
2929
import org.apache.kafka.clients.consumer.Consumer;
3030
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
31+
import org.apache.kafka.clients.consumer.ConsumerRecords;
3132
import org.apache.kafka.clients.consumer.KafkaConsumer;
3233
import org.apache.kafka.clients.producer.KafkaProducer;
3334
import org.apache.kafka.clients.producer.Producer;
@@ -146,7 +147,7 @@ static void awaitUntilConsumerIsReady() throws InterruptedException {
146147
return;
147148
}
148149
for (int i = 0; i < 10; i++) {
149-
consumer.poll(0);
150+
poll(Duration.ofMillis(100));
150151
if (consumerReady.await(1, TimeUnit.SECONDS)) {
151152
break;
152153
}
@@ -157,6 +158,10 @@ static void awaitUntilConsumerIsReady() throws InterruptedException {
157158
consumer.seekToBeginning(Collections.emptyList());
158159
}
159160

161+
public static ConsumerRecords<Integer, String> poll(Duration duration) {
162+
return KafkaStreamsReflectionUtil.poll(consumer, duration);
163+
}
164+
160165
static Context getContext(Headers headers) {
161166
String traceparent =
162167
new String(

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
5353
@Test
5454
void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
5555
Properties config = new Properties();
56-
config.putAll(producerProps(KafkaStreamsBaseTest.kafka.getBootstrapServers()));
56+
config.putAll(producerProps(kafka.getBootstrapServers()));
5757
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
5858
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
5959
config.put(
@@ -74,12 +74,11 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
7474
streams.start();
7575

7676
String greeting = "TESTING TESTING 123!";
77-
KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting));
77+
producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting));
7878

7979
awaitUntilConsumerIsReady();
8080
@SuppressWarnings("PreferJavaTimeOverload")
81-
ConsumerRecords<Integer, String> records =
82-
KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis());
81+
ConsumerRecords<Integer, String> records = poll(Duration.ofSeconds(10));
8382
Headers receivedHeaders = null;
8483
for (ConsumerRecord<Integer, String> record : records) {
8584
Span.current().setAttribute("testing", 123);

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsReflectionUtil.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77

88
import java.lang.reflect.Constructor;
99
import java.lang.reflect.Method;
10+
import java.time.Duration;
1011
import java.util.Properties;
12+
import org.apache.kafka.clients.consumer.Consumer;
13+
import org.apache.kafka.clients.consumer.ConsumerRecords;
1114
import org.apache.kafka.common.serialization.Serde;
1215
import org.apache.kafka.common.serialization.Serdes;
1316
import org.apache.kafka.streams.KafkaStreams;
@@ -18,6 +21,32 @@
1821
* streams.
1922
*/
2023
class KafkaStreamsReflectionUtil {
24+
private static final Method consumerPollDurationMethod = getConsumerPollMethod(Duration.class);
25+
private static final Method consumerPollLongMethod = getConsumerPollMethod(long.class);
26+
27+
private static Method getConsumerPollMethod(Class<?>... types) {
28+
try {
29+
return Consumer.class.getMethod("poll", types);
30+
} catch (NoSuchMethodException e) {
31+
return null;
32+
}
33+
}
34+
35+
@SuppressWarnings("unchecked")
36+
public static <K, V> ConsumerRecords<K, V> poll(Consumer<K, V> consumer, Duration duration) {
37+
try {
38+
if (consumerPollDurationMethod != null) {
39+
// not present in early versions
40+
return (ConsumerRecords<K, V>) consumerPollDurationMethod.invoke(consumer, duration);
41+
} else if (consumerPollLongMethod != null) {
42+
// not present in 4.x
43+
return (ConsumerRecords<K, V>) consumerPollLongMethod.invoke(consumer, duration.toMillis());
44+
}
45+
} catch (Exception exception) {
46+
throw new IllegalStateException(exception);
47+
}
48+
throw new IllegalStateException("poll method not found");
49+
}
2150

2251
private KafkaStreamsReflectionUtil() {}
2352

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
5050
@Test
5151
void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
5252
Properties config = new Properties();
53-
config.putAll(producerProps(KafkaStreamsBaseTest.kafka.getBootstrapServers()));
53+
config.putAll(producerProps(kafka.getBootstrapServers()));
5454
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
5555
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
5656
config.put(
@@ -71,12 +71,10 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
7171
streams.start();
7272

7373
String greeting = "TESTING TESTING 123!";
74-
KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting));
74+
producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting));
7575

7676
// check that the message was received
77-
@SuppressWarnings("PreferJavaTimeOverload")
78-
ConsumerRecords<Integer, String> records =
79-
KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis());
77+
ConsumerRecords<Integer, String> records = poll(Duration.ofSeconds(10));
8078
Headers receivedHeaders = null;
8179
for (ConsumerRecord<Integer, String> record : records) {
8280
Span.current().setAttribute("testing", 123);

0 commit comments

Comments
 (0)