@@ -60,17 +60,21 @@ def __init__(
6060 image : str = "confluentinc/cp-kafka:7.6.0" ,
6161 port : int = 9093 ,
6262 wait_strategy_check_string : str = r".*\[KafkaServer id=\d+\] started.*" ,
63+ listener_name : str = "PLAINTEXT" ,
64+ security_protocol : str = "PLAINTEXT" ,
6365 ** kwargs ,
6466 ) -> None :
6567 raise_for_deprecated_parameter (kwargs , "port_to_expose" , "port" )
6668 super ().__init__ (image , ** kwargs )
6769 self .port = port
70+ self .listener_name = listener_name
71+ self .security_protocol = security_protocol
6872 self .kraft_enabled = False
6973 self .wait_for : re .Pattern [str ] = re .compile (wait_strategy_check_string )
7074 self .boot_command = ""
7175 self .cluster_id = "MkU3OEVBNTcwNTJENDM2Qk"
72- self .listeners = f"PLAINTEXT ://0.0.0.0:{ self .port } ,BROKER://0.0.0.0:9092"
73- self .security_protocol_map = "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT "
76+ self .listeners = f"{ listener_name } ://0.0.0.0:{ self .port } ,BROKER://0.0.0.0:9092"
77+ self .security_protocol_map = f "BROKER:PLAINTEXT,{ listener_name } : { security_protocol } "
7478
7579 self .with_exposed_ports (self .port )
7680 self .with_env ("KAFKA_LISTENERS" , self .listeners )
@@ -160,9 +164,9 @@ def tc_start(self) -> None:
160164 host = self .get_container_host_ip ()
161165 port = self .get_exposed_port (self .port )
162166 if kafka_config .limit_broker_to_first_host :
163- listeners = f"PLAINTEXT ://{ host } :{ port } ,BROKER://$(hostname -i | cut -d' ' -f1):9092"
167+ listeners = f"{ self . listener_name } ://{ host } :{ port } ,BROKER://$(hostname -i | cut -d' ' -f1):9092"
164168 else :
165- listeners = f"PLAINTEXT ://{ host } :{ port } ,BROKER://$(hostname -i):9092"
169+ listeners = f"{ self . listener_name } ://{ host } :{ port } ,BROKER://$(hostname -i):9092"
166170 data = (
167171 dedent (
168172 f"""
0 commit comments