Skip to content

Commit 447ea3f

Browse files
committed
rgw/notifications: test refactoring
* kafka: pass full broker list to consumer in tests
* kafka: use ip instead of localhost
* kafka: make sure topic exists before consumer start
* kafka: fix zookeeper and broker conf in tests
* kafka: verify receiver in the test
* kafka: tests were not running (Fixes: https://tracker.ceph.com/issues/72240)
* kafka: failover tests were failing (Fixes: https://tracker.ceph.com/issues/71585)
* simplify basic tests run command
* v2 migration tests were not running
* fix failing migration tests

Signed-off-by: Yuval Lifshitz <[email protected]>
1 parent 149c9d1 commit 447ea3f

File tree

10 files changed

+355
-203
lines changed

10 files changed

+355
-203
lines changed
File renamed without changes.
File renamed without changes.

qa/suites/rgw/notifications/tasks/others/0-install.yaml renamed to qa/suites/rgw/notifications/tasks/basic/0-install.yaml

File renamed without changes.

qa/suites/rgw/notifications/tasks/others/supported-distros renamed to qa/suites/rgw/notifications/tasks/basic/supported-distros

File renamed without changes.

qa/suites/rgw/notifications/tasks/others/test_others.yaml renamed to qa/suites/rgw/notifications/tasks/basic/test_basic.yaml

File renamed without changes.

qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ tasks:
44
kafka_version: 3.8.1
55
- notification-tests:
66
client.0:
7-
extra_attr: ["kafka_test", "data_path_v2_kafka_test"]
7+
extra_attr: ["kafka_test"]
88
rgw_server: client.0

qa/tasks/kafka_failover.py

Lines changed: 126 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,71 @@ def get_kafka_dir(ctx, config):
2525
current_version = kafka_prefix + kafka_version
2626
return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
2727

28+
def zookeeper_conf(ctx, client, _id, kafka_dir):
29+
conf = """
30+
# zookeeper{_id}.properties
31+
dataDir={tdir}/data/zookeeper{_id}
32+
clientPort=218{_id}
33+
maxClientCnxns=0
34+
admin.enableServer=false
35+
tickTime=2000
36+
initLimit=10
37+
syncLimit=5
38+
server.1=localhost:2888:3888
39+
server.2=localhost:2889:3889
40+
""".format(tdir=kafka_dir, _id=_id)
41+
file_name = 'zookeeper{_id}.properties'.format(_id=_id)
42+
log.info("zookeeper conf file: %s", file_name)
43+
log.info(conf)
44+
return ctx.cluster.only(client).run(
45+
args=[
46+
'cd', kafka_dir, run.Raw('&&'),
47+
'mkdir', '-p', 'config', run.Raw('&&'),
48+
'mkdir', '-p', 'data/zookeeper{_id}'.format(_id=_id), run.Raw('&&'),
49+
'echo', conf, run.Raw('>'), 'config/{file_name}'.format(file_name=file_name), run.Raw('&&'),
50+
'echo', str(_id), run.Raw('>'), 'data/zookeeper{_id}/myid'.format(_id=_id)
51+
],
52+
)
53+
54+
55+
def broker_conf(ctx, client, _id, kafka_dir):
56+
(remote,) = ctx.cluster.only(client).remotes.keys()
57+
conf = """
58+
# kafka{_id}.properties
59+
broker.id={_id}
60+
listeners=PLAINTEXT://0.0.0.0:909{_id}
61+
advertised.listeners=PLAINTEXT://{ip}:909{_id}
62+
log.dirs={tdir}/data/kafka-logs-{_id}
63+
num.network.threads=3
64+
num.io.threads=8
65+
socket.send.buffer.bytes=102400
66+
socket.receive.buffer.bytes=102400
67+
socket.request.max.bytes=369295617
68+
num.partitions=1
69+
num.recovery.threads.per.data.dir=1
70+
offsets.topic.replication.factor=2
71+
transaction.state.log.replication.factor=2
72+
transaction.state.log.min.isr=2
73+
log.retention.hours=168
74+
log.segment.bytes=1073741824
75+
log.retention.check.interval.ms=300000
76+
zookeeper.connect=localhost:2181,localhost:2182
77+
zookeeper.connection.timeout.ms=18000
78+
group.initial.rebalance.delay.ms=0
79+
metadata.max.age.ms=3000
80+
""".format(tdir=kafka_dir, _id=_id, ip=remote.ip_address)
81+
file_name = 'kafka{_id}.properties'.format(_id=_id)
82+
log.info("kafka conf file: %s", file_name)
83+
log.info(conf)
84+
return ctx.cluster.only(client).run(
85+
args=[
86+
'cd', kafka_dir, run.Raw('&&'),
87+
'mkdir', '-p', 'config', run.Raw('&&'),
88+
'mkdir', '-p', 'data', run.Raw('&&'),
89+
'echo', conf, run.Raw('>'), 'config/{file_name}'.format(file_name=file_name)
90+
],
91+
)
92+
2893

2994
@contextlib.contextmanager
3095
def install_kafka(ctx, config):
@@ -59,45 +124,21 @@ def install_kafka(ctx, config):
59124
)
60125

61126
kafka_dir = get_kafka_dir(ctx, config)
62-
# create config for second broker
63-
second_broker_config_name = "server2.properties"
64-
second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
65-
second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/")
66-
67-
ctx.cluster.only(client).run(
68-
args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'),
69-
'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
70-
'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir)
71-
],
72-
)
73-
74-
# edit config
75-
ctx.cluster.only(client).run(
76-
args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
77-
'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
78-
'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
79-
'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
80-
'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name)
81-
]
82-
)
127+
# create config for 2 zookeepers
128+
zookeeper_conf(ctx, client, 1, kafka_dir)
129+
zookeeper_conf(ctx, client, 2, kafka_dir)
130+
# create config for 2 brokers
131+
broker_conf(ctx, client, 1, kafka_dir)
132+
broker_conf(ctx, client, 2, kafka_dir)
83133

