@@ -434,7 +434,8 @@ def __init__(self, topic, security_type):
434434 self .consumer = KafkaConsumer (topic ,
435435 bootstrap_servers = kafka_server + ':' + str (port ),
436436 security_protocol = security_type ,
437- consumer_timeout_ms = 16000 )
437+ consumer_timeout_ms = 16000 ,
438+ auto_offset_reset = 'earliest' )
438439 print ('Kafka consumer created on topic: ' + topic )
439440 break
440441 except Exception as error :
@@ -1534,8 +1535,8 @@ def test_ps_s3_notification_push_kafka_on_master():
15341535 time_diff = time .time () - start_time
15351536 print ('average time for creation + kafka notification is: ' + str (time_diff * 1000 / number_of_objects ) + ' milliseconds' )
15361537
1537- print ('wait for 5sec for the messages...' )
1538- time .sleep (5 )
1538+ print ('wait for 10sec for the messages...' )
1539+ time .sleep (10 )
15391540 keys = list (bucket .list ())
15401541 receiver .verify_s3_events (keys , exact_match = True , etags = etags )
15411542
@@ -1551,8 +1552,8 @@ def test_ps_s3_notification_push_kafka_on_master():
15511552 time_diff = time .time () - start_time
15521553 print ('average time for deletion + kafka notification is: ' + str (time_diff * 1000 / number_of_objects ) + ' milliseconds' )
15531554
1554- print ('wait for 5sec for the messages...' )
1555- time .sleep (5 )
1555+ print ('wait for 10sec for the messages...' )
1556+ time .sleep (10 )
15561557 receiver .verify_s3_events (keys , exact_match = True , deletions = True , etags = etags )
15571558 except Exception as e :
15581559 assert False , str (e )
0 commit comments