Skip to content

Commit dedb02a

Browse files
committed
Added a test case to remove all the classic consumers
1 parent a1c04c9 commit dedb02a

File tree

1 file changed

+28
-18
lines changed

1 file changed

+28
-18
lines changed

tests/integration/consumer/test_consumer_upgrade_downgrade.py

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# limitations under the License.
1717

1818
import pytest
19+
from enum import Enum
1920
from confluent_kafka import ConsumerGroupType, KafkaException
2021
from tests.common import TestUtils
2122

@@ -35,7 +36,7 @@ def get_group_protocol_type(a, group_id):
3536
raise
3637

3738

38-
def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol):
39+
def check_consumer(kafka_cluster, consumers, admin_client, group_id, topic, expected_protocol):
3940
no_of_messages = 100
4041
total_msg_read = 0
4142
expected_partitions_per_consumer = number_of_partitions // len(consumers)
@@ -50,7 +51,7 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto
5051
assert len(assignment) == expected_partitions_per_consumer
5152
assert len(all_assignments) == number_of_partitions
5253

53-
assert get_group_protocol_type(admin_client, topic) == expected_protocol
54+
assert get_group_protocol_type(admin_client, group_id) == expected_protocol
5455

5556
# Produce some messages to the topic
5657
test_data = ['test-data{}'.format(i) for i in range(0, no_of_messages)]
@@ -67,6 +68,11 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto
6768
assert total_msg_read == no_of_messages, f"Expected to read {no_of_messages} messages, but read {total_msg_read}"
6869

6970

71+
class Operation(Enum):
72+
ADD = 0
73+
REMOVE = 1
74+
75+
7076
def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, partition_assignment_strategy):
7177
"""
7278
Test consumer upgrade and downgrade.
@@ -90,22 +96,26 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k
9096
**consumer_conf
9197
}
9298

93-
consumer = kafka_cluster.consumer(consumer_conf_classic)
94-
assert consumer is not None
95-
consumer.subscribe([topic])
96-
check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC)
97-
98-
# Now simulate an upgrade by creating a new consumer with 'consumer' protocol
99-
consumer2 = kafka_cluster.consumer(consumer_conf_consumer)
100-
assert consumer2 is not None
101-
consumer2.subscribe([topic])
102-
check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER)
103-
104-
# Now simulate a downgrade by deleting the second consumer and keeping only 'classic' consumer
105-
consumer2.close()
106-
check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC)
107-
108-
consumer.close()
99+
test_scenarios = [(Operation.ADD, consumer_conf_classic, ConsumerGroupType.CLASSIC),
100+
(Operation.ADD, consumer_conf_consumer, ConsumerGroupType.CONSUMER),
101+
(Operation.REMOVE, None, ConsumerGroupType.CONSUMER),
102+
(Operation.ADD, consumer_conf_classic, ConsumerGroupType.CONSUMER),
103+
(Operation.REMOVE, None, ConsumerGroupType.CLASSIC)]
104+
consumers = []
105+
106+
for operation, conf, expected_protocol in test_scenarios:
107+
if operation == Operation.ADD:
108+
consumer = kafka_cluster.consumer(conf)
109+
assert consumer is not None
110+
consumer.subscribe([topic])
111+
consumers.append(consumer)
112+
elif operation == Operation.REMOVE:
113+
consumer_to_remove = consumers.pop(0)
114+
consumer_to_remove.close()
115+
check_consumer(kafka_cluster, consumers, admin_client, topic, topic, expected_protocol)
116+
117+
assert len(consumers) == 1
118+
consumers[0].close()
109119
kafka_cluster.delete_topic(topic)
110120

111121

0 commit comments

Comments
 (0)