@@ -220,7 +220,6 @@ def __init__(self, addr, delay=0, cloudevents=False):
220220 # make sure that http handler is able to consume requests
221221 url = 'http://{}:{}' .format (self .addr [0 ], self .addr [1 ])
222222 response = requests .post (url , {})
223- print (response )
224223 assert response .status_code == 200
225224
226225
@@ -1795,7 +1794,7 @@ def test_ps_s3_lifecycle_on_master():
17951794 time .sleep (20 )
17961795
17971796 no_keys = list (bucket .list ())
1798- wait_for_queue_to_drain (topic_name )
1797+ wait_for_queue_to_drain (topic_name , http_port = port )
17991798 assert_equal (len (no_keys ), 0 )
18001799 event_keys = []
18011800 events = http_server .get_and_reset_events ()
@@ -1900,7 +1899,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
19001899 print ('wait for 20s for the lifecycle...' )
19011900 time .sleep (20 )
19021901
1903- wait_for_queue_to_drain (topic_name )
1902+ wait_for_queue_to_drain (topic_name , http_port = port )
19041903 events = http_server .get_and_reset_events ()
19051904 for event in events :
19061905 assert_equal (event ['Records' ][0 ]['eventName' ], 'ObjectLifecycle:Expiration:AbortMPU' )
@@ -2961,7 +2960,17 @@ def test_ps_s3_persistent_cleanup():
29612960 http_server .close ()
29622961
29632962
2964- def wait_for_queue_to_drain (topic_name , tenant = None , account = None ):
2963+ def check_http_server (http_port ):
2964+ str_port = str (http_port )
2965+ cmd = 'netstat -tlnnp | grep python | grep ' + str_port
2966+ proc = subprocess .Popen (cmd , stdout = subprocess .PIPE , shell = True )
2967+ out = proc .communicate ()[0 ]
2968+ assert len (out ) > 0 , 'http python server NOT listening on port ' + str_port
2969+ log .info ("http python server listening on port " + str_port )
2970+ log .info (out .decode ('utf-8' ))
2971+
2972+
2973+ def wait_for_queue_to_drain (topic_name , tenant = None , account = None , http_port = None ):
29652974 retries = 0
29662975 entries = 1
29672976 start_time = time .time ()
@@ -2972,6 +2981,8 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None):
29722981 if account :
29732982 cmd += ['--account-id' , account ]
29742983 while entries > 0 :
2984+ if http_port :
2985+ check_http_server (http_port )
29752986 result = admin (cmd , get_config_cluster ())
29762987 assert_equal (result [1 ], 0 )
29772988 parsed_result = json .loads (result [0 ])
@@ -3064,7 +3075,7 @@ def test_ps_s3_persistent_topic_stats():
30643075 # start an http server in a separate thread
30653076 http_server = HTTPServerWithEvents ((host , port ))
30663077
3067- wait_for_queue_to_drain (topic_name )
3078+ wait_for_queue_to_drain (topic_name , http_port = port )
30683079
30693080 # cleanup
30703081 s3_notification_conf .del_config ()
@@ -4607,7 +4618,7 @@ def test_persistent_ps_s3_reload():
46074618 time_diff = time .time () - start_time
46084619 print ('average time for creation + async http notification is: ' + str (time_diff * 1000 / number_of_objects ) + ' milliseconds' )
46094620
4610- wait_for_queue_to_drain (topic_name1 )
4621+ wait_for_queue_to_drain (topic_name1 , http_port = http_port )
46114622
46124623 client_threads = []
46134624 start_time = time .time ()
@@ -4630,7 +4641,7 @@ def test_persistent_ps_s3_reload():
46304641 result = admin (['period' , 'commit' ], get_config_cluster ())
46314642 assert_equal (result [1 ], 0 )
46324643
4633- wait_for_queue_to_drain (topic_name1 )
4644+ wait_for_queue_to_drain (topic_name1 , http_port = http_port )
46344645 # verify events
46354646 keys = list (bucket .list ())
46364647 http_server .verify_s3_events (keys , exact_match = False )
@@ -4766,7 +4777,7 @@ def test_persistent_ps_s3_data_path_v2_migration():
47664777 # start an http server in a separate thread
47674778 http_server = HTTPServerWithEvents ((host , http_port ))
47684779
4769- wait_for_queue_to_drain (topic_name )
4780+ wait_for_queue_to_drain (topic_name , http_port = http_port )
47704781 # verify events
47714782 keys = list (bucket .list ())
47724783 # exact match is false because the notifications are persistent.