84134
try:
85135
yield
86136
finally:
87137
log.info('Removing packaged dependencies of Kafka...')
88-
test_dir=get_kafka_dir(ctx, config)
89-
current_version = get_kafka_version(config)
138+
kafka_dir=get_kafka_dir(ctx, config)
90139
for (client,_) in config.items():
91140
ctx.cluster.only(client).run(
92-
args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
93-
)
94-
95-
ctx.cluster.only(client).run(
96-
args=['rm', '-rf', test_dir],
97-
)
98-
99-
ctx.cluster.only(client).run(
100-
args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)],
141+
args=['rm', '-rf', '{tdir}'.format(tdir=kafka_dir)],
101142
)
102143

103144

@@ -114,32 +155,48 @@ def run_kafka(ctx,config):
114155
(remote,) = ctx.cluster.only(client).remotes.keys()
115156
kafka_dir = get_kafka_dir(ctx, config)
116157

117-
second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
118-
second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
119-
120158
ctx.cluster.only(client).run(
121159
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
122-
'./zookeeper-server-start.sh',
123-
'{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
124-
run.Raw('&'), 'exit'
160+
'./zookeeper-server-start.sh', '-daemon',
161+
'{tdir}/config/zookeeper1.properties'.format(tdir=kafka_dir)
162+
],
163+
)
164+
ctx.cluster.only(client).run(
165+
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
166+
'./zookeeper-server-start.sh', '-daemon',
167+
'{tdir}/config/zookeeper2.properties'.format(tdir=kafka_dir)
125168
],
126169
)
170+
# wait for zookeepers to start
171+
time.sleep(5)
172+
for zk_id in [1, 2]:
173+
ctx.cluster.only(client).run(
174+
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
175+
'./zookeeper-shell.sh', 'localhost:218{_id}'.format(_id=zk_id), 'ls', '/'],
176+
)
177+
zk_started = False
178+
while not zk_started:
179+
result = ctx.cluster.only(client).run(
180+
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
181+
'./zookeeper-shell.sh', 'localhost:218{_id}'.format(_id=zk_id), 'ls', '/'],
182+
)
183+
log.info("Checking if Zookeeper %d is started. Result: %s", zk_id, str(result))
184+
zk_started = True
127185

128186
ctx.cluster.only(client).run(
129187
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
130-
'./kafka-server-start.sh',
131-
'{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
132-
run.Raw('&'), 'exit'
188+
'./kafka-server-start.sh', '-daemon',
189+
'{tdir}/config/kafka1.properties'.format(tdir=get_kafka_dir(ctx, config))
133190
],
134191
)
135-
136192
ctx.cluster.only(client).run(
137193
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
138-
run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)),
139-
'./kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
140-
run.Raw('&'), 'exit'
194+
'./kafka-server-start.sh', '-daemon',
195+
'{tdir}/config/kafka2.properties'.format(tdir=get_kafka_dir(ctx, config))
141196
],
142197
)
198+
# wait for kafka to start
199+
time.sleep(5)
143200

144201
try:
145202
yield
@@ -151,27 +208,41 @@ def run_kafka(ctx,config):
151208

152209
ctx.cluster.only(client).run(
153210
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
154-
'./kafka-server-stop.sh',
155-
'{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
211+
'./kafka-server-stop.sh',
212+
'{tdir}/config/kafka1.properties'.format(tdir=get_kafka_dir(ctx, config)),
156213
],
157214
)
158215

