 import tempfile
 import random
 import threading
+from concurrent.futures import ThreadPoolExecutor
 import subprocess
 import socket
 import time
@@ -1684,6 +1685,109 @@ def test_notification_push_kafka_multiple_brokers_append():
     notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))


+@attr('manual_test')
+def test_1K_topics():
+    """ test creation of more than 1K topics """
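+    # scale test: create topic_count topics, one bucket and one notification
+    # per topic, write and delete a single object in every bucket, then tear
+    # everything down; each phase fans out over a ThreadPoolExecutor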
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+    base_bucket_name = gen_bucket_name()
+    host = get_ip()
+    endpoint_address = 'kafka://' + host
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true'
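+    # persistent=true means RGW queues the notifications and delivers them
+    # asynchronously, so the object operations below should not block on kafka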
+    topic_count = 1200
+    thread_pool_size = 30
+    topics = []
+    notifications = []
+    buckets = []
+    log.info(f"creating {topic_count} topics, buckets and notifications")
+    print(f"creating {topic_count} topics, buckets and notifications")
+    # create topics, buckets and notifications
+    def create_topic_bucket_notification(i):
+        bucket_name = base_bucket_name + str(i)
+        topic_name = bucket_name + TOPIC_SUFFIX
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topic_arn = topic_conf.set_config()
+        bucket = conn.create_bucket(bucket_name)
+        notification_name = bucket_name + NOTIFICATION_SUFFIX
+        topic_conf_list = [{'Id': notification_name,
+                            'TopicArn': topic_arn,
+                            'Events': []
+                            }]
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        response, status = s3_notification_conf.set_config()
+        return topic_conf, bucket, s3_notification_conf
+
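+    # futures come back in submission order, so index i in the loop below
+    # matches the numeric suffix used for the bucket and topic names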
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(create_topic_bucket_notification, i) for i in range(topic_count)]
+        for i, future in enumerate(futures):
+            try:
+                topic_conf, bucket, s3_notification_conf = future.result()
+                topics.append(topic_conf)
+                buckets.append(bucket)
+                notifications.append(s3_notification_conf)
+            except Exception as e:
+                log.error(f"error creating topic/bucket/notification {i}: {e}")
+
+    log.info("creating objects in buckets")
+    print("creating objects in buckets")
+    # create objects in the buckets
+    def create_object_in_bucket(bucket_idx):
+        bucket = buckets[bucket_idx]
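+        # payload is just the repr of 32 random bytes; the content is
+        # irrelevant, the write only needs to trigger a notification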
+        content = str(os.urandom(32))
+        key = bucket.new_key(str(bucket_idx))
+        set_contents_from_string(key, content)
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(create_object_in_bucket, i) for i in range(len(buckets))]
+        for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error creating object in bucket: {e}")
+
+    log.info("deleting objects from buckets")
+    print("deleting objects from buckets")
+    # delete objects from the buckets
+    def delete_object_in_bucket(bucket_idx):
+        bucket = buckets[bucket_idx]
+        key = bucket.new_key(str(bucket_idx))
+        key.delete()
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(delete_object_in_bucket, i) for i in range(len(buckets))]
+        for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error deleting object in bucket: {e}")
+
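+    # note: notification, topic and bucket deletions are all submitted to the
+    # same pool and may interleave; failures are logged rather than raised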
+    # cleanup
+    def cleanup_notification(notification):
+        notification.del_config()
+
+    def cleanup_topic(topic):
+        topic.del_config()
+
+    def cleanup_bucket(i):
+        bucket_name = base_bucket_name + str(i)
+        conn.delete_bucket(bucket_name)
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        # cleanup notifications
+        notification_futures = [executor.submit(cleanup_notification, notification) for notification in notifications]
+        # cleanup topics
+        topic_futures = [executor.submit(cleanup_topic, topic) for topic in topics]
+        # cleanup buckets
+        bucket_futures = [executor.submit(cleanup_bucket, i) for i in range(topic_count)]
+        # wait for all cleanup operations to complete
+        all_futures = notification_futures + topic_futures + bucket_futures
+        for future in all_futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error during cleanup: {e}")
+
+
 @attr('http_test')
 def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """
@@ -6134,4 +6238,4 @@ def test_persistent_sharded_topic_config_change_kafka():
     conn = connection()
     new_num_shards = random.randint(2, 10)
     default_num_shards = 11
-    persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
+    persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)