Skip to content

Commit 3b959f3

Browse files
committed
test/rgw/notify: cover case with motre than 1K topics
Signed-off-by: Yuval Lifshitz <[email protected]>
1 parent b984980 commit 3b959f3

File tree

1 file changed

+105
-1
lines changed

1 file changed

+105
-1
lines changed

src/test/rgw/bucket_notification/test_bn.py

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import tempfile
44
import random
55
import threading
6+
from concurrent.futures import ThreadPoolExecutor
67
import subprocess
78
import socket
89
import time
@@ -1684,6 +1685,109 @@ def test_notification_push_kafka_multiple_brokers_append():
16841685
notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
16851686

16861687

1688+
@attr('manual_test')
1689+
def test_1K_topics():
1690+
""" test creation of moe than 1K topics """
1691+
conn = connection()
1692+
zonegroup = get_config_zonegroup()
1693+
base_bucket_name = gen_bucket_name()
1694+
host = get_ip()
1695+
endpoint_address = 'kafka://' + host
1696+
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
1697+
topic_count = 1200
1698+
thread_pool_size = 30
1699+
topics = []
1700+
notifications = []
1701+
buckets = []
1702+
log.info(f"creating {topic_count} topics, buckets and notifications")
1703+
print(f"creating {topic_count} topics, buckets and notifications")
1704+
# create topics buckets and notifications
1705+
def create_topic_bucket_notification(i):
1706+
bucket_name = base_bucket_name + str(i)
1707+
topic_name = bucket_name + TOPIC_SUFFIX
1708+
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
1709+
topic_arn = topic_conf.set_config()
1710+
bucket = conn.create_bucket(bucket_name)
1711+
notification_name = bucket_name + NOTIFICATION_SUFFIX
1712+
topic_conf_list = [{'Id': notification_name,
1713+
'TopicArn': topic_arn,
1714+
'Events': []
1715+
}]
1716+
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1717+
response, status = s3_notification_conf.set_config()
1718+
return topic_conf, bucket, s3_notification_conf
1719+
1720+
with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
1721+
futures = [executor.submit(create_topic_bucket_notification, i) for i in range(topic_count)]
1722+
for i, future in enumerate(futures):
1723+
try:
1724+
topic_conf, bucket, s3_notification_conf = future.result()
1725+
topics.append(topic_conf)
1726+
buckets.append(bucket)
1727+
notifications.append(s3_notification_conf)
1728+
except Exception as e:
1729+
log.error(f"error creating topic/bucket/notification {i}: {e}")
1730+
1731+
log.info("creating objects in buckets")
1732+
print("creating objects in buckets")
1733+
# create objects in the buckets
1734+
def create_object_in_bucket(bucket_idx):
1735+
bucket = buckets[bucket_idx]
1736+
content = str(os.urandom(32))
1737+
key = bucket.new_key(str(bucket_idx))
1738+
set_contents_from_string(key, content)
1739+
1740+
with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
1741+
futures = [executor.submit(create_object_in_bucket, i) for i in range(len(buckets))]
1742+
for future in futures:
1743+
try:
1744+
future.result()
1745+
except Exception as e:
1746+
log.error(f"error creating object in bucket: {e}")
1747+
1748+
log.info("deleting objects from buckets")
1749+
print("deleting objects from buckets")
1750+
# delete objects in the buckets
1751+
def delete_object_in_bucket(bucket_idx):
1752+
bucket = buckets[bucket_idx]
1753+
key = bucket.new_key(str(bucket_idx))
1754+
key.delete()
1755+
1756+
with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
1757+
futures = [executor.submit(delete_object_in_bucket, i) for i in range(len(buckets))]
1758+
for future in futures:
1759+
try:
1760+
future.result()
1761+
except Exception as e:
1762+
log.error(f"error deleting object in bucket: {e}")
1763+
1764+
# cleanup
1765+
def cleanup_notification(notification):
1766+
notification.del_config()
1767+
1768+
def cleanup_topic(topic):
1769+
topic.del_config()
1770+
1771+
def cleanup_bucket(i):
1772+
bucket_name = base_bucket_name + str(i)
1773+
conn.delete_bucket(bucket_name)
1774+
1775+
with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
1776+
# cleanup notifications
1777+
notification_futures = [executor.submit(cleanup_notification, notification) for notification in notifications]
1778+
# cleanup topics
1779+
topic_futures = [executor.submit(cleanup_topic, topic) for topic in topics]
1780+
# cleanup buckets
1781+
bucket_futures = [executor.submit(cleanup_bucket, i) for i in range(topic_count)]
1782+
# wait for all cleanup operations to complete
1783+
all_futures = notification_futures + topic_futures + bucket_futures
1784+
for future in all_futures:
1785+
try:
1786+
future.result()
1787+
except Exception as e:
1788+
log.error(f"error during cleanup: {e}")
1789+
1790+
16871791
@attr('http_test')
16881792
def test_ps_s3_notification_multi_delete_on_master():
16891793
""" test deletion of multiple keys on master """
@@ -6134,4 +6238,4 @@ def test_persistent_sharded_topic_config_change_kafka():
61346238
conn = connection()
61356239
new_num_shards = random.randint(2, 10)
61366240
default_num_shards = 11
6137-
persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
6241+
persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)

0 commit comments

Comments
 (0)