2323import java .util .concurrent .ExecutionException ;
2424import java .util .concurrent .TimeUnit ;
2525import java .util .concurrent .TimeoutException ;
26+ import java .util .function .Consumer ;
2627import java .util .function .Function ;
2728import java .util .regex .Matcher ;
2829import java .util .regex .Pattern ;
@@ -52,6 +53,7 @@ public class StrimziKafkaCluster implements KafkaContainer {
5253 private final String logFilePath ;
5354 private final int fixedExposedPort ;
5455 private final Function <StrimziKafkaContainer , String > bootstrapServersProvider ;
56+ private final Consumer <StrimziKafkaContainer > containerCustomizer ;
5557
5658 // OAuth fields
5759 private final boolean oauthEnabled ;
@@ -66,9 +68,9 @@ public class StrimziKafkaCluster implements KafkaContainer {
6668
6769 // not editable
6870 private final Network network ;
69- private Collection <KafkaContainer > nodes ;
70- private Collection <KafkaContainer > controllers ;
71- private Collection <KafkaContainer > brokers ;
71+ private Collection <StrimziKafkaContainer > nodes ;
72+ private Collection <StrimziKafkaContainer > controllers ;
73+ private Collection <StrimziKafkaContainer > brokers ;
7274 private final String clusterId ;
7375
7476 private StrimziKafkaCluster (StrimziKafkaClusterBuilder builder ) {
@@ -89,6 +91,7 @@ private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
8991 this .logFilePath = builder .logFilePath ;
9092 this .fixedExposedPort = builder .fixedExposedPort ;
9193 this .bootstrapServersProvider = builder .bootstrapServersProvider ;
94+ this .containerCustomizer = builder .containerCustomizer ;
9295
9396 // OAuth configuration
9497 this .oauthEnabled = builder .oauthEnabled ;
@@ -162,6 +165,7 @@ private void prepareCombinedRolesCluster(final Map<String, String> kafkaConfigur
162165 applyFixedPort (kafkaContainer , nodeId );
163166 applyBootstrapServersProvider (kafkaContainer );
164167 applyOAuthConfiguration (kafkaContainer );
168+ applyContainerCustomizer (kafkaContainer );
165169
166170 LOGGER .info ("Started combined role node with id: {}" , kafkaContainer );
167171
@@ -205,6 +209,7 @@ private StrimziKafkaContainer createControllerContainer(int controllerId, Map<St
205209 applyKafkaVersion (controllerContainer , kafkaVersion );
206210 applyLogCollection (controllerContainer );
207211 applyOAuthConfiguration (controllerContainer );
212+ applyContainerCustomizer (controllerContainer );
208213
209214 LOGGER .info ("Started controller-only node with id: {}" , controllerContainer );
210215 return controllerContainer ;
@@ -231,6 +236,7 @@ private StrimziKafkaContainer createBrokerContainer(int brokerIndex, Map<String,
231236 applyFixedPort (brokerContainer , brokerIndex );
232237 applyBootstrapServersProvider (brokerContainer );
233238 applyOAuthConfiguration (brokerContainer );
239+ applyContainerCustomizer (brokerContainer );
234240
235241 LOGGER .info ("Started broker-only node with id: {}" , brokerContainer );
236242 return brokerContainer ;
@@ -267,6 +273,12 @@ private void applyBootstrapServersProvider(StrimziKafkaContainer container) {
267273 }
268274 }
269275
276+ private void applyContainerCustomizer (StrimziKafkaContainer container ) {
277+ if (this .containerCustomizer != null ) {
278+ this .containerCustomizer .accept (container );
279+ }
280+ }
281+
270282 /**
271283 * Applies OAuth configuration to a Kafka container if OAuth is enabled.
272284 *
@@ -336,7 +348,7 @@ private void configureProxyContainerPorts() {
336348 * </p>
337349 */
338350 public static class StrimziKafkaClusterBuilder {
339- private int brokersNum ;
351+ private int brokersNum = 1 ;
340352 private int controllersNum ;
341353 private boolean useDedicatedRoles ;
342354 private int internalTopicReplicationFactor ;
@@ -349,6 +361,7 @@ public static class StrimziKafkaClusterBuilder {
349361 private String logFilePath ;
350362 private int fixedExposedPort ;
351363 private Function <StrimziKafkaContainer , String > bootstrapServersProvider ;
364+ private Consumer <StrimziKafkaContainer > containerCustomizer ;
352365
353366 // OAuth fields
354367 private boolean oauthEnabled ;
@@ -535,14 +548,26 @@ public StrimziKafkaClusterBuilder withPort(final int fixedPort) {
535548 * for example when using a shared network with specific hostnames.
536549 * </p>
537550 *
538- * @param provider a function that takes a {@link StrimziKafkaContainer } and returns the bootstrap servers string
551+ * @param provider a function that takes a {@link GenericContainer } and returns the bootstrap servers string
539552 * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
540553 */
541554 public StrimziKafkaClusterBuilder withBootstrapServers (final Function <StrimziKafkaContainer , String > provider ) {
542555 this .bootstrapServersProvider = provider ;
543556 return this ;
544557 }
545558
559+ /**
560+ * Assigns a customizer function to modify each Kafka container after its creation.
561+ * This allows for additional configurations or adjustments to be applied to each container.
562+ *
563+ * @param customizer a consumer that takes a {@link GenericContainer} for customization
564+ * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
565+ */
566+ public StrimziKafkaClusterBuilder withContainerCustomizer (final Consumer <StrimziKafkaContainer > customizer ) {
567+ this .containerCustomizer = customizer ;
568+ return this ;
569+ }
570+
546571 /**
547572 * Configures OAuth settings for all nodes in the cluster.
548573 *
@@ -631,10 +656,8 @@ public StrimziKafkaCluster build() {
631656 *
632657 * @return Collection of GenericContainer representing the cluster nodes
633658 */
634- public Collection <GenericContainer <?>> getNodes () {
635- return nodes .stream ()
636- .map (node -> (GenericContainer <?>) node )
637- .collect (Collectors .toList ());
659+ public Collection <StrimziKafkaContainer > getNodes () {
660+ return nodes ;
638661 }
639662
640663 /**
@@ -644,7 +667,7 @@ public Collection<GenericContainer<?>> getNodes() {
644667 @ DoNotMutate
645668 public String getNetworkBootstrapServers () {
646669 return getBrokers ().stream ()
647- .map (broker -> (( StrimziKafkaContainer ) broker ). getNetworkBootstrapServers () )
670+ .map (StrimziKafkaContainer :: getNetworkBootstrapServers )
648671 .collect (Collectors .joining ("," ));
649672 }
650673
@@ -673,7 +696,7 @@ public String getBootstrapControllers() {
673696 @ DoNotMutate
674697 public String getNetworkBootstrapControllers () {
675698 return getControllers ().stream ()
676- .map (controller -> (( StrimziKafkaContainer ) controller ). getNetworkBootstrapControllers () )
699+ .map (StrimziKafkaContainer :: getNetworkBootstrapControllers )
677700 .collect (Collectors .joining ("," ));
678701 }
679702
@@ -716,7 +739,7 @@ public void start() {
716739 }
717740
718741 // Start all Kafka containers
719- Stream <KafkaContainer > startables = this .nodes .stream ();
742+ Stream <StrimziKafkaContainer > startables = this .nodes .stream ();
720743 try {
721744 Startables .deepStart (startables ).get (60 , TimeUnit .SECONDS );
722745 } catch (InterruptedException e ) {
@@ -737,10 +760,8 @@ public void start() {
737760 private boolean checkAllBrokersReady () {
738761 try {
739762 // check broker nodes for quorum readiness (if combined-node then we check all nodes)
740- Collection <KafkaContainer > brokersToCheck = getBrokers ();
741-
742- for (KafkaContainer kafkaContainer : brokersToCheck ) {
743- if (!isBrokerReady ((StrimziKafkaContainer ) kafkaContainer )) {
763+ for (StrimziKafkaContainer kafkaContainer : getBrokers ()) {
764+ if (!isBrokerReady (kafkaContainer )) {
744765 return false ;
745766 }
746767 }
@@ -850,7 +871,7 @@ protected Network getNetwork() {
850871 *
851872 * @return Collection of controller nodes
852873 */
853- public Collection <KafkaContainer > getControllers () {
874+ public Collection <StrimziKafkaContainer > getControllers () {
854875 if (this .useDedicatedRoles ) {
855876 return this .controllers ;
856877 } else {
@@ -867,7 +888,7 @@ public Collection<KafkaContainer> getControllers() {
867888 *
868889 * @return Collection of broker nodes
869890 */
870- public Collection <KafkaContainer > getBrokers () {
891+ public Collection <StrimziKafkaContainer > getBrokers () {
871892 if (this .useDedicatedRoles ) {
872893 return this .brokers ;
873894 } else {
@@ -899,8 +920,7 @@ public Proxy getProxyForNode(int nodeId) {
899920 throw new IllegalStateException ("Proxy container has not been configured for this cluster" );
900921 }
901922
902- for (final KafkaContainer node : this .nodes ) {
903- final StrimziKafkaContainer kafkaNode = (StrimziKafkaContainer ) node ;
923+ for (final StrimziKafkaContainer kafkaNode : this .nodes ) {
904924 if (kafkaNode .getNodeId () == nodeId ) {
905925 return kafkaNode .getProxy ();
906926 }
0 commit comments