@@ -10,10 +10,14 @@ public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer
1010
1111 public const ushort BrokerPort = 9093 ;
1212
13+ public const ushort ControllerPort = 9094 ;
14+
1315 public const ushort ZookeeperPort = 2181 ;
1416
1517 public const string StartupScriptFilePath = "/testcontainers.sh" ;
1618
19+ private const string ProtocolPrefix = "TC" ;
20+
1721 /// <summary>
1822 /// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
1923 /// </summary>
@@ -43,6 +47,49 @@ public override KafkaContainer Build()
4347 return new KafkaContainer ( DockerResourceConfiguration ) ;
4448 }
4549
50+ /// <summary>
51+ /// Adds a listener to the Kafka configuration in the format <c>host:port</c>.
52+ /// </summary>
53+ /// <remarks>
54+ /// The host will be included as a network alias, allowing additional connections
55+ /// to the Kafka broker within the same container network.
56+ ///
57+ /// This method is useful for registering custom listeners beyond the default ones,
58+ /// enabling specific connection points for Kafka brokers.
59+ ///
60+ /// Default listeners include:
61+ /// - <c>PLAINTEXT://0.0.0.0:9092</c>
62+ /// - <c>BROKER://0.0.0.0:9093</c>
63+ /// - <c>CONTROLLER://0.0.0.0:9094</c>
64+ /// </remarks>
65+ /// <param name="kafka">The MsSql database.</param>
66+ /// <returns>A configured instance of <see cref="KafkaBuilder" />.</returns>
67+ public KafkaBuilder WithListener ( string kafka )
68+ {
69+ var index = DockerResourceConfiguration . Listeners ? . Count ( ) ?? 0 ;
70+ var protocol = $ "{ ProtocolPrefix } -{ index } ";
71+ var listener = $ "{ protocol } ://{ kafka } ";
72+ var listenerSecurityProtocolMap = $ "{ protocol } :PLAINTEXT";
73+
74+ var listeners = new [ ] { listener } ;
75+ var listenersSecurityProtocolMap = new [ ] { listenerSecurityProtocolMap } ;
76+
77+ var host = kafka . Split ( ':' ) [ 0 ] ;
78+
79+ var updatedListeners = DockerResourceConfiguration . Environments [ "KAFKA_LISTENERS" ]
80+ . Split ( ',' )
81+ . Concat ( listeners ) ;
82+
83+ var updatedListenersSecurityProtocolMap = DockerResourceConfiguration . Environments [ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" ]
84+ . Split ( ',' )
85+ . Concat ( listenersSecurityProtocolMap ) ;
86+
87+ return Merge ( DockerResourceConfiguration , new KafkaConfiguration ( listeners , listeners ) )
88+ . WithEnvironment ( "KAFKA_LISTENERS" , string . Join ( "," , updatedListeners ) )
89+ . WithEnvironment ( "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , string . Join ( "," , updatedListenersSecurityProtocolMap ) )
90+ . WithNetworkAliases ( host ) ;
91+ }
92+
4693 /// <inheritdoc />
4794 protected override KafkaBuilder Init ( )
4895 {
@@ -51,10 +98,12 @@ protected override KafkaBuilder Init()
5198 . WithPortBinding ( KafkaPort , true )
5299 . WithPortBinding ( BrokerPort , true )
53100 . WithPortBinding ( ZookeeperPort , true )
54- . WithEnvironment ( "KAFKA_LISTENERS" , "PLAINTEXT://0.0.0.0:" + KafkaPort + " ,BROKER://0.0.0.0:" + BrokerPort )
55- . WithEnvironment ( "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT" )
101+ . WithEnvironment ( "KAFKA_LISTENERS" , $ "PLAINTEXT://0.0.0.0:{ KafkaPort } ,BROKER://0.0.0.0:{ BrokerPort } ,CONTROLLER://0.0.0.0: { ControllerPort } " )
102+ . WithEnvironment ( "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT, PLAINTEXT:PLAINTEXT" )
56103 . WithEnvironment ( "KAFKA_INTER_BROKER_LISTENER_NAME" , "BROKER" )
57104 . WithEnvironment ( "KAFKA_BROKER_ID" , "1" )
105+ . WithEnvironment ( "KAFKA_NODE_ID" , "1" )
106+ . WithEnvironment ( "KAFKA_CONTROLLER_QUORUM_VOTERS" , "1@localhost:" + ControllerPort )
58107 . WithEnvironment ( "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" , "1" )
59108 . WithEnvironment ( "KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS" , "1" )
60109 . WithEnvironment ( "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR" , "1" )
@@ -68,6 +117,7 @@ protected override KafkaBuilder Init()
68117 . WithStartupCallback ( ( container , ct ) =>
69118 {
70119 const char lf = '\n ' ;
120+ var additionalAdvertisedListeners = string . Join ( "," , container . AdvertisedListeners ?? Array . Empty < string > ( ) ) ;
71121 var startupScript = new StringBuilder ( ) ;
72122 startupScript . Append ( "#!/bin/bash" ) ;
73123 startupScript . Append ( lf ) ;
@@ -79,7 +129,7 @@ protected override KafkaBuilder Init()
79129 startupScript . Append ( lf ) ;
80130 startupScript . Append ( "zookeeper-server-start zookeeper.properties &" ) ;
81131 startupScript . Append ( lf ) ;
82- startupScript . Append ( "export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container . Hostname + ":" + container . GetMappedPublicPort ( KafkaPort ) + ",BROKER://" + container . IpAddress + ":" + BrokerPort ) ;
132+ startupScript . Append ( "export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container . Hostname + ":" + container . GetMappedPublicPort ( KafkaPort ) + ",BROKER://" + container . IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners ) ;
83133 startupScript . Append ( lf ) ;
84134 startupScript . Append ( "echo '' > /etc/confluent/docker/ensure" ) ;
85135 startupScript . Append ( lf ) ;
0 commit comments