216+
ctx.cluster.only(client).run(
217+
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
218+
'./kafka-server-stop.sh',
219+
'{tdir}/config/kafka2.properties'.format(tdir=get_kafka_dir(ctx, config)),
220+
],
221+
)
222+
223+
# wait for kafka to stop
159224
time.sleep(5)
160225

161226
ctx.cluster.only(client).run(
162-
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
227+
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
228+
'./zookeeper-server-stop.sh',
229+
'{tir}/config/zookeeper1.properties'.format(tir=get_kafka_dir(ctx, config)),
230+
],
231+
)
232+
ctx.cluster.only(client).run(
233+
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
163234
'./zookeeper-server-stop.sh',
164-
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
235+
'{tir}/config/zookeeper2.properties'.format(tir=get_kafka_dir(ctx, config)),
165236
],
166237
)
167238

239+
# wait for zookeeper to stop
168240
time.sleep(5)
169-
170241
ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
171242

172243

173244
@contextlib.contextmanager
174-
def run_admin_cmds(ctx,config):
245+
def run_admin_cmds(ctx, config):
175246
"""
176247
Running Kafka Admin commands in order to check the working of producer and consumer and creation of topic.
177248
"""
@@ -182,9 +253,9 @@ def run_admin_cmds(ctx,config):
182253

183254
ctx.cluster.only(client).run(
184255
args=[
185-
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
256+
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
186257
'./kafka-topics.sh', '--create', '--topic', 'quickstart-events',
187-
'--bootstrap-server', 'localhost:9092'
258+
'--bootstrap-server', 'localhost:9091,localhost:9092',
188259
],
189260
)
190261

@@ -193,7 +264,7 @@ def run_admin_cmds(ctx,config):
193264
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
194265
'echo', "First", run.Raw('|'),
195266
'./kafka-console-producer.sh', '--topic', 'quickstart-events',
196-
'--bootstrap-server', 'localhost:9092'
267+
'--bootstrap-server', 'localhost:9091,localhost:9092',
197268
],
198269
)
199270

@@ -202,8 +273,7 @@ def run_admin_cmds(ctx,config):
202273
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
203274
'./kafka-console-consumer.sh', '--topic', 'quickstart-events',
204275
'--from-beginning',
205-
'--bootstrap-server', 'localhost:9092',
206-
run.Raw('&'), 'exit'
276+
'--bootstrap-server', 'localhost:9091,localhost:9092', '--max-messages', '1',
207277
],
208278
)
209279

qa/tasks/notification_tests.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ def run_tests(ctx, config):
220220
for client, client_config in config.items():
221221
(remote,) = ctx.cluster.only(client).remotes.keys()
222222

223-
attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
223+
attr = ["basic_test"]
224224

225225
if 'extra_attr' in client_config:
226226
attr = client_config.get('extra_attr')
@@ -291,6 +291,7 @@ def task(ctx,config):
291291
endpoint = ctx.rgw.role_endpoints.get(client)
292292
assert endpoint, 'bntests: no rgw endpoint for {}'.format(client)
293293

294+
cluster_name, _, _ = teuthology.split_role(client)
294295
bntests_conf[client] = ConfigObj(
295296
indent_type='',
296297
infile={
@@ -299,7 +300,7 @@ def task(ctx,config):
299300
'port':endpoint.port,
300301
'host':endpoint.dns_name,
301302
'zonegroup':ctx.rgw.zonegroup,
302-
'cluster':'noname',
303+
'cluster':cluster_name,
303304
'version':'v2'
304305
},
305306
's3 main':{}

src/test/rgw/bucket_notification/api.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,13 +247,19 @@ def delete_all_topics(conn, tenant, cluster):
247247
if tenant == '':
248248
topics_result = admin(['topic', 'list'], cluster)
249249
topics_json = json.loads(topics_result[0])
250-
for topic in topics_json:
251-
rm_result = admin(['topic', 'rm', '--topic', topic['name']], cluster)
252-
print(rm_result)
250+
try:
251+
for topic in topics_json['topics']:
252+
admin(['topic', 'rm', '--topic', topic['name']], cluster)
253+
except TypeError:
254+
for topic in topics_json:
255+
admin(['topic', 'rm', '--topic', topic['name']], cluster)
253256
else:
254257
topics_result = admin(['topic', 'list', '--tenant', tenant], cluster)
255258
topics_json = json.loads(topics_result[0])
256-
for topic in topics_json:
257-
rm_result = admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
258-
print(rm_result)
259+
try:
260+
for topic in topics_json['topics']:
261+
admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
262+
except TypeError:
263+
for topic in topics_json:
264+
admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
259265

0 commit comments

Comments
 (0)