@@ -25,6 +25,71 @@ def get_kafka_dir(ctx, config):
2525 current_version = kafka_prefix + kafka_version
2626 return '{tdir}/{ver}' .format (tdir = teuthology .get_testdir (ctx ),ver = current_version )
2727
28+ def zookeeper_conf (ctx , client , _id , kafka_dir ):
29+ conf = """
30+ # zookeeper{_id}.properties
31+ dataDir={tdir}/data/zookeeper{_id}
32+ clientPort=218{_id}
33+ maxClientCnxns=0
34+ admin.enableServer=false
35+ tickTime=2000
36+ initLimit=10
37+ syncLimit=5
38+ server.1=localhost:2888:3888
39+ server.2=localhost:2889:3889
40+ """ .format (tdir = kafka_dir , _id = _id )
41+ file_name = 'zookeeper{_id}.properties' .format (_id = _id )
42+ log .info ("zookeeper conf file: %s" , file_name )
43+ log .info (conf )
44+ return ctx .cluster .only (client ).run (
45+ args = [
46+ 'cd' , kafka_dir , run .Raw ('&&' ),
47+ 'mkdir' , '-p' , 'config' , run .Raw ('&&' ),
48+ 'mkdir' , '-p' , 'data/zookeeper{_id}' .format (_id = _id ), run .Raw ('&&' ),
49+ 'echo' , conf , run .Raw ('>' ), 'config/{file_name}' .format (file_name = file_name ), run .Raw ('&&' ),
50+ 'echo' , str (_id ), run .Raw ('>' ), 'data/zookeeper{_id}/myid' .format (_id = _id )
51+ ],
52+ )
53+
54+
55+ def broker_conf (ctx , client , _id , kafka_dir ):
56+ (remote ,) = ctx .cluster .only (client ).remotes .keys ()
57+ conf = """
58+ # kafka{_id}.properties
59+ broker.id={_id}
60+ listeners=PLAINTEXT://0.0.0.0:909{_id}
61+ advertised.listeners=PLAINTEXT://{ip}:909{_id}
62+ log.dirs={tdir}/data/kafka-logs-{_id}
63+ num.network.threads=3
64+ num.io.threads=8
65+ socket.send.buffer.bytes=102400
66+ socket.receive.buffer.bytes=102400
67+ socket.request.max.bytes=369295617
68+ num.partitions=1
69+ num.recovery.threads.per.data.dir=1
70+ offsets.topic.replication.factor=2
71+ transaction.state.log.replication.factor=2
72+ transaction.state.log.min.isr=2
73+ log.retention.hours=168
74+ log.segment.bytes=1073741824
75+ log.retention.check.interval.ms=300000
76+ zookeeper.connect=localhost:2181,localhost:2182
77+ zookeeper.connection.timeout.ms=18000
78+ group.initial.rebalance.delay.ms=0
79+ metadata.max.age.ms=3000
80+ """ .format (tdir = kafka_dir , _id = _id , ip = remote .ip_address )
81+ file_name = 'kafka{_id}.properties' .format (_id = _id )
82+ log .info ("kafka conf file: %s" , file_name )
83+ log .info (conf )
84+ return ctx .cluster .only (client ).run (
85+ args = [
86+ 'cd' , kafka_dir , run .Raw ('&&' ),
87+ 'mkdir' , '-p' , 'config' , run .Raw ('&&' ),
88+ 'mkdir' , '-p' , 'data' , run .Raw ('&&' ),
89+ 'echo' , conf , run .Raw ('>' ), 'config/{file_name}' .format (file_name = file_name )
90+ ],
91+ )
92+
2893
2994@contextlib .contextmanager
3095def install_kafka (ctx , config ):
@@ -59,45 +124,21 @@ def install_kafka(ctx, config):
59124 )
60125
61126 kafka_dir = get_kafka_dir (ctx , config )
62- # create config for second broker
63- second_broker_config_name = "server2.properties"
64- second_broker_data = "{tdir}/data/broker02" .format (tdir = kafka_dir )
65- second_broker_data_logs_escaped = "{}/logs" .format (second_broker_data ).replace ("/" , "\/" )
66-
67- ctx .cluster .only (client ).run (
68- args = ['cd' , '{tdir}' .format (tdir = kafka_dir ), run .Raw ('&&' ),
69- 'cp' , '{tdir}/config/server.properties' .format (tdir = kafka_dir ), '{tdir}/config/{second_broker_config_name}' .format (tdir = kafka_dir , second_broker_config_name = second_broker_config_name ), run .Raw ('&&' ),
70- 'mkdir' , '-p' , '{tdir}/data' .format (tdir = kafka_dir )
71- ],
72- )
73-
74- # edit config
75- ctx .cluster .only (client ).run (
76- args = ['sed' , '-i' , 's/broker.id=0/broker.id=1/g' , '{tdir}/config/{second_broker_config_name}' .format (tdir = kafka_dir , second_broker_config_name = second_broker_config_name ), run .Raw ('&&' ),
77- 'sed' , '-i' , 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g' , '{tdir}/config/{second_broker_config_name}' .format (tdir = kafka_dir , second_broker_config_name = second_broker_config_name ), run .Raw ('&&' ),
78- 'sed' , '-i' , 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g' , '{tdir}/config/{second_broker_config_name}' .format (tdir = kafka_dir , second_broker_config_name = second_broker_config_name ), run .Raw ('&&' ),
79- 'sed' , '-i' , 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g' .format (second_broker_data_logs_escaped ), '{tdir}/config/{second_broker_config_name}' .format (tdir = kafka_dir , second_broker_config_name = second_broker_config_name ), run .Raw ('&&' ),
80- 'cat' , '{tdir}/config/{second_broker_config_name}' .format (tdir = kafka_dir , second_broker_config_name = second_broker_config_name )
81- ]
82- )
127+ # create config for 2 zookeepers
128+ zookeeper_conf (ctx , client , 1 , kafka_dir )
129+ zookeeper_conf (ctx , client , 2 , kafka_dir )
130+ # create config for 2 brokers
131+ broker_conf (ctx , client , 1 , kafka_dir )
132+ broker_conf (ctx , client , 2 , kafka_dir )
83133
84134 try :
85135 yield
86136 finally :
87137 log .info ('Removing packaged dependencies of Kafka...' )
88- test_dir = get_kafka_dir (ctx , config )
89- current_version = get_kafka_version (config )
138+ kafka_dir = get_kafka_dir (ctx , config )
90139 for (client ,_ ) in config .items ():
91140 ctx .cluster .only (client ).run (
92- args = ['rm' , '-rf' , '{tdir}/logs' .format (tdir = test_dir )],
93- )
94-
95- ctx .cluster .only (client ).run (
96- args = ['rm' , '-rf' , test_dir ],
97- )
98-
99- ctx .cluster .only (client ).run (
100- args = ['rm' , '-rf' , '{tdir}/{doc}' .format (tdir = teuthology .get_testdir (ctx ),doc = kafka_file )],
141+ args = ['rm' , '-rf' , '{tdir}' .format (tdir = kafka_dir )],
101142 )
102143
103144
@@ -114,32 +155,48 @@ def run_kafka(ctx,config):
114155 (remote ,) = ctx .cluster .only (client ).remotes .keys ()
115156 kafka_dir = get_kafka_dir (ctx , config )
116157
117- second_broker_data = "{tdir}/data/broker02" .format (tdir = kafka_dir )
118- second_broker_java_log_dir = "{}/java_logs" .format (second_broker_data )
119-
120158 ctx .cluster .only (client ).run (
121159 args = ['cd' , '{tdir}/bin' .format (tdir = kafka_dir ), run .Raw ('&&' ),
122- './zookeeper-server-start.sh' ,
123- '{tir}/config/zookeeper.properties' .format (tir = kafka_dir ),
124- run .Raw ('&' ), 'exit'
160+ './zookeeper-server-start.sh' , '-daemon' ,
161+ '{tdir}/config/zookeeper1.properties' .format (tdir = kafka_dir )
162+ ],
163+ )
164+ ctx .cluster .only (client ).run (
165+ args = ['cd' , '{tdir}/bin' .format (tdir = kafka_dir ), run .Raw ('&&' ),
166+ './zookeeper-server-start.sh' , '-daemon' ,
167+ '{tdir}/config/zookeeper2.properties' .format (tdir = kafka_dir )
125168 ],
126169 )
170+ # wait for zookeepers to start
171+ time .sleep (5 )
172+ for zk_id in [1 , 2 ]:
173+ ctx .cluster .only (client ).run (
174+ args = ['cd' , '{tdir}/bin' .format (tdir = kafka_dir ), run .Raw ('&&' ),
175+ './zookeeper-shell.sh' , 'localhost:218{_id}' .format (_id = zk_id ), 'ls' , '/' ],
176+ )
177+ zk_started = False
178+ while not zk_started :
179+ result = ctx .cluster .only (client ).run (
180+ args = ['cd' , '{tdir}/bin' .format (tdir = kafka_dir ), run .Raw ('&&' ),
181+ './zookeeper-shell.sh' , 'localhost:218{_id}' .format (_id = zk_id ), 'ls' , '/' ],
182+ )
183+ log .info ("Checking if Zookeeper %d is started. Result: %s" , zk_id , str (result ))
184+ zk_started = True
127185
128186 ctx .cluster .only (client ).run (
129187 args = ['cd' , '{tdir}/bin' .format (tdir = kafka_dir ), run .Raw ('&&' ),
130- './kafka-server-start.sh' ,
131- '{tir}/config/server.properties' .format (tir = get_kafka_dir (ctx , config )),
132- run .Raw ('&' ), 'exit'
188+ './kafka-server-start.sh' , '-daemon' ,
189+ '{tdir}/config/kafka1.properties' .format (tdir = get_kafka_dir (ctx , config ))
133190 ],
134191 )
135-
136192 ctx .cluster .only (client ).run (
137193 args = ['cd' , '{tdir}/bin' .format (tdir = kafka_dir ), run .Raw ('&&' ),
138- run .Raw ('LOG_DIR={second_broker_java_log_dir}' .format (second_broker_java_log_dir = second_broker_java_log_dir )),
139- './kafka-server-start.sh' , '{tdir}/config/server2.properties' .format (tdir = kafka_dir ),
140- run .Raw ('&' ), 'exit'
194+ './kafka-server-start.sh' , '-daemon' ,
195+ '{tdir}/config/kafka2.properties' .format (tdir = get_kafka_dir (ctx , config ))
141196 ],
142197 )
198+ # wait for kafka to start
199+ time .sleep (5 )
143200
144201 try :
145202 yield
@@ -151,27 +208,41 @@ def run_kafka(ctx,config):
151208
152209 ctx .cluster .only (client ).run (
153210 args = ['cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
154- './kafka-server-stop.sh' ,
155- '{tir }/config/kafka .properties' .format (tir = get_kafka_dir (ctx , config )),
211+ './kafka-server-stop.sh' ,
212+ '{tdir}/config/kafka1.properties' .format (tdir = get_kafka_dir (ctx , config )),
156213 ],
157214 )
158215
216+ ctx .cluster .only (client ).run (
217+ args = ['cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
218+ './kafka-server-stop.sh' ,
219+ '{tdir}/config/kafka2.properties' .format (tdir = get_kafka_dir (ctx , config )),
220+ ],
221+ )
222+
223+ # wait for kafka to stop
159224 time .sleep (5 )
160225
161226 ctx .cluster .only (client ).run (
162- args = ['cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
227+ args = ['cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
228+ './zookeeper-server-stop.sh' ,
229+ '{tir}/config/zookeeper1.properties' .format (tir = get_kafka_dir (ctx , config )),
230+ ],
231+ )
232+ ctx .cluster .only (client ).run (
233+ args = ['cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
163234 './zookeeper-server-stop.sh' ,
164- '{tir}/config/zookeeper .properties' .format (tir = get_kafka_dir (ctx , config )),
235+ '{tir}/config/zookeeper2.properties' .format (tir = get_kafka_dir (ctx , config )),
165236 ],
166237 )
167238
239+ # wait for zookeeper to stop
168240 time .sleep (5 )
169-
170241 ctx .cluster .only (client ).run (args = ['killall' , '-9' , 'java' ])
171242
172243
173244@contextlib .contextmanager
174- def run_admin_cmds (ctx ,config ):
245+ def run_admin_cmds (ctx , config ):
175246 """
176247 Running Kafka Admin commands in order to check the working of producer anf consumer and creation of topic.
177248 """
@@ -182,9 +253,9 @@ def run_admin_cmds(ctx,config):
182253
183254 ctx .cluster .only (client ).run (
184255 args = [
185- 'cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
256+ 'cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
186257 './kafka-topics.sh' , '--create' , '--topic' , 'quickstart-events' ,
187- '--bootstrap-server' , 'localhost:9092'
258+ '--bootstrap-server' , 'localhost:9091,localhost:9092' ,
188259 ],
189260 )
190261
@@ -193,7 +264,7 @@ def run_admin_cmds(ctx,config):
193264 'cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
194265 'echo' , "First" , run .Raw ('|' ),
195266 './kafka-console-producer.sh' , '--topic' , 'quickstart-events' ,
196- '--bootstrap-server' , 'localhost:9092'
267+ '--bootstrap-server' , 'localhost:9091,localhost:9092' ,
197268 ],
198269 )
199270
@@ -202,8 +273,7 @@ def run_admin_cmds(ctx,config):
202273 'cd' , '{tdir}/bin' .format (tdir = get_kafka_dir (ctx , config )), run .Raw ('&&' ),
203274 './kafka-console-consumer.sh' , '--topic' , 'quickstart-events' ,
204275 '--from-beginning' ,
205- '--bootstrap-server' , 'localhost:9092' ,
206- run .Raw ('&' ), 'exit'
276+ '--bootstrap-server' , 'localhost:9091,localhost:9092' , '--max-messages' , '1' ,
207277 ],
208278 )
209279
0 commit comments