Skip to content

Commit a797d77

Browse files
authored
Prepare for 3.6.0 Client
- remove reflection - add server-common jars
1 parent f61bf74 commit a797d77

File tree

3 files changed

+11
-80
lines changed

3 files changed

+11
-80
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,8 @@ project ('spring-kafka-test') {
403403
}
404404
api "org.apache.kafka:kafka-clients:$kafkaVersion:test"
405405
api "org.apache.kafka:kafka-metadata:$kafkaVersion"
406+
api "org.apache.kafka:kafka-server-common:$kafkaVersion"
407+
api "org.apache.kafka:kafka-server-common:$kafkaVersion:test"
406408
api "org.apache.kafka:kafka-streams-test-utils:$kafkaVersion"
407409
api "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
408410
api "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 8 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import java.io.File;
2020
import java.io.IOException;
2121
import java.io.UncheckedIOException;
22-
import java.lang.reflect.InvocationTargetException;
23-
import java.lang.reflect.Method;
2422
import java.net.InetSocketAddress;
2523
import java.nio.file.Files;
2624
import java.time.Duration;
@@ -53,7 +51,6 @@
5351
import org.apache.kafka.common.KafkaException;
5452
import org.apache.kafka.common.TopicPartition;
5553
import org.apache.kafka.common.security.auth.SecurityProtocol;
56-
import org.apache.kafka.common.utils.AppInfoParser;
5754
import org.apache.kafka.common.utils.Exit;
5855
import org.apache.kafka.common.utils.Time;
5956
import org.apache.kafka.common.utils.Utils;
@@ -70,7 +67,6 @@
7067
import org.springframework.retry.policy.SimpleRetryPolicy;
7168
import org.springframework.retry.support.RetryTemplate;
7269
import org.springframework.util.Assert;
73-
import org.springframework.util.ReflectionUtils;
7470

7571
import kafka.cluster.EndPoint;
7672
import kafka.server.KafkaConfig;
@@ -121,40 +117,6 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
121117

122118
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;
123119

124-
private static final Method GET_BROKER_STATE_METHOD;
125-
126-
private static final Method BROKER_CONFIGS_METHOD;
127-
128-
static {
129-
try {
130-
Method method = KafkaServer.class.getDeclaredMethod("brokerState");
131-
if (method.getReturnType().equals(AtomicReference.class)) {
132-
GET_BROKER_STATE_METHOD = method;
133-
}
134-
else {
135-
GET_BROKER_STATE_METHOD = null;
136-
}
137-
}
138-
catch (NoSuchMethodException | SecurityException ex) {
139-
throw new IllegalStateException("Failed to determine KafkaServer.brokerState() method; client version: "
140-
+ AppInfoParser.getVersion(), ex);
141-
}
142-
try {
143-
AtomicReference<Method> configsMethod = new AtomicReference<>();
144-
ReflectionUtils.doWithMethods(TestUtils.class,
145-
method -> configsMethod.set(method),
146-
method -> method.getName().equals("createBrokerConfig"));
147-
BROKER_CONFIGS_METHOD = configsMethod.get();
148-
if (BROKER_CONFIGS_METHOD == null) {
149-
throw new IllegalStateException();
150-
}
151-
}
152-
catch (IllegalStateException ex) {
153-
throw new IllegalStateException("Failed to obtain TestUtils.createBrokerConfig method; client version: "
154-
+ AppInfoParser.getVersion(), ex);
155-
}
156-
}
157-
158120
private final int count;
159121

160122
private final boolean controlledShutdown;
@@ -414,29 +376,13 @@ private void overrideExitMethods() {
414376
}
415377

416378
private Properties createBrokerProperties(int i) {
417-
try {
418-
if (BROKER_CONFIGS_METHOD.getParameterTypes().length == 21) { // 3.3.2
419-
return (Properties) BROKER_CONFIGS_METHOD.invoke(null, i, this.zkConnect, this.controlledShutdown,
420-
true, this.kafkaPorts[i],
421-
scala.Option.apply(null),
422-
scala.Option.apply(null),
423-
scala.Option.apply(null),
424-
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false,
425-
this.partitionsPerTopic, (short) this.count, false);
426-
}
427-
else {
428-
return (Properties) BROKER_CONFIGS_METHOD.invoke(null, i, this.zkConnect, this.controlledShutdown,
429-
true, this.kafkaPorts[i],
430-
scala.Option.apply(null),
431-
scala.Option.apply(null),
432-
scala.Option.apply(null),
433-
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false,
434-
this.partitionsPerTopic, (short) this.count);
435-
}
436-
}
437-
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
438-
throw new IllegalStateException(ex);
439-
}
379+
return TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown,
380+
true, this.kafkaPorts[i],
381+
scala.Option.apply(null),
382+
scala.Option.apply(null),
383+
scala.Option.apply(null),
384+
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false,
385+
this.partitionsPerTopic, (short) this.count, false);
440386
}
441387

442388
/**
@@ -628,24 +574,7 @@ public void destroy() {
628574
}
629575

630576
private boolean brokerRunning(KafkaServer kafkaServer) {
631-
try {
632-
return !kafkaServer.brokerState().equals(BrokerState.NOT_RUNNING);
633-
}
634-
catch (NoSuchMethodError error) {
635-
if (GET_BROKER_STATE_METHOD != null) {
636-
try {
637-
return !GET_BROKER_STATE_METHOD.invoke(kafkaServer).toString().equals("NOT_RUNNING");
638-
}
639-
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
640-
logger.debug(ex, "Could not determine broker state during shutdown");
641-
return true;
642-
}
643-
}
644-
else {
645-
logger.debug("Could not determine broker state during shutdown");
646-
return true;
647-
}
648-
}
577+
return !kafkaServer.brokerState().equals(BrokerState.NOT_RUNNING);
649578
}
650579

651580
public Set<String> getTopics() {

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/ToStringSerializationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public void testSerialization_usingHeaders() {
217217
void nullValue() {
218218
ParseStringDeserializer<Object> deserializer =
219219
new ParseStringDeserializer<>(ToStringSerializationTests::parseWithHeaders);
220-
assertThat(deserializer.deserialize("foo", new RecordHeaders(), null)).isNull();
220+
assertThat(deserializer.deserialize("foo", new RecordHeaders(), (byte[]) null)).isNull();
221221
}
222222

223223
@Test

0 commit comments

Comments
 (0)