77import org .testcontainers .utility .DockerImageName ;
88
99import java .util .ArrayList ;
10+ import java .util .Arrays ;
1011import java .util .HashSet ;
1112import java .util .List ;
1213import java .util .Objects ;
1314import java .util .Set ;
1415import java .util .function .Supplier ;
16+ import java .util .stream .Collectors ;
1517
1618/**
1719 * Testcontainers implementation for Apache Kafka.
@@ -85,7 +87,7 @@ public KafkaContainer withEmbeddedZookeeper() {
8587 if (this .kraftEnabled ) {
8688 throw new IllegalStateException ("Cannot configure Zookeeper when using Kraft mode" );
8789 }
88- getContainerDef (). withEmbeddedZookeeper () ;
90+ this . externalZookeeperConnect = null ;
8991 return self ();
9092 }
9193
@@ -94,7 +96,6 @@ public KafkaContainer withExternalZookeeper(String connectString) {
9496 throw new IllegalStateException ("Cannot configure Zookeeper when using Kraft mode" );
9597 }
9698 this .externalZookeeperConnect = connectString ;
97- getContainerDef ().withZookeeper (connectString );
9899 return self ();
99100 }
100101
@@ -104,7 +105,6 @@ public KafkaContainer withKraft() {
104105 }
105106 verifyMinKraftVersion ();
106107 this .kraftEnabled = true ;
107- getContainerDef ().withRaft ();
108108 return self ();
109109 }
110110
@@ -140,8 +140,22 @@ public String getBootstrapServers() {
140140 protected void configure () {
141141 getContainerDef ().resolveListeners ();
142142
143- if (!this .kraftEnabled && this .externalZookeeperConnect == null ) {
143+ if (this .kraftEnabled ) {
144+ configureKraft ();
145+ } else {
146+ configureZookeeper ();
147+ }
148+ }
149+
150+ protected void configureKraft () {
151+ getContainerDef ().withRaft ();
152+ }
153+
154+ protected void configureZookeeper () {
155+ if (this .externalZookeeperConnect == null ) {
144156 getContainerDef ().withEmbeddedZookeeper ();
157+ } else {
158+ getContainerDef ().withZookeeper (this .externalZookeeperConnect );
145159 }
146160 }
147161
@@ -261,8 +275,12 @@ private static class KafkaContainerDef extends ContainerDef {
261275 }
262276
263277 private void resolveListeners () {
264- Set <String > additionalKafkaListeners = new HashSet <>();
265- Set <String > additionalListenerSecurityProtocolMap = new HashSet <>();
278+ Set <String > listeners = Arrays
279+ .stream (this .envVars .get ("KAFKA_LISTENERS" ).split ("," ))
280+ .collect (Collectors .toSet ());
281+ Set <String > listenerSecurityProtocolMap = Arrays
282+ .stream (this .envVars .get ("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" ).split ("," ))
283+ .collect (Collectors .toSet ());
266284
267285 List <Supplier <String >> listenersToTransform = new ArrayList <>(this .listeners );
268286 for (int i = 0 ; i < listenersToTransform .size (); i ++) {
@@ -272,21 +290,18 @@ private void resolveListeners() {
272290 String listenerPort = listener .split (":" )[1 ];
273291 String listenerProtocol = String .format ("%s://0.0.0.0:%s" , protocol , listenerPort );
274292 String protocolMap = String .format ("%s:PLAINTEXT" , protocol );
275- additionalKafkaListeners .add (listenerProtocol );
276- additionalListenerSecurityProtocolMap .add (protocolMap );
293+ listeners .add (listenerProtocol );
294+ listenerSecurityProtocolMap .add (protocolMap );
277295
278296 String host = listener .split (":" )[0 ];
279297 addNetworkAlias (host );
280298 }
281299
282- String kafkaListeners = String .join ("," , additionalKafkaListeners );
283- String kafkaListenerSecurityProtocolMap = String .join ("," , additionalListenerSecurityProtocolMap );
300+ String kafkaListeners = String .join ("," , listeners );
301+ String kafkaListenerSecurityProtocolMap = String .join ("," , listenerSecurityProtocolMap );
284302
285- this .envVars .computeIfPresent ("KAFKA_LISTENERS" , (k , v ) -> String .join ("," , v , kafkaListeners ));
286- this .envVars .computeIfPresent (
287- "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" ,
288- (k , v ) -> String .join ("," , v , kafkaListenerSecurityProtocolMap )
289- );
303+ this .envVars .put ("KAFKA_LISTENERS" , kafkaListeners );
304+ this .envVars .put ("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , kafkaListenerSecurityProtocolMap );
290305 }
291306
292307 void withListener (Supplier <String > listenerSupplier ) {
0 commit comments