Skip to content

Commit 91fa915

Browse files
committed
Add end to end tests itesting ability to form client connections to controller(s) (IN_VM only at the moment).
Signed-off-by: kwall <[email protected]>
1 parent 7049108 commit 91fa915

File tree

5 files changed

+96
-0
lines changed

5 files changed

+96
-0
lines changed

api/src/main/java/io/kroxylicious/testing/kafka/api/KafkaCluster.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ public interface KafkaCluster extends AutoCloseable {
105105
*/
106106
String getBootstrapServers();
107107

108+
/**
109+
* Gets the bootstrap controllers for this cluster
110+
* @return bootstrap controllers
111+
* @throws UnsupportedOperationException zookeeper based clusters do not support this operation.
112+
*/
113+
String getBootstrapControllers() throws UnsupportedOperationException;
114+
108115
/**
109116
* Gets the cluster id
110117
* @return The cluster id for KRaft-based clusters, otherwise null;
@@ -132,4 +139,16 @@ public interface KafkaCluster extends AutoCloseable {
132139
* @return mutable configuration map
133140
*/
134141
Map<String, Object> getKafkaClientConfiguration(String user, String password);
142+
143+
/**
144+
* Gets the kafka configuration for making connections to the controllers of this cluster as required by the
145+
* {@code org.apache.kafka.clients.admin.AdminClient}. Details such the bootstrap and SASL configuration
146+
* are provided automatically.
147+
* The returned map is guaranteed to be mutable and is unique to the caller.
148+
*
149+
* @return mutable configuration map
150+
* @throws UnsupportedOperationException zookeeper based clusters do not support this operation.
151+
*/
152+
Map<String, Object> getControllerAdminClientConfiguration() throws UnsupportedOperationException;
153+
135154
}

impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,12 @@ public Map<String, Object> getConnectConfigForCluster(String bootstrapServers, S
619619
return kafkaConfig;
620620
}
621621

