@@ -36,6 +36,7 @@ def get_group_protocol_type(a, group_id):
 
 
 def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol):
+    no_of_messages = 100
     total_msg_read = 0
     expected_partitions_per_consumer = number_of_partitions // len(consumers)
     while len(consumers[-1].assignment()) != expected_partitions_per_consumer:
@@ -52,16 +53,18 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol):
     assert get_group_protocol_type(admin_client, topic) == expected_protocol
 
     # Produce some messages to the topic
-    kafka_cluster.seed_topic(topic)
+    test_data = ['test-data{}'.format(i) for i in range(0, no_of_messages)]
+    test_keys = ['test-key{}'.format(i) for i in range(0, no_of_messages)]  # we want each partition to have data
+    kafka_cluster.seed_topic(topic, test_data, test_keys)
 
-    while total_msg_read < 100:
+    while total_msg_read < no_of_messages:
         for consumer in consumers:
             # Poll for messages
             msg = consumer.poll(0.1)
             if msg is not None:
                 total_msg_read += 1
-
-    assert total_msg_read == 100, "Expected to read 100 messages, but read {}".format(total_msg_read)
+
+    assert total_msg_read == no_of_messages, f"Expected to read {no_of_messages} messages, but read {total_msg_read}"
 
 
 def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, partition_assignment_strategy):
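
Note: seed_topic(topic, test_data, test_keys) is a helper from the repository's test harness whose implementation is not part of this diff. A minimal sketch of the idea it relies on, assuming a plain confluent_kafka.Producer (the function name seed_topic_sketch and the bootstrap_servers argument are hypothetical): keyed messages are hashed across partitions, so producing with many distinct keys is what lets every partition receive data, which the comment on the test_keys line refers to.

    from confluent_kafka import Producer

    def seed_topic_sketch(bootstrap_servers, topic, values, keys):
        # Hypothetical stand-in for the harness's seed_topic helper, not the real implementation.
        # Produce one message per (key, value) pair; keys are hashed to pick a partition,
        # so enough distinct keys should spread data over all partitions.
        producer = Producer({'bootstrap.servers': bootstrap_servers})
        for key, value in zip(keys, values):
            producer.produce(topic, value=value, key=key)
        producer.flush()

With seeding done this way, the polling loop above can expect to eventually read no_of_messages messages spread across all consumers' assigned partitions.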