@@ -26,7 +26,8 @@ class KafkaContainer(DockerContainer):
26
26
KAFKA_PORT = 9093
27
27
TC_START_SCRIPT = '/tc-start.sh'
28
28
29
- def __init__ (self , image = "confluentinc/cp-kafka:5.4.3" , port_to_expose = KAFKA_PORT , ** kwargs ):
29
+ def __init__ (self , image : str = "confluentinc/cp-kafka:5.4.3" , port_to_expose : int = KAFKA_PORT ,
30
+ ** kwargs ) -> None :
30
31
super (KafkaContainer , self ).__init__ (image , ** kwargs )
31
32
self .port_to_expose = port_to_expose
32
33
self .with_exposed_ports (self .port_to_expose )
@@ -42,19 +43,19 @@ def __init__(self, image="confluentinc/cp-kafka:5.4.3", port_to_expose=KAFKA_POR
42
43
self .with_env ('KAFKA_LOG_FLUSH_INTERVAL_MESSAGES' , '10000000' )
43
44
self .with_env ('KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS' , '0' )
44
45
45
- def get_bootstrap_server (self ):
46
+ def get_bootstrap_server (self ) -> str :
46
47
host = self .get_container_host_ip ()
47
48
port = self .get_exposed_port (self .port_to_expose )
48
49
return '{}:{}' .format (host , port )
49
50
50
51
@wait_container_is_ready (UnrecognizedBrokerVersion , NoBrokersAvailable , KafkaError , ValueError )
51
- def _connect (self ):
52
+ def _connect (self ) -> None :
52
53
bootstrap_server = self .get_bootstrap_server ()
53
54
consumer = KafkaConsumer (group_id = 'test' , bootstrap_servers = [bootstrap_server ])
54
55
if not consumer .bootstrap_connected ():
55
56
raise KafkaError ("Unable to connect with kafka container!" )
56
57
57
- def tc_start (self ):
58
+ def tc_start (self ) -> None :
58
59
host = self .get_container_host_ip ()
59
60
port = self .get_exposed_port (self .port_to_expose )
60
61
listeners = 'PLAINTEXT://{}:{},BROKER://$(hostname -i):9092' .format (host , port )
@@ -78,7 +79,7 @@ def tc_start(self):
78
79
)
79
80
self .create_file (data , KafkaContainer .TC_START_SCRIPT )
80
81
81
- def start (self ):
82
+ def start (self ) -> "KafkaContainer" :
82
83
script = KafkaContainer .TC_START_SCRIPT
83
84
command = 'sh -c "while [ ! -f {} ]; do sleep 0.1; done; sh {}"' .format (script , script )
84
85
self .with_command (command )
@@ -87,7 +88,7 @@ def start(self):
87
88
self ._connect ()
88
89
return self
89
90
90
- def create_file (self , content : bytes , path : str ):
91
+ def create_file (self , content : bytes , path : str ) -> None :
91
92
with BytesIO () as archive , tarfile .TarFile (fileobj = archive , mode = "w" ) as tar :
92
93
tarinfo = tarfile .TarInfo (name = path )
93
94
tarinfo .size = len (content )
0 commit comments