622+
public Map<String, Object> getControllerAdminClientConfigForCluster(String bootstrapControllers) {
623+
var kafkaConfig = new HashMap<String, Object>();
624+
kafkaConfig.put("bootstrap.controllers", bootstrapControllers);
625+
return kafkaConfig;
626+
}
627+
622628
private void buildSecurityProtocolConfig(Map<String, Object> kafkaConfig) {
623629
String clientTrustStoreFilePath;
624630
String clientTrustStorePassword;

impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,15 @@ public synchronized String getBootstrapServers() {
260260
return buildBrokerList(nodeId -> getEndpointPair(Listener.EXTERNAL, nodeId));
261261
}
262262

263+
@Override
264+
public String getBootstrapControllers() {
265+
if (!clusterConfig.isKraftMode()) {
266+
throw new UnsupportedOperationException("Zookeeper based clusters don't support this operation");
267+
}
268+
269+
return buildBrokerList(nodeId -> getEndpointPair(Listener.CONTROLLER, nodeId));
270+
}
271+
263272
private synchronized String buildBrokerList(Function<Integer, KafkaClusterConfig.KafkaEndpoints.EndpointPair> endpointFunc) {
264273
return servers.keySet().stream()
265274
.filter(this::isBroker)
@@ -278,6 +287,11 @@ public Map<String, Object> getKafkaClientConfiguration(String user, String passw
278287
return clusterConfig.getConnectConfigForCluster(getBootstrapServers(), user, password);
279288
}
280289

290+
@Override
291+
public Map<String, Object> getControllerAdminClientConfiguration() {
292+
return clusterConfig.getControllerAdminClientConfigForCluster(getBootstrapControllers());
293+
}
294+
281295
@Override
282296
public synchronized int addBroker() {
283297
// find next free kafka node.id

impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,11 @@ public synchronized String getBootstrapServers() {
274274
return buildBrokerList(nodeId -> this.getEndpointPair(Listener.EXTERNAL, nodeId));
275275
}
276276

277+
@Override
278+
public String getBootstrapControllers() {
279+
throw new UnsupportedOperationException();
280+
}
281+
277282
private synchronized String buildBrokerList(Function<Integer, KafkaClusterConfig.KafkaEndpoints.EndpointPair> endpointFunc) {
278283
return nodes.keySet().stream()
279284
.filter(this::isBroker)
@@ -590,6 +595,11 @@ public Map<String, Object> getKafkaClientConfiguration(String user, String passw
590595
return clusterConfig.getConnectConfigForCluster(getBootstrapServers(), user, password);
591596
}
592597

598+
@Override
599+
public Map<String, Object> getControllerAdminClientConfiguration() {
600+
throw new UnsupportedOperationException();
601+
}
602+
593603
@Override
594604
public synchronized EndpointPair getEndpointPair(Listener listener, int nodeId) {
595605
switch (listener) {

impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,14 @@
5151
import io.kroxylicious.testing.kafka.common.KafkaClusterFactory;
5252
import io.kroxylicious.testing.kafka.common.KeytoolCertificateGenerator;
5353
import io.kroxylicious.testing.kafka.common.Utils;
54+
import io.kroxylicious.testing.kafka.invm.InVMKafkaCluster;
5455

5556
import static org.assertj.core.api.Assertions.assertThat;
5657
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5758
import static org.junit.jupiter.api.Assertions.assertEquals;
5859
import static org.junit.jupiter.api.Assertions.assertThrows;
5960
import static org.junit.jupiter.api.Assertions.fail;
61+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
6062

6163
/**
6264
* Test case that simply exercises the ability to control the kafka cluster from the test.
@@ -491,6 +493,51 @@ void kraftClusterWithMinBootstrapInterBrokerProtocol() throws Exception {
491493
}
492494
}
493495

496+
/**
497+
* KIP-919 tests ability to connect to the controller.bootstrap.
498+
*/
499+
@ParameterizedTest
500+
@ValueSource(ints = { 1, 2 })
501+
void kraftAdminConnectionToControllers(int numControllers) throws Exception {
502+
int brokersNum = 1;
503+
try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder()
504+
.testInfo(testInfo)
505+
.brokersNum(brokersNum)
506+
.kraftControllers(numControllers)
507+
.kraftMode(true)
508+
.build())) {
509+
assumeTrue(cluster instanceof InVMKafkaCluster, "admin client connections to KRaft controllers not yet supported.");
510+
cluster.start();
511+
512+
try (var controllerAdmin = CloseableAdmin.create(cluster.getControllerAdminClientConfiguration())) {
513+
var nodes = controllerAdmin.describeCluster().nodes().get(5, TimeUnit.SECONDS);
514+
assertThat(nodes).hasSize(numControllers);
515+
}
516+
517+
try (var brokerAdmin = CloseableAdmin.create(cluster.getKafkaClientConfiguration())) {
518+
var nodes = brokerAdmin.describeCluster().nodes().get(5, TimeUnit.SECONDS);
519+
assertThat(nodes).hasSize(brokersNum);
520+
}
521+
}
522+
}
523+
524+
@Test
525+
void zookeeperDisallowsAdminConnectionToControllers() throws Exception {
526+
try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder()
527+
.testInfo(testInfo)
528+
.kraftMode(false)
529+
.build())) {
530+
cluster.start();
531+
532+
assertThatThrownBy(cluster::getBootstrapControllers)
533+
.isInstanceOf(UnsupportedOperationException.class);
534+
535+
assertThatThrownBy(cluster::getControllerAdminClientConfiguration)
536+
.isInstanceOf(UnsupportedOperationException.class);
537+
}
538+
539+
}
540+
494541
private void verifyRecordRoundTrip(int expected, KafkaCluster cluster) throws Exception {
495542
var topic = "roundTrip" + Uuid.randomUuid();
496543
var message = "Hello, world!";

0 commit comments

Comments
 (0)