@@ -3005,7 +3005,6 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non
30053005 log .info ('waited for %ds for queue %s to drain' , time_diff , topic_name )
30063006
30073007
3008- @attr ('kafka_test' )
30093008def persistent_topic_stats (conn , endpoint_type ):
30103009 zonegroup = get_config_zonegroup ()
30113010
@@ -3017,36 +3016,32 @@ def persistent_topic_stats(conn, endpoint_type):
30173016 host = get_ip ()
30183017 task = None
30193018 port = None
3019+ wrong_port = 1234
3020+ endpoint_address = endpoint_type + '://' + host + ':' + str (wrong_port )
30203021 if endpoint_type == 'http' :
30213022 # create random port for the http server
30223023 port = random .randint (10000 , 20000 )
30233024 # start an http server in a separate thread
30243025 receiver = HTTPServerWithEvents ((host , port ))
3025- endpoint_address = 'http://' + host + ':' + str (port )
30263026 endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
30273027 '&retry_sleep_duration=1'
30283028 elif endpoint_type == 'amqp' :
30293029 # start amqp receiver
30303030 exchange = 'ex1'
30313031 task , receiver = create_amqp_receiver_thread (exchange , topic_name )
30323032 task .start ()
3033- endpoint_address = 'amqp://' + host
30343033 endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker&persistent=true' + \
30353034 '&retry_sleep_duration=1'
30363035 elif endpoint_type == 'kafka' :
30373036 # start kafka receiver
30383037 task , receiver = create_kafka_receiver_thread (topic_name )
30393038 task .start ()
3040- endpoint_address = 'kafka://' + host
30413039 endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
30423040 '&retry_sleep_duration=1'
30433041 else :
30443042 return SkipTest ('Unknown endpoint type: ' + endpoint_type )
30453043
30463044 # create s3 topic
3047- endpoint_address = 'kafka://' + host + ':1234' # wrong port
3048- endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
3049- '&retry_sleep_duration=1'
30503045 topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
30513046 topic_arn = topic_conf .set_config ()
30523047 # create s3 notification
@@ -3094,9 +3089,19 @@ def persistent_topic_stats(conn, endpoint_type):
30943089 get_stats_persistent_topic (topic_name , 2 * number_of_objects )
30953090
30963091 # change the endpoint port
3097- endpoint_address = 'kafka://' + host
3098- endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
3099- '&retry_sleep_duration=1'
3092+ if endpoint_type == 'http' :
3093+ endpoint_address = endpoint_type + '://' + host + ':' + str (port )
3094+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
3095+ '&retry_sleep_duration=1'
3096+ elif endpoint_type == 'amqp' :
3097+ endpoint_address = endpoint_type + '://' + host
3098+ endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker&persistent=true' + \
3099+ '&retry_sleep_duration=1'
3100+ elif endpoint_type == 'kafka' :
3101+ endpoint_address = endpoint_type + '://' + host
3102+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
3103+ '&retry_sleep_duration=1'
3104+
31003105 topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
31013106 topic_arn = topic_conf .set_config ()
31023107
@@ -3111,19 +3116,26 @@ def persistent_topic_stats(conn, endpoint_type):
31113116
31123117
31133118@attr ('http_test' )
3114- def persistent_topic_stats_http ():
3119+ def test_persistent_topic_stats_http ():
31153120 """ test persistent topic stats, http endpoint """
31163121 conn = connection ()
31173122 persistent_topic_stats (conn , 'http' )
31183123
31193124
31203125@attr ('kafka_test' )
3121- def persistent_topic_stats_kafka ():
3126+ def test_persistent_topic_stats_kafka ():
31223127 """ test persistent topic stats, kafka endpoint """
31233128 conn = connection ()
31243129 persistent_topic_stats (conn , 'kafka' )
31253130
31263131
3132+ @attr ('amqp_test' )
3133+ def test_persistent_topic_stats_amqp ():
3134+ """ test persistent topic stats, amqp endpoint """
3135+ conn = connection ()
3136+ persistent_topic_stats (conn , 'amqp' )
3137+
3138+
31273139@attr ('kafka_test' )
31283140def test_persistent_topic_dump ():
31293141 """ test persistent topic dump """
0 commit comments