@@ -2957,25 +2957,48 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non
29572957 log .info ('waited for %ds for queue %s to drain' , time_diff , topic_name )
29582958
29592959
2960- @attr ('basic_test' )
2961- def test_ps_s3_persistent_topic_stats ():
2962- """ test persistent topic stats """
2963- conn = connection ()
2960+ @attr ('kafka_test' )
2961+ def persistent_topic_stats (conn , endpoint_type ):
29642962 zonegroup = get_config_zonegroup ()
29652963
2966- # create random port for the http server
2967- host = get_ip ()
2968- port = random .randint (10000 , 20000 )
2969-
29702964 # create bucket
29712965 bucket_name = gen_bucket_name ()
29722966 bucket = conn .create_bucket (bucket_name )
29732967 topic_name = bucket_name + TOPIC_SUFFIX
29742968
2969+ host = get_ip ()
2970+ task = None
2971+ port = None
2972+ if endpoint_type == 'http' :
2973+ # create random port for the http server
2974+ port = random .randint (10000 , 20000 )
2975+ # start an http server in a separate thread
2976+ receiver = HTTPServerWithEvents ((host , port ))
2977+ endpoint_address = 'http://' + host + ':' + str (port )
2978+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
2979+ '&retry_sleep_duration=1'
2980+ elif endpoint_type == 'amqp' :
2981+ # start amqp receiver
2982+ exchange = 'ex1'
2983+ task , receiver = create_amqp_receiver_thread (exchange , topic_name )
2984+ task .start ()
2985+ endpoint_address = 'amqp://' + host
2986+ endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker&persistent=true' + \
2987+ '&retry_sleep_duration=1'
2988+ elif endpoint_type == 'kafka' :
2989+ # start kafka receiver
2990+ task , receiver = create_kafka_receiver_thread (topic_name )
2991+ task .start ()
2992+ endpoint_address = 'kafka://' + host
2993+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
2994+ '&retry_sleep_duration=1'
2995+ else :
2996+ return SkipTest ('Unknown endpoint type: ' + endpoint_type )
2997+
29752998 # create s3 topic
2976- endpoint_address = 'http ://' + host + ':' + str ( port )
2977- endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
2978- '&retry_sleep_duration=1'
2999+ endpoint_address = 'kafka ://' + host + ':1234' # wrong port
3000+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker& persistent=true' + \
3001+ '&retry_sleep_duration=1'
29793002 topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
29803003 topic_arn = topic_conf .set_config ()
29813004 # create s3 notification
@@ -3022,8 +3045,12 @@ def test_ps_s3_persistent_topic_stats():
30223045 # topic stats
30233046 get_stats_persistent_topic (topic_name , 2 * number_of_objects )
30243047
3025- # start an http server in a separate thread
3026- http_server = HTTPServerWithEvents ((host , port ))
3048+ # change the endpoint port
3049+ endpoint_address = 'kafka://' + host
3050+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
3051+ '&retry_sleep_duration=1'
3052+ topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
3053+ topic_arn = topic_conf .set_config ()
30273054
30283055 wait_for_queue_to_drain (topic_name , http_port = port )
30293056
@@ -3032,27 +3059,44 @@ def test_ps_s3_persistent_topic_stats():
30323059 topic_conf .del_config ()
30333060 # delete the bucket
30343061 conn .delete_bucket (bucket_name )
3035- http_server .close ()
3062+ receiver .close (task )
30363063
3037- @attr ('basic_test' )
3064+
3065+ @attr ('http_test' )
3066+ def persistent_topic_stats_http ():
3067+ """ test persistent topic stats, http endpoint """
3068+ conn = connection ()
3069+ persistent_topic_stats (conn , 'http' )
3070+
3071+
3072+ @attr ('kafka_test' )
3073+ def persistent_topic_stats_kafka ():
3074+ """ test persistent topic stats, kafka endpoint """
3075+ conn = connection ()
3076+ persistent_topic_stats (conn , 'kafka' )
3077+
3078+
3079+ @attr ('kafka_test' )
30383080def test_persistent_topic_dump ():
30393081 """ test persistent topic dump """
30403082 conn = connection ()
30413083 zonegroup = get_config_zonegroup ()
30423084
3043- # create random port for the http server
3044- host = get_ip ()
3045- port = random .randint (10000 , 20000 )
3046-
30473085 # create bucket
30483086 bucket_name = gen_bucket_name ()
30493087 bucket = conn .create_bucket (bucket_name )
30503088 topic_name = bucket_name + TOPIC_SUFFIX
30513089
3090+ # start kafka receiver
3091+ host = get_ip ()
3092+ task , receiver = create_kafka_receiver_thread (topic_name )
3093+ task .start ()
3094+
3095+
30523096 # create s3 topic
3053- endpoint_address = 'http ://' + host + ':' + str ( port )
3054- endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
3055- '&retry_sleep_duration=1'
3097+ endpoint_address = 'kafka ://WrongHost' # wrong port
3098+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker& persistent=true' + \
3099+ '&retry_sleep_duration=1'
30563100 topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
30573101 topic_arn = topic_conf .set_config ()
30583102 # create s3 notification
@@ -3103,10 +3147,14 @@ def test_persistent_topic_dump():
31033147 parsed_result = json .loads (result [0 ])
31043148 assert_equal (len (parsed_result ), 2 * number_of_objects )
31053149
3106- # start an http server in a separate thread
3107- http_server = HTTPServerWithEvents ((host , port ))
3150+ # change the endpoint port
3151+ endpoint_address = 'kafka://' + host
3152+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
3153+ '&retry_sleep_duration=1'
3154+ topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
3155+ topic_arn = topic_conf .set_config ()
31083156
3109- wait_for_queue_to_drain (topic_name , http_port = port )
3157+ wait_for_queue_to_drain (topic_name ,)
31103158
31113159 result = admin (['topic' , 'dump' , '--topic' , topic_name ], get_config_cluster ())
31123160 assert_equal (result [1 ], 0 )
@@ -3118,7 +3166,7 @@ def test_persistent_topic_dump():
31183166 topic_conf .del_config ()
31193167 # delete the bucket
31203168 conn .delete_bucket (bucket_name )
3121- http_server .close ()
3169+ receiver .close (task )
31223170
31233171
31243172def ps_s3_persistent_topic_configs (persistency_time , config_dict ):
@@ -3653,33 +3701,50 @@ def test_ps_s3_persistent_multiple_gateways():
36533701 http_server .close ()
36543702
36553703
3656- @attr ('http_test' )
3657- def test_ps_s3_persistent_multiple_endpoints ():
3658- """ test pushing persistent notification when one of the endpoints has error """
3659- conn = connection ()
3704+ def persistent_topic_multiple_endpoints (conn , endpoint_type ):
36603705 zonegroup = get_config_zonegroup ()
36613706
3662- # create random port for the http server
3663- host = get_ip ()
3664- port = random .randint (10000 , 20000 )
3665- # start an http server in a separate thread
3666- number_of_objects = 10
3667- http_server = HTTPServerWithEvents ((host , port ))
3668-
36693707 # create bucket
36703708 bucket_name = gen_bucket_name ()
36713709 bucket = conn .create_bucket (bucket_name )
36723710 topic_name = bucket_name + TOPIC_SUFFIX
3711+ topic_name_1 = topic_name + '_1'
3712+
3713+ host = get_ip ()
3714+ task = None
3715+ port = None
3716+ if endpoint_type == 'http' :
3717+ # create random port for the http server
3718+ port = random .randint (10000 , 20000 )
3719+ # start an http server in a separate thread
3720+ receiver = HTTPServerWithEvents ((host , port ))
3721+ endpoint_address = 'http://' + host + ':' + str (port )
3722+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
3723+ '&retry_sleep_duration=1'
3724+ elif endpoint_type == 'amqp' :
3725+ # start amqp receiver
3726+ exchange = 'ex1'
3727+ task , receiver = create_amqp_receiver_thread (exchange , topic_name_1 )
3728+ task .start ()
3729+ endpoint_address = 'amqp://' + host
3730+ endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker&persistent=true' + \
3731+ '&retry_sleep_duration=1'
3732+ elif endpoint_type == 'kafka' :
3733+ # start kafka receiver
3734+ task , receiver = create_kafka_receiver_thread (topic_name_1 )
3735+ task .start ()
3736+ endpoint_address = 'kafka://' + host
3737+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
3738+ '&retry_sleep_duration=1'
3739+ else :
3740+ return SkipTest ('Unknown endpoint type: ' + endpoint_type )
36733741
36743742 # create two s3 topics
3675- endpoint_address = 'http://' + host + ':' + str (port )
3676- endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
3677- '&retry_sleep_duration=1'
3678- topic_conf1 = PSTopicS3 (conn , topic_name + '_1' , zonegroup , endpoint_args = endpoint_args )
3743+ topic_conf1 = PSTopicS3 (conn , topic_name_1 , zonegroup , endpoint_args = endpoint_args )
36793744 topic_arn1 = topic_conf1 .set_config ()
36803745 endpoint_address = 'http://kaboom:9999'
36813746 endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
3682- '&retry_sleep_duration=1'
3747+ '&retry_sleep_duration=1'
36833748 topic_conf2 = PSTopicS3 (conn , topic_name + '_2' , zonegroup , endpoint_args = endpoint_args )
36843749 topic_arn2 = topic_conf2 .set_config ()
36853750
@@ -3701,6 +3766,7 @@ def test_ps_s3_persistent_multiple_endpoints():
37013766
37023767 client_threads = []
37033768 start_time = time .time ()
3769+ number_of_objects = 10
37043770 for i in range (number_of_objects ):
37053771 key = bucket .new_key (str (i ))
37063772 content = str (os .urandom (1024 * 1024 ))
@@ -3711,9 +3777,8 @@ def test_ps_s3_persistent_multiple_endpoints():
37113777
37123778 keys = list (bucket .list ())
37133779
3714- wait_for_queue_to_drain (topic_name + '_1' )
3715-
3716- http_server .verify_s3_events (keys , exact_match = True , deletions = False )
3780+ wait_for_queue_to_drain (topic_name_1 , http_port = port )
3781+ receiver .verify_s3_events (keys , exact_match = True , deletions = False )
37173782
37183783 # delete objects from the bucket
37193784 client_threads = []
@@ -3724,17 +3789,31 @@ def test_ps_s3_persistent_multiple_endpoints():
37243789 client_threads .append (thr )
37253790 [thr .join () for thr in client_threads ]
37263791
3727- wait_for_queue_to_drain (topic_name + '_1' )
3728-
3729- http_server .verify_s3_events (keys , exact_match = True , deletions = True )
3792+ wait_for_queue_to_drain (topic_name_1 , http_port = port )
3793+ receiver .verify_s3_events (keys , exact_match = True , deletions = True )
37303794
37313795 # cleanup
37323796 s3_notification_conf1 .del_config ()
37333797 topic_conf1 .del_config ()
37343798 s3_notification_conf2 .del_config ()
37353799 topic_conf2 .del_config ()
37363800 conn .delete_bucket (bucket_name )
3737- http_server .close ()
3801+ receiver .close (task )
3802+
3803+
3804+ @attr ('http_test' )
3805+ def test_persistent_multiple_endpoints_http ():
3806+ """ test pushing persistent notification when one of the endpoints has error, http endpoint """
3807+ conn = connection ()
3808+ persistent_topic_multiple_endpoints (conn , 'http' )
3809+
3810+
3811+ @attr ('kafka_test' )
3812+ def test_persistent_multiple_endpoints_kafka ():
3813+ """ test pushing persistent notification when one of the endpoints has error, kafka endpoint """
3814+ conn = connection ()
3815+ persistent_topic_multiple_endpoints (conn , 'kafka' )
3816+
37383817
37393818def persistent_notification (endpoint_type , conn , account = None ):
37403819 """ test pushing persistent notification """
@@ -4668,18 +4747,12 @@ def test_persistent_ps_s3_reload():
46684747 http_server .close ()
46694748
46704749
4671- @attr ('data_path_v2_test' )
4672- def test_persistent_ps_s3_data_path_v2_migration ():
4750+ def persistent_data_path_v2_migration (conn , endpoint_type ):
46734751 """ test data path v2 persistent migration """
46744752 if get_config_cluster () == 'noname' :
46754753 return SkipTest ('realm is needed for migration test' )
4676- conn = connection ()
46774754 zonegroup = get_config_zonegroup ()
46784755
4679- # create random port for the http server
4680- host = get_ip ()
4681- http_port = random .randint (10000 , 20000 )
4682-
46834756 # disable v2 notification
46844757 zonegroup_modify_feature (enable = False , feature_name = zonegroup_feature_notification_v2 )
46854758
@@ -4688,10 +4761,35 @@ def test_persistent_ps_s3_data_path_v2_migration():
46884761 bucket = conn .create_bucket (bucket_name )
46894762 topic_name = bucket_name + TOPIC_SUFFIX
46904763
4691- # create s3 topic
4692- endpoint_address = 'http://' + host + ':' + str (http_port )
4693- endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
4694- '&retry_sleep_duration=1'
4764+ host = get_ip ()
4765+ task = None
4766+ port = None
4767+ if endpoint_type == 'http' :
4768+ # create random port for the http server
4769+ port = random .randint (10000 , 20000 )
4770+ # start an http server in a separate thread
4771+ receiver = HTTPServerWithEvents ((host , port ))
4772+ endpoint_address = 'http://' + host + ':' + str (port )
4773+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
4774+ '&retry_sleep_duration=1'
4775+ elif endpoint_type == 'amqp' :
4776+ # start amqp receiver
4777+ exchange = 'ex1'
4778+ task , receiver = create_amqp_receiver_thread (exchange , topic_name )
4779+ task .start ()
4780+ endpoint_address = 'amqp://' + host
4781+ endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker&persistent=true' + \
4782+ '&retry_sleep_duration=1'
4783+ elif endpoint_type == 'kafka' :
4784+ # start kafka receiver
4785+ task , receiver = create_kafka_receiver_thread (topic_name )
4786+ task .start ()
4787+ endpoint_address = 'kafka://' + host
4788+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
4789+ '&retry_sleep_duration=1'
4790+ else :
4791+ return SkipTest ('Unknown endpoint type: ' + endpoint_type )
4792+
46954793 topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
46964794 topic_arn = topic_conf .set_config ()
46974795 # create s3 notification
@@ -4758,14 +4856,11 @@ def test_persistent_ps_s3_data_path_v2_migration():
47584856 # topic stats
47594857 get_stats_persistent_topic (topic_name , 2 * number_of_objects )
47604858
4761- # start an http server in a separate thread
4762- http_server = HTTPServerWithEvents ((host , http_port ))
4763-
4764- wait_for_queue_to_drain (topic_name , http_port = http_port )
4859+ wait_for_queue_to_drain (topic_name )
47654860 # verify events
47664861 keys = list (bucket .list ())
47674862 # exact match is false because the notifications are persistent.
4768- http_server .verify_s3_events (keys , exact_match = False )
4863+ receiver .verify_s3_events (keys , exact_match = False )
47694864
47704865 except Exception as e :
47714866 assert False , str (e )
@@ -4782,8 +4877,21 @@ def test_persistent_ps_s3_data_path_v2_migration():
47824877 [thr .join () for thr in client_threads ]
47834878 # delete the bucket
47844879 conn .delete_bucket (bucket_name )
4785- if http_server :
4786- http_server .close ()
4880+ receiver .close (task )
4881+
4882+
4883+ @attr ('data_path_v2_test' )
4884+ def persistent_data_path_v2_migration_http ():
4885+ """ test data path v2 persistent migration, http endpoint """
4886+ conn = connection ()
4887+ persistent_data_path_v2_migration (conn , 'http' )
4888+
4889+
4890+ @attr ('data_path_v2_kafka_test' )
4891+ def persistent_data_path_v2_migration_kafka ():
4892+ """ test data path v2 persistent migration, kafka endpoint """
4893+ conn = connection ()
4894+ persistent_data_path_v2_migration (conn , 'kafka' )
47874895
47884896
47894897@attr ('data_path_v2_test' )
0 commit comments