Skip to content

Commit 8cab408

Browse files
garyrussellartembilan
authored andcommitted
kafka-clients 3.3.2 Compatibility
Additional parameter on `TestUtils.createBrokerConfig()`. Retain compatibility with 3.3.1 and earlier.
1 parent cd455f6 commit 8cab408

File tree

1 file changed

+42
-9
lines changed

1 file changed

+42
-9
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.springframework.retry.policy.SimpleRetryPolicy;
7171
import org.springframework.retry.support.RetryTemplate;
7272
import org.springframework.util.Assert;
73+
import org.springframework.util.ReflectionUtils;
7374

7475
import kafka.cluster.EndPoint;
7576
import kafka.server.KafkaConfig;
@@ -122,6 +123,8 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
122123

123124
private static final Method GET_BROKER_STATE_METHOD;
124125

126+
private static final Method BROKER_CONFIGS_METHOD;
127+
125128
static {
126129
try {
127130
Method method = KafkaServer.class.getDeclaredMethod("brokerState");
@@ -132,9 +135,23 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
132135
GET_BROKER_STATE_METHOD = null;
133136
}
134137
}
135-
catch (NoSuchMethodException | SecurityException e) {
138+
catch (NoSuchMethodException | SecurityException ex) {
136139
throw new IllegalStateException("Failed to determine KafkaServer.brokerState() method; client version: "
137-
+ AppInfoParser.getVersion(), e);
140+
+ AppInfoParser.getVersion(), ex);
141+
}
142+
try {
143+
AtomicReference<Method> configsMethod = new AtomicReference<>();
144+
ReflectionUtils.doWithMethods(TestUtils.class,
145+
method -> configsMethod.set(method),
146+
method -> method.getName().equals("createBrokerConfig"));
147+
BROKER_CONFIGS_METHOD = configsMethod.get();
148+
if (BROKER_CONFIGS_METHOD == null) {
149+
throw new IllegalStateException();
150+
}
151+
}
152+
catch (IllegalStateException ex) {
153+
throw new IllegalStateException("Failed to obtain TestUtils.createBrokerConfig method; client version: "
154+
+ AppInfoParser.getVersion(), ex);
138155
}
139156
}
140157

@@ -395,13 +412,29 @@ private void overrideExitMethods() {
395412
}
396413

397414
private Properties createBrokerProperties(int i) {
398-
return TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown,
399-
true, this.kafkaPorts[i],
400-
scala.Option.apply(null),
401-
scala.Option.apply(null),
402-
scala.Option.apply(null),
403-
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false,
404-
this.partitionsPerTopic, (short) this.count);
415+
try {
416+
if (BROKER_CONFIGS_METHOD.getParameterTypes().length == 21) { // 3.3.2
417+
return (Properties) BROKER_CONFIGS_METHOD.invoke(null, i, this.zkConnect, this.controlledShutdown,
418+
true, this.kafkaPorts[i],
419+
scala.Option.apply(null),
420+
scala.Option.apply(null),
421+
scala.Option.apply(null),
422+
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false,
423+
this.partitionsPerTopic, (short) this.count, false);
424+
}
425+
else {
426+
return (Properties) BROKER_CONFIGS_METHOD.invoke(null, i, this.zkConnect, this.controlledShutdown,
427+
true, this.kafkaPorts[i],
428+
scala.Option.apply(null),
429+
scala.Option.apply(null),
430+
scala.Option.apply(null),
431+
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false,
432+
this.partitionsPerTopic, (short) this.count);
433+
}
434+
}
435+
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
436+
throw new IllegalStateException(ex);
437+
}
405438
}
406439

407440
/**

0 commit comments

Comments
 (0)