@@ -220,7 +220,6 @@ def __init__(self, addr, delay=0, cloudevents=False):
220220 # make sure that http handler is able to consume requests
221221 url = 'http://{}:{}' .format (self .addr [0 ], self .addr [1 ])
222222 response = requests .post (url , {})
223- print (response )
224223 assert response .status_code == 200
225224
226225
@@ -1795,7 +1794,7 @@ def test_ps_s3_lifecycle_on_master():
17951794 time .sleep (20 )
17961795
17971796 no_keys = list (bucket .list ())
1798- wait_for_queue_to_drain (topic_name )
1797+ wait_for_queue_to_drain (topic_name , http_port = port )
17991798 assert_equal (len (no_keys ), 0 )
18001799 event_keys = []
18011800 events = http_server .get_and_reset_events ()
@@ -1900,7 +1899,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
19001899 print ('wait for 20s for the lifecycle...' )
19011900 time .sleep (20 )
19021901
1903- wait_for_queue_to_drain (topic_name )
1902+ wait_for_queue_to_drain (topic_name , http_port = port )
19041903 events = http_server .get_and_reset_events ()
19051904 for event in events :
19061905 assert_equal (event ['Records' ][0 ]['eventName' ], 'ObjectLifecycle:Expiration:AbortMPU' )
@@ -2961,7 +2960,17 @@ def test_ps_s3_persistent_cleanup():
29612960 http_server .close ()
29622961
29632962
2964- def wait_for_queue_to_drain (topic_name , tenant = None , account = None ):
def check_http_server(http_port):
    """Assert that a python HTTP server is listening on TCP port *http_port*.

    Lists listening TCP sockets via ``netstat -tlnnp`` and keeps only the
    lines that mention both a python process and the port.  Raises
    AssertionError if no such socket is found; on success logs the
    matching netstat line(s) for debugging.

    :param http_port: port number (int or str) the test HTTP server
        should be bound to
    """
    str_port = str(http_port)
    # Run netstat directly with list args instead of a shell pipeline
    # ('netstat | grep python | grep <port>' via shell=True): no shell
    # injection surface, and the filtering is trivial in python.
    proc = subprocess.Popen(['netstat', '-tlnnp'], stdout=subprocess.PIPE)
    out = proc.communicate()[0]
    # same filter the grep pipeline applied: both substrings anywhere in the line
    matches = [line for line in out.decode('utf-8').splitlines()
               if 'python' in line and str_port in line]
    assert len(matches) > 0, 'http python server NOT listening on port ' + str_port
    log.info("http python server listening on port " + str_port)
    log.info('\n'.join(matches))
2971+
2972+
2973+ def wait_for_queue_to_drain (topic_name , tenant = None , account = None , http_port = None ):
29652974 retries = 0
29662975 entries = 1
29672976 start_time = time .time ()
@@ -2972,6 +2981,8 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None):
29722981 if account :
29732982 cmd += ['--account-id' , account ]
29742983 while entries > 0 :
2984+ if http_port :
2985+ check_http_server (http_port )
29752986 result = admin (cmd , get_config_cluster ())
29762987 assert_equal (result [1 ], 0 )
29772988 parsed_result = json .loads (result [0 ])
@@ -3064,7 +3075,7 @@ def test_ps_s3_persistent_topic_stats():
30643075 # start an http server in a separate thread
30653076 http_server = HTTPServerWithEvents ((host , port ))
30663077
3067- wait_for_queue_to_drain (topic_name )
3078+ wait_for_queue_to_drain (topic_name , http_port = port )
30683079
30693080 # cleanup
30703081 s3_notification_conf .del_config ()
@@ -4599,7 +4610,7 @@ def test_persistent_ps_s3_reload():
45994610 time_diff = time .time () - start_time
46004611 print ('average time for creation + async http notification is: ' + str (time_diff * 1000 / number_of_objects ) + ' milliseconds' )
46014612
4602- wait_for_queue_to_drain (topic_name1 )
4613+ wait_for_queue_to_drain (topic_name1 , http_port = http_port )
46034614
46044615 client_threads = []
46054616 start_time = time .time ()
@@ -4622,7 +4633,7 @@ def test_persistent_ps_s3_reload():
46224633 result = admin (['period' , 'commit' ], get_config_cluster ())
46234634 assert_equal (result [1 ], 0 )
46244635
4625- wait_for_queue_to_drain (topic_name1 )
4636+ wait_for_queue_to_drain (topic_name1 , http_port = http_port )
46264637 # verify events
46274638 keys = list (bucket .list ())
46284639 http_server .verify_s3_events (keys , exact_match = False )
@@ -4758,7 +4769,7 @@ def test_persistent_ps_s3_data_path_v2_migration():
47584769 # start an http server in a separate thread
47594770 http_server = HTTPServerWithEvents ((host , http_port ))
47604771
4761- wait_for_queue_to_drain (topic_name )
4772+ wait_for_queue_to_drain (topic_name , http_port = http_port )
47624773 # verify events
47634774 keys = list (bucket .list ())
47644775 # exact match is false because the notifications are persistent.
0 commit comments