From 14691103f180c044c5d418a50dfb6641afc744bc Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Thu, 17 Jul 2025 12:35:57 +0530 Subject: [PATCH 01/10] Added online upgrade and downgrade test --- tests/common/__init__.py | 25 ++-- tests/integration/cluster_fixture.py | 15 +- .../test_consumer_upgrade_downgrade.py | 131 ++++++++++++++++++ 3 files changed, 159 insertions(+), 12 deletions(-) create mode 100644 tests/integration/consumer/test_consumer_upgrade_downgrade.py diff --git a/tests/common/__init__.py b/tests/common/__init__.py index 3d9ec5c7a..5834604c1 100644 --- a/tests/common/__init__.py +++ b/tests/common/__init__.py @@ -55,22 +55,25 @@ def use_group_protocol_consumer(): @staticmethod def update_conf_group_protocol(conf=None): - if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): + if TestUtils.can_upgrade_group_protocol_to_consumer(conf): conf['group.protocol'] = 'consumer' + @staticmethod + def can_upgrade_group_protocol_to_consumer(conf): + return conf is not None and 'group.id' in conf and 'group.protocol' not in conf and TestUtils.use_group_protocol_consumer() + @staticmethod def remove_forbidden_conf_group_protocol_consumer(conf): - if conf is None: + if conf is None or not TestUtils.use_group_protocol_consumer() or conf.get('group.protocol', 'consumer') != 'consumer': return - if TestUtils.use_group_protocol_consumer(): - forbidden_conf_properties = ["session.timeout.ms", - "partition.assignment.strategy", - "heartbeat.interval.ms", - "group.protocol.type"] - for prop in forbidden_conf_properties: - if prop in conf: - print("Skipping setting forbidden configuration {prop} for `CONSUMER` protocol") - del conf[prop] + forbidden_conf_properties = ["session.timeout.ms", + "partition.assignment.strategy", + "heartbeat.interval.ms", + "group.protocol.type"] + for prop in forbidden_conf_properties: + if prop in conf: + print(f"Skipping setting forbidden configuration {prop} for `CONSUMER` protocol") + del conf[prop] class TestConsumer(Consumer): diff --git a/tests/integration/cluster_fixture.py b/tests/integration/cluster_fixture.py index 0d441ca1d..e6bf06a84 100644 --- a/tests/integration/cluster_fixture.py +++ b/tests/integration/cluster_fixture.py @@ -233,6 +233,19 @@ def create_topic(self, prefix, conf=None, **create_topic_kwargs): future_topic.get(name).result() return name + + def delete_topic(self, topic): + """ + Deletes a topic with this cluster. 
+ + :param str topic: topic name + """ + future = self.admin().delete_topics([topic]) + try: + future.get(topic).result() + print("Topic {} deleted".format(topic)) + except Exception as e: + print("Failed to delete topic {}: {}".format(topic, e)) def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kwargs): """ @@ -273,7 +286,7 @@ def seed_topic(self, topic, value_source=None, key_source=None, header_source=No value_source = ['test-data{}'.format(i) for i in range(0, 100)] if key_source is None: - key_source = [None] + key_source = ['test-key{}'.format(i) for i in range(0, 100)] KafkaClusterFixture._produce(self._producer, topic, value_source, key_source, header_source) diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py new file mode 100644 index 000000000..4397e3c4f --- /dev/null +++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limit + +import pytest +from confluent_kafka import ConsumerGroupType, IsolationLevel, KafkaException, TopicPartition +from confluent_kafka.admin import OffsetSpec +from tests.common import TestUtils + +topic_prefix = "test_consumer_upgrade_downgrade_" +number_of_partitions = 10 + + +def get_group_protocol_type(a, group_id): + futureMap = a.describe_consumer_groups([group_id]) + try: + future = futureMap[group_id] + g = future.result() + return g.type + except KafkaException as e: + print("Error while describing group id '{}': {}".format(group_id, e)) + except Exception: + raise + + +def list_offsets(a, topic, no_of_partitions): + topic_partition_offsets = {} + for partition in range(no_of_partitions): + topic_partition = TopicPartition(topic, partition) + topic_partition_offsets[topic_partition] = OffsetSpec.latest() + futmap = a.list_offsets(topic_partition_offsets, isolation_level=IsolationLevel.READ_COMMITTED, request_timeout=30) + for partition, fut in futmap.items(): + try: + result = fut.result() + print("Topicname : {} Partition_Index : {} Offset : {} Timestamp : {}" + .format(partition.topic, partition.partition, result.offset, + result.timestamp)) + except KafkaException as e: + print("Topicname : {} Partition_Index : {} Error : {}" + .format(partition.topic, partition.partition, e)) + + +# def produce_messages(producer, topic, partitions, num_messages): +# for i in range(num_messages): +# key = "key-{}".format(i) +# value = "value-{}".format(i) +# partition = i % partitions +# producer.produce(topic, key=key, value=value, partition=partition) +# producer.flush() + + +def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol): + total_msg_read = 0 + while len(consumers[-1].assignment()) != number_of_partitions // len(consumers): + for consumer in consumers: + consumer.poll(0.1) + + for consumer in consumers: + assert len(consumer.assignment()) == number_of_partitions // len(consumers) + 
+ assert get_group_protocol_type(admin_client, topic) == expected_protocol + + # Produce some messages to the topic + kafka_cluster.seed_topic(topic) + list_offsets(admin_client, topic, number_of_partitions) + + while total_msg_read < 100: + for consumer in consumers: + # Poll for messages + msg = consumer.poll(0.1) + if msg is not None: + total_msg_read += 1 + + assert total_msg_read == 100, "Expected to read 100 messages, but read {}".format(total_msg_read) + + +def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, partition_assignment_strategy): + """ + Test consumer upgrade and downgrade. + """ + topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + { + "num_partitions": number_of_partitions + }) + admin_client = kafka_cluster.admin() + + # Create a consumer with the latest version + consumer_conf = {'group.id': topic, + 'auto.offset.reset': 'earliest', + 'group.protocol': 'classic'} + consumer_conf['partition.assignment.strategy'] = partition_assignment_strategy + consumer = kafka_cluster.consumer(consumer_conf) + assert consumer is not None + consumer.subscribe([topic]) + check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC) + del consumer_conf['partition.assignment.strategy'] + + # Now simulate an upgrade by creating a new consumer with 'consumer' protocol + consumer_conf['group.protocol'] = 'consumer' + consumer2 = kafka_cluster.consumer(consumer_conf) + assert consumer2 is not None + consumer2.subscribe([topic]) + check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER) + + # Now simulate a downgrade by deleting the second consumer and keeping only 'classic' consumer + consumer2.close() + check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC) + + consumer.close() + kafka_cluster.delete_topic(topic) + + +@pytest.mark.skipif(not TestUtils.use_group_protocol_consumer(), + reason="Skipping test as group protocol consumer is not enabled") +def test_consumer_upgrade_downgrade(kafka_cluster): + perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'roundrobin') + perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'range') + perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'cooperative-sticky') From d2a44bb721df6f3a56ba70d3d5040fd0d418b796 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 24 Sep 2025 19:00:43 +0530 Subject: [PATCH 02/10] Corrected license header --- tests/integration/consumer/test_consumer_error.py | 2 +- tests/integration/consumer/test_consumer_memberid.py | 2 +- .../consumer/test_consumer_topicpartition_metadata.py | 2 +- tests/integration/consumer/test_consumer_upgrade_downgrade.py | 2 +- tests/integration/consumer/test_cooperative_rebalance_1.py | 2 +- tests/integration/consumer/test_cooperative_rebalance_2.py | 2 +- tests/integration/consumer/test_incremental_assign.py | 2 +- tests/integration/producer/test_transactions.py | 2 +- tests/test_error.py | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/integration/consumer/test_consumer_error.py b/tests/integration/consumer/test_consumer_error.py index 6376b32d2..73f368e6f 100644 --- a/tests/integration/consumer/test_consumer_error.py +++ b/tests/integration/consumer/test_consumer_error.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS 
IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. # import pytest diff --git a/tests/integration/consumer/test_consumer_memberid.py b/tests/integration/consumer/test_consumer_memberid.py index 68d545fcf..9dbaf325d 100644 --- a/tests/integration/consumer/test_consumer_memberid.py +++ b/tests/integration/consumer/test_consumer_memberid.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. import pytest from tests.common import TestUtils diff --git a/tests/integration/consumer/test_consumer_topicpartition_metadata.py b/tests/integration/consumer/test_consumer_topicpartition_metadata.py index a8c3734c4..e24b76204 100644 --- a/tests/integration/consumer/test_consumer_topicpartition_metadata.py +++ b/tests/integration/consumer/test_consumer_topicpartition_metadata.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. from confluent_kafka import TopicPartition diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py index 4397e3c4f..996b869ab 100644 --- a/tests/integration/consumer/test_consumer_upgrade_downgrade.py +++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. import pytest from confluent_kafka import ConsumerGroupType, IsolationLevel, KafkaException, TopicPartition diff --git a/tests/integration/consumer/test_cooperative_rebalance_1.py b/tests/integration/consumer/test_cooperative_rebalance_1.py index 2645c7028..52c9b6c8f 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_1.py +++ b/tests/integration/consumer/test_cooperative_rebalance_1.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. import pytest import time diff --git a/tests/integration/consumer/test_cooperative_rebalance_2.py b/tests/integration/consumer/test_cooperative_rebalance_2.py index f3a6df91b..e6e9eeb82 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_2.py +++ b/tests/integration/consumer/test_cooperative_rebalance_2.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. 
import pytest import time diff --git a/tests/integration/consumer/test_incremental_assign.py b/tests/integration/consumer/test_incremental_assign.py index e2d9e8941..2fa5140b8 100644 --- a/tests/integration/consumer/test_incremental_assign.py +++ b/tests/integration/consumer/test_incremental_assign.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. import pytest from uuid import uuid1 diff --git a/tests/integration/producer/test_transactions.py b/tests/integration/producer/test_transactions.py index ff83fea70..fcb77befc 100644 --- a/tests/integration/producer/test_transactions.py +++ b/tests/integration/producer/test_transactions.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. # import inspect import sys diff --git a/tests/test_error.py b/tests/test_error.py index a06dfc0ad..7a502c9d6 100644 --- a/tests/test_error.py +++ b/tests/test_error.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. # from confluent_kafka import KafkaError From b2a2179401f53da33cb6aca1e7ed1d889100053a Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 24 Sep 2025 20:26:31 +0530 Subject: [PATCH 03/10] Minor refactoring related to the PR comments * Update topic prefix to include partition.assignment.strategy * Add test to check uniqueness after each rebalance * Add test to check all partitions are assigned after each rebalance * Extracted variable for repetitive code part --- .../test_consumer_upgrade_downgrade.py | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py index 996b869ab..7f63ecea0 100644 --- a/tests/integration/consumer/test_consumer_upgrade_downgrade.py +++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -53,23 +53,19 @@ def list_offsets(a, topic, no_of_partitions): .format(partition.topic, partition.partition, e)) -# def produce_messages(producer, topic, partitions, num_messages): -# for i in range(num_messages): -# key = "key-{}".format(i) -# value = "value-{}".format(i) -# partition = i % partitions -# producer.produce(topic, key=key, value=value, partition=partition) -# producer.flush() - - def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol): total_msg_read = 0 - while len(consumers[-1].assignment()) != number_of_partitions // len(consumers): + expected_partitions_per_consumer = number_of_partitions // len(consumers) + while len(consumers[-1].assignment()) != expected_partitions_per_consumer: for consumer in consumers: consumer.poll(0.1) + all_assignments = set() for consumer in consumers: - assert len(consumer.assignment()) == number_of_partitions // len(consumers) + assignment = consumer.assignment() + all_assignments.update(assignment) + assert len(assignment) == 
expected_partitions_per_consumer + assert len(all_assignments) == number_of_partitions assert get_group_protocol_type(admin_client, topic) == expected_protocol @@ -91,7 +87,8 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k """ Test consumer upgrade and downgrade. """ - topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + topic_name_prefix = f"{topic_prefix}_{partition_assignment_strategy}" + topic = kafka_cluster.create_topic_and_wait_propogation(topic_name_prefix, { "num_partitions": number_of_partitions }) @@ -114,7 +111,7 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k assert consumer2 is not None consumer2.subscribe([topic]) check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER) - + # Now simulate a downgrade by deleting the second consumer and keeping only 'classic' consumer consumer2.close() check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC) From 0d0d7ceddddd972bc7930179acd6ce1a992c220f Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 24 Sep 2025 20:31:41 +0530 Subject: [PATCH 04/10] Removed list_offsets usage --- .../test_consumer_upgrade_downgrade.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py index 7f63ecea0..18120cb96 100644 --- a/tests/integration/consumer/test_consumer_upgrade_downgrade.py +++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -36,23 +36,6 @@ def get_group_protocol_type(a, group_id): raise -def list_offsets(a, topic, no_of_partitions): - topic_partition_offsets = {} - for partition in range(no_of_partitions): - topic_partition = TopicPartition(topic, partition) - topic_partition_offsets[topic_partition] = OffsetSpec.latest() - futmap = a.list_offsets(topic_partition_offsets, isolation_level=IsolationLevel.READ_COMMITTED, request_timeout=30) - for partition, fut in futmap.items(): - try: - result = fut.result() - print("Topicname : {} Partition_Index : {} Offset : {} Timestamp : {}" - .format(partition.topic, partition.partition, result.offset, - result.timestamp)) - except KafkaException as e: - print("Topicname : {} Partition_Index : {} Error : {}" - .format(partition.topic, partition.partition, e)) - - def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol): total_msg_read = 0 expected_partitions_per_consumer = number_of_partitions // len(consumers) @@ -71,7 +54,6 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto # Produce some messages to the topic kafka_cluster.seed_topic(topic) - list_offsets(admin_client, topic, number_of_partitions) while total_msg_read < 100: for consumer in consumers: From 12ca622eea1e061f24107fb9a2e8c776a2dc0627 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 24 Sep 2025 20:53:31 +0530 Subject: [PATCH 05/10] Created two configs for classic and consumer protocol consumer --- .../test_consumer_upgrade_downgrade.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py index 18120cb96..82ca42f26 100644 --- a/tests/integration/consumer/test_consumer_upgrade_downgrade.py +++ 
b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -16,8 +16,7 @@ # limitations under the License. import pytest -from confluent_kafka import ConsumerGroupType, IsolationLevel, KafkaException, TopicPartition -from confluent_kafka.admin import OffsetSpec +from confluent_kafka import ConsumerGroupType, KafkaException from tests.common import TestUtils topic_prefix = "test_consumer_upgrade_downgrade_" @@ -76,20 +75,25 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k }) admin_client = kafka_cluster.admin() - # Create a consumer with the latest version consumer_conf = {'group.id': topic, - 'auto.offset.reset': 'earliest', - 'group.protocol': 'classic'} - consumer_conf['partition.assignment.strategy'] = partition_assignment_strategy - consumer = kafka_cluster.consumer(consumer_conf) + 'auto.offset.reset': 'earliest'} + consumer_conf_classic = { + 'group.protocol': 'classic', + 'partition.assignment.strategy': partition_assignment_strategy, + **consumer_conf + } + consumer_conf_consumer = { + 'group.protocol': 'consumer', + **consumer_conf + } + + consumer = kafka_cluster.consumer(consumer_conf_classic) assert consumer is not None consumer.subscribe([topic]) check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC) - del consumer_conf['partition.assignment.strategy'] # Now simulate an upgrade by creating a new consumer with 'consumer' protocol - consumer_conf['group.protocol'] = 'consumer' - consumer2 = kafka_cluster.consumer(consumer_conf) + consumer2 = kafka_cluster.consumer(consumer_conf_consumer) assert consumer2 is not None consumer2.subscribe([topic]) check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER) From 1a6721250ec454fa715c9173eb91fc905fd40950 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 24 Sep 2025 21:05:52 +0530 Subject: [PATCH 06/10] Reverted changes for seed_topics --- tests/integration/cluster_fixture.py | 2 +- .../consumer/test_consumer_upgrade_downgrade.py | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/integration/cluster_fixture.py b/tests/integration/cluster_fixture.py index e6bf06a84..e8f9f6096 100644 --- a/tests/integration/cluster_fixture.py +++ b/tests/integration/cluster_fixture.py @@ -286,7 +286,7 @@ def seed_topic(self, topic, value_source=None, key_source=None, header_source=No value_source = ['test-data{}'.format(i) for i in range(0, 100)] if key_source is None: - key_source = ['test-key{}'.format(i) for i in range(0, 100)] + key_source = [None] KafkaClusterFixture._produce(self._producer, topic, value_source, key_source, header_source) diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py index 82ca42f26..956528c32 100644 --- a/tests/integration/consumer/test_consumer_upgrade_downgrade.py +++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -36,6 +36,7 @@ def get_group_protocol_type(a, group_id): def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol): + no_of_messages = 100 total_msg_read = 0 expected_partitions_per_consumer = number_of_partitions // len(consumers) while len(consumers[-1].assignment()) != expected_partitions_per_consumer: @@ -52,16 +53,18 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto assert get_group_protocol_type(admin_client, topic) == expected_protocol # 
Produce some messages to the topic - kafka_cluster.seed_topic(topic) + test_data = ['test-data{}'.format(i) for i in range(0, no_of_messages)] + test_keys = ['test-key{}'.format(i) for i in range(0, no_of_messages)] # we want each partition to have data + kafka_cluster.seed_topic(topic, test_data, test_keys) - while total_msg_read < 100: + while total_msg_read < no_of_messages: for consumer in consumers: # Poll for messages msg = consumer.poll(0.1) if msg is not None: total_msg_read += 1 - - assert total_msg_read == 100, "Expected to read 100 messages, but read {}".format(total_msg_read) + + assert total_msg_read == no_of_messages, f"Expected to read {no_of_messages} messages, but read {total_msg_read}" def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, partition_assignment_strategy): From cc506a35317809c699393735842079e7e193f9a3 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 24 Sep 2025 21:34:45 +0530 Subject: [PATCH 07/10] Added a test case to remove all the classic consumers --- .../test_consumer_upgrade_downgrade.py | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py index 956528c32..e341b1523 100644 --- a/tests/integration/consumer/test_consumer_upgrade_downgrade.py +++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -16,6 +16,7 @@ # limitations under the License. import pytest +from enum import Enum from confluent_kafka import ConsumerGroupType, KafkaException from tests.common import TestUtils @@ -35,7 +36,7 @@ def get_group_protocol_type(a, group_id): raise -def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol): +def check_consumer(kafka_cluster, consumers, admin_client, group_id, topic, expected_protocol): no_of_messages = 100 total_msg_read = 0 expected_partitions_per_consumer = number_of_partitions // len(consumers) @@ -50,7 +51,7 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto assert len(assignment) == expected_partitions_per_consumer assert len(all_assignments) == number_of_partitions - assert get_group_protocol_type(admin_client, topic) == expected_protocol + assert get_group_protocol_type(admin_client, group_id) == expected_protocol # Produce some messages to the topic test_data = ['test-data{}'.format(i) for i in range(0, no_of_messages)] @@ -67,6 +68,11 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto assert total_msg_read == no_of_messages, f"Expected to read {no_of_messages} messages, but read {total_msg_read}" +class Operation(Enum): + ADD = 0 + REMOVE = 1 + + def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, partition_assignment_strategy): """ Test consumer upgrade and downgrade. 
@@ -90,22 +96,26 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k **consumer_conf } - consumer = kafka_cluster.consumer(consumer_conf_classic) - assert consumer is not None - consumer.subscribe([topic]) - check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC) - - # Now simulate an upgrade by creating a new consumer with 'consumer' protocol - consumer2 = kafka_cluster.consumer(consumer_conf_consumer) - assert consumer2 is not None - consumer2.subscribe([topic]) - check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER) - - # Now simulate a downgrade by deleting the second consumer and keeping only 'classic' consumer - consumer2.close() - check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC) - - consumer.close() + test_scenarios = [(Operation.ADD, consumer_conf_classic, ConsumerGroupType.CLASSIC), + (Operation.ADD, consumer_conf_consumer, ConsumerGroupType.CONSUMER), + (Operation.REMOVE, None, ConsumerGroupType.CONSUMER), + (Operation.ADD, consumer_conf_classic, ConsumerGroupType.CONSUMER), + (Operation.REMOVE, None, ConsumerGroupType.CLASSIC)] + consumers = [] + + for operation, conf, expected_protocol in test_scenarios: + if operation == Operation.ADD: + consumer = kafka_cluster.consumer(conf) + assert consumer is not None + consumer.subscribe([topic]) + consumers.append(consumer) + elif operation == Operation.REMOVE: + consumer_to_remove = consumers.pop(0) + consumer_to_remove.close() + check_consumer(kafka_cluster, consumers, admin_client, topic, topic, expected_protocol) + + assert len(consumers) == 1 + consumers[0].close() kafka_cluster.delete_topic(topic) From bf64b91b4efcafc25059d4cfbd29e8cae94f2bcf Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 24 Sep 2025 23:05:40 +0530 Subject: [PATCH 08/10] Style fixes --- tests/common/__init__.py | 7 +++++-- tests/integration/cluster_fixture.py | 2 +- .../consumer/test_consumer_upgrade_downgrade.py | 9 +++++---- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/common/__init__.py b/tests/common/__init__.py index 5834604c1..0bf9c54ea 100644 --- a/tests/common/__init__.py +++ b/tests/common/__init__.py @@ -60,11 +60,14 @@ def update_conf_group_protocol(conf=None): @staticmethod def can_upgrade_group_protocol_to_consumer(conf): - return conf is not None and 'group.id' in conf and 'group.protocol' not in conf and TestUtils.use_group_protocol_consumer() + return (conf is not None and 'group.id' in conf and + 'group.protocol' not in conf and TestUtils.use_group_protocol_consumer()) @staticmethod def remove_forbidden_conf_group_protocol_consumer(conf): - if conf is None or not TestUtils.use_group_protocol_consumer() or conf.get('group.protocol', 'consumer') != 'consumer': + if (conf is None or + not TestUtils.use_group_protocol_consumer() or + conf.get('group.protocol', 'consumer') != 'consumer'): return forbidden_conf_properties = ["session.timeout.ms", "partition.assignment.strategy", diff --git a/tests/integration/cluster_fixture.py b/tests/integration/cluster_fixture.py index e8f9f6096..9d92937a9 100644 --- a/tests/integration/cluster_fixture.py +++ b/tests/integration/cluster_fixture.py @@ -233,7 +233,7 @@ def create_topic(self, prefix, conf=None, **create_topic_kwargs): future_topic.get(name).result() return name - + def delete_topic(self, topic): """ Deletes a topic with this cluster. 
diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py
index e341b1523..3d09c42b7 100644
--- a/tests/integration/consumer/test_consumer_upgrade_downgrade.py
+++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py
@@ -73,15 +73,16 @@ class Operation(Enum):
     REMOVE = 1
 
 
-def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, partition_assignment_strategy):
+def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(
+        kafka_cluster, partition_assignment_strategy):
     """
     Test consumer upgrade and downgrade.
     """
     topic_name_prefix = f"{topic_prefix}_{partition_assignment_strategy}"
     topic = kafka_cluster.create_topic_and_wait_propogation(topic_name_prefix,
-                                                            {
-                                                                "num_partitions": number_of_partitions
-                                                            })
+                                                           {
+                                                               "num_partitions": number_of_partitions
+                                                           })
     admin_client = kafka_cluster.admin()
 
     consumer_conf = {'group.id': topic,

From 5a902bb49fe843330fc14ba144847a76c47d0d09 Mon Sep 17 00:00:00 2001
From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Date: Wed, 24 Sep 2025 23:15:33 +0530
Subject: [PATCH 09/10] Fix 'propogation' typo - 'propagation'

---
 tests/integration/admin/test_basic_operations.py      |  2 +-
 tests/integration/admin/test_delete_records.py        |  6 +++---
 tests/integration/admin/test_describe_operations.py   |  2 +-
 .../admin/test_incremental_alter_configs.py           |  4 ++--
 tests/integration/admin/test_list_offsets.py          |  2 +-
 tests/integration/cluster_fixture.py                  |  6 +++---
 tests/integration/consumer/test_consumer_error.py     |  6 +++---
 tests/integration/consumer/test_consumer_memberid.py  |  2 +-
 .../test_consumer_topicpartition_metadata.py          |  2 +-
 .../consumer/test_consumer_upgrade_downgrade.py       |  2 +-
 .../consumer/test_cooperative_rebalance_1.py          |  4 ++--
 .../consumer/test_cooperative_rebalance_2.py          |  2 +-
 .../integration/consumer/test_incremental_assign.py   |  4 ++--
 tests/integration/producer/test_transactions.py       | 10 +++++-----
 .../schema_registry/_async/test_avro_serializers.py   |  8 ++++----
 .../schema_registry/_async/test_json_serializers.py   | 12 ++++++------
 .../schema_registry/_async/test_proto_serializers.py  |  8 ++++----
 .../schema_registry/_sync/test_avro_serializers.py    |  8 ++++----
 .../schema_registry/_sync/test_json_serializers.py    | 12 ++++++------
 .../schema_registry/_sync/test_proto_serializers.py   |  8 ++++----
 tests/integration/serialization/test_serializers.py   |  6 +++---
 21 files changed, 58 insertions(+), 58 deletions(-)

diff --git a/tests/integration/admin/test_basic_operations.py b/tests/integration/admin/test_basic_operations.py
index 9889409de..7efa92855 100644
--- a/tests/integration/admin/test_basic_operations.py
+++ b/tests/integration/admin/test_basic_operations.py
@@ -208,7 +208,7 @@ def test_basic_operations(kafka_cluster):
     # Second iteration: create topic. 
# for validate in (True, False): - our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + our_topic = kafka_cluster.create_topic_and_wait_propagation(topic_prefix, { "num_partitions": num_partitions, "config": topic_config, diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py index 2c58e36c4..2717013a6 100644 --- a/tests/integration/admin/test_delete_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -25,7 +25,7 @@ def test_delete_records(kafka_cluster): admin_client = kafka_cluster.admin() # Create a topic with a single partition - topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records", + topic = kafka_cluster.create_topic_and_wait_propagation("test-del-records", { "num_partitions": 1, "replication_factor": 1, @@ -73,12 +73,12 @@ def test_delete_records_multiple_topics_and_partitions(kafka_cluster): admin_client = kafka_cluster.admin() num_partitions = 3 # Create two topics with a single partition - topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records", + topic = kafka_cluster.create_topic_and_wait_propagation("test-del-records", { "num_partitions": num_partitions, "replication_factor": 1, }) - topic2 = kafka_cluster.create_topic_and_wait_propogation("test-del-records2", + topic2 = kafka_cluster.create_topic_and_wait_propagation("test-del-records2", { "num_partitions": num_partitions, "replication_factor": 1, diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 3bfbfd88a..12f366eeb 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -204,7 +204,7 @@ def test_describe_operations(sasl_cluster): # Create Topic topic_config = {"compression.type": "gzip"} - our_topic = sasl_cluster.create_topic_and_wait_propogation(topic_prefix, + our_topic = sasl_cluster.create_topic_and_wait_propagation(topic_prefix, { "num_partitions": 1, "config": topic_config, diff --git a/tests/integration/admin/test_incremental_alter_configs.py b/tests/integration/admin/test_incremental_alter_configs.py index 43e43ebee..84ca8d4bb 100644 --- a/tests/integration/admin/test_incremental_alter_configs.py +++ b/tests/integration/admin/test_incremental_alter_configs.py @@ -56,13 +56,13 @@ def test_incremental_alter_configs(kafka_cluster): num_partitions = 2 topic_config = {"compression.type": "gzip"} - our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + our_topic = kafka_cluster.create_topic_and_wait_propagation(topic_prefix, { "num_partitions": num_partitions, "config": topic_config, "replication_factor": 1, }) - our_topic2 = kafka_cluster.create_topic_and_wait_propogation(topic_prefix2, + our_topic2 = kafka_cluster.create_topic_and_wait_propagation(topic_prefix2, { "num_partitions": num_partitions, "config": topic_config, diff --git a/tests/integration/admin/test_list_offsets.py b/tests/integration/admin/test_list_offsets.py index f1448b267..b1189aa27 100644 --- a/tests/integration/admin/test_list_offsets.py +++ b/tests/integration/admin/test_list_offsets.py @@ -27,7 +27,7 @@ def test_list_offsets(kafka_cluster): admin_client = kafka_cluster.admin() # Create a topic with a single partition - topic = kafka_cluster.create_topic_and_wait_propogation("test-topic-verify-list-offsets", + topic = kafka_cluster.create_topic_and_wait_propagation("test-topic-verify-list-offsets", { "num_partitions": 1, "replication_factor": 1, diff 
--git a/tests/integration/cluster_fixture.py b/tests/integration/cluster_fixture.py index 9d92937a9..a57cd6562 100644 --- a/tests/integration/cluster_fixture.py +++ b/tests/integration/cluster_fixture.py @@ -247,9 +247,9 @@ def delete_topic(self, topic): except Exception as e: print("Failed to delete topic {}: {}".format(topic, e)) - def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kwargs): + def create_topic_and_wait_propagation(self, prefix, conf=None, **create_topic_kwargs): """ - Creates a new topic with this cluster. Wait for the topic to be propogated to all brokers. + Creates a new topic with this cluster. Wait for the topic to be propagated to all brokers. :param str prefix: topic name :param dict conf: additions/overrides to topic configuration. @@ -258,7 +258,7 @@ def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kw """ name = self.create_topic(prefix, conf, **create_topic_kwargs) - # wait for topic propogation across all the brokers. + # wait for topic propagation across all the brokers. # FIXME: find a better way to wait for topic creation # for all languages, given option to send request to # a specific broker isn't present everywhere. diff --git a/tests/integration/consumer/test_consumer_error.py b/tests/integration/consumer/test_consumer_error.py index 73f368e6f..2510315ea 100644 --- a/tests/integration/consumer/test_consumer_error.py +++ b/tests/integration/consumer/test_consumer_error.py @@ -29,7 +29,7 @@ def test_consume_error(kafka_cluster): Tests to ensure librdkafka errors are propagated as an instance of ConsumeError. """ - topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction") + topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction") consumer_conf = {'group.id': 'pytest', 'enable.partition.eof': True} producer = kafka_cluster.producer() @@ -59,7 +59,7 @@ def test_consume_error_commit(kafka_cluster): """ Tests to ensure that we handle messages with errors when commiting. """ - topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction") + topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction") consumer_conf = {'group.id': 'pytest', 'session.timeout.ms': 100} @@ -91,7 +91,7 @@ def test_consume_error_store_offsets(kafka_cluster): """ Tests to ensure that we handle messages with errors when storing offsets. 
""" - topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction") + topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction") consumer_conf = {'group.id': 'pytest', 'session.timeout.ms': 100, 'enable.auto.offset.store': True, diff --git a/tests/integration/consumer/test_consumer_memberid.py b/tests/integration/consumer/test_consumer_memberid.py index 9dbaf325d..445b420bb 100644 --- a/tests/integration/consumer/test_consumer_memberid.py +++ b/tests/integration/consumer/test_consumer_memberid.py @@ -28,7 +28,7 @@ def test_consumer_memberid(kafka_cluster): topic = "testmemberid" - kafka_cluster.create_topic_and_wait_propogation(topic) + kafka_cluster.create_topic_and_wait_propagation(topic) consumer = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_consumer_topicpartition_metadata.py b/tests/integration/consumer/test_consumer_topicpartition_metadata.py index e24b76204..923a13ab6 100644 --- a/tests/integration/consumer/test_consumer_topicpartition_metadata.py +++ b/tests/integration/consumer/test_consumer_topicpartition_metadata.py @@ -30,7 +30,7 @@ def commit_and_check(consumer, topic, metadata): def test_consumer_topicpartition_metadata(kafka_cluster): - topic = kafka_cluster.create_topic_and_wait_propogation("test_topicpartition") + topic = kafka_cluster.create_topic_and_wait_propagation("test_topicpartition") consumer_conf = {'group.id': 'test_topicpartition'} c = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py index 3d09c42b7..2be96cf22 100644 --- a/tests/integration/consumer/test_consumer_upgrade_downgrade.py +++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -79,7 +79,7 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy( Test consumer upgrade and downgrade. 
""" topic_name_prefix = f"{topic_prefix}_{partition_assignment_strategy}" - topic = kafka_cluster.create_topic_and_wait_propogation(topic_name_prefix, + topic = kafka_cluster.create_topic_and_wait_propagation(topic_name_prefix, { "num_partitions": number_of_partitions }) diff --git a/tests/integration/consumer/test_cooperative_rebalance_1.py b/tests/integration/consumer/test_cooperative_rebalance_1.py index 52c9b6c8f..f3ffb4bfb 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_1.py +++ b/tests/integration/consumer/test_cooperative_rebalance_1.py @@ -60,8 +60,8 @@ def on_lost(self, consumer, partitions): reb = RebalanceState() - topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") - topic2 = kafka_cluster.create_topic_and_wait_propogation("topic2") + topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1") + topic2 = kafka_cluster.create_topic_and_wait_propagation("topic2") consumer = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_cooperative_rebalance_2.py b/tests/integration/consumer/test_cooperative_rebalance_2.py index e6e9eeb82..be9fd490b 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_2.py +++ b/tests/integration/consumer/test_cooperative_rebalance_2.py @@ -52,7 +52,7 @@ def on_revoke(self, consumer, partitions): reb = RebalanceState() - topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") + topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1") consumer = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_incremental_assign.py b/tests/integration/consumer/test_incremental_assign.py index 2fa5140b8..94e0839b8 100644 --- a/tests/integration/consumer/test_incremental_assign.py +++ b/tests/integration/consumer/test_incremental_assign.py @@ -30,8 +30,8 @@ def test_incremental_assign(kafka_cluster): 'enable.auto.commit': 'false', 'auto.offset.reset': 'error'} - topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") - topic2 = kafka_cluster.create_topic_and_wait_propogation("topic2") + topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1") + topic2 = kafka_cluster.create_topic_and_wait_propagation("topic2") kafka_cluster.seed_topic(topic1, value_source=[b'a']) kafka_cluster.seed_topic(topic2, value_source=[b'b']) diff --git a/tests/integration/producer/test_transactions.py b/tests/integration/producer/test_transactions.py index fcb77befc..60f272c0b 100644 --- a/tests/integration/producer/test_transactions.py +++ b/tests/integration/producer/test_transactions.py @@ -51,7 +51,7 @@ def delivery_err(err, msg): def test_commit_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propagation("output_topic") producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -66,7 +66,7 @@ def test_commit_transaction(kafka_cluster): def test_abort_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propagation("output_topic") producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -81,7 +81,7 @@ def test_abort_transaction(kafka_cluster): def test_abort_retry_commit_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propagation("output_topic") 
producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -99,8 +99,8 @@ def test_abort_retry_commit_transaction(kafka_cluster): def test_send_offsets_committed_transaction(kafka_cluster): - input_topic = kafka_cluster.create_topic_and_wait_propogation("input_topic") - output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") + input_topic = kafka_cluster.create_topic_and_wait_propagation("input_topic") + output_topic = kafka_cluster.create_topic_and_wait_propagation("output_topic") error_cb = prefixed_error_cb('test_send_offsets_committed_transaction') producer = kafka_cluster.producer({ 'client.id': 'producer1', diff --git a/tests/integration/schema_registry/_async/test_avro_serializers.py b/tests/integration/schema_registry/_async/test_avro_serializers.py index 9cc871277..e6ba25489 100644 --- a/tests/integration/schema_registry/_async/test_avro_serializers.py +++ b/tests/integration/schema_registry/_async/test_avro_serializers.py @@ -151,7 +151,7 @@ async def _references_test_common(kafka_cluster, awarded_user, serializer_schema Args: kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic_and_wait_propogation("reference-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("reference-avro") sr = kafka_cluster.async_schema_registry() value_serializer = await AsyncAvroSerializer( @@ -223,7 +223,7 @@ async def test_avro_record_serialization(kafka_cluster, load_file, avsc, data, r data (object): data to be serialized """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro") sr = kafka_cluster.async_schema_registry() schema_str = load_file(avsc) @@ -267,7 +267,7 @@ async def test_delivery_report_serialization(kafka_cluster, load_file, avsc, dat data (object): data to be serialized """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro-dr") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro-dr") sr = kafka_cluster.async_schema_registry() schema_str = load_file(avsc) @@ -314,7 +314,7 @@ async def test_avro_record_serialization_custom(kafka_cluster): kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro") sr = kafka_cluster.async_schema_registry() user = User('Bowie', 47, 'purple') diff --git a/tests/integration/schema_registry/_async/test_json_serializers.py b/tests/integration/schema_registry/_async/test_json_serializers.py index 464b41836..ab1f015a3 100644 --- a/tests/integration/schema_registry/_async/test_json_serializers.py +++ b/tests/integration/schema_registry/_async/test_json_serializers.py @@ -257,7 +257,7 @@ async def test_json_record_serialization(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() schema_str = load_file("product.json") @@ -305,7 +305,7 @@ async def test_json_record_serialization_incompatible(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = 
kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() schema_str = load_file("product.json") @@ -332,7 +332,7 @@ async def test_json_record_serialization_custom(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() schema_str = load_file("product.json") @@ -377,7 +377,7 @@ async def test_json_record_deserialization_mismatch(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() schema_str = load_file("contractor.json") @@ -418,7 +418,7 @@ async def _register_referenced_schemas(sr: AsyncSchemaRegistryClient, load_file) async def test_json_reference(kafka_cluster, load_file): - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() product = {"productId": 1, @@ -457,7 +457,7 @@ async def test_json_reference(kafka_cluster, load_file): async def test_json_reference_custom(kafka_cluster, load_file): - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() product = _TestProduct(product_id=1, diff --git a/tests/integration/schema_registry/_async/test_proto_serializers.py b/tests/integration/schema_registry/_async/test_proto_serializers.py index 0e65686e2..5dade6f81 100644 --- a/tests/integration/schema_registry/_async/test_proto_serializers.py +++ b/tests/integration/schema_registry/_async/test_proto_serializers.py @@ -52,7 +52,7 @@ async def test_protobuf_message_serialization(kafka_cluster, pb2, data): Validates that we get the same message back that we put in. 
""" - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto") sr = kafka_cluster.async_schema_registry() value_serializer = await AsyncProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) @@ -85,7 +85,7 @@ async def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs """ sr = kafka_cluster.async_schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = await AsyncProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) producer = kafka_cluster.async_producer(key_serializer=serializer) @@ -106,7 +106,7 @@ async def test_protobuf_serializer_type_mismatch(kafka_cluster): pb2_2 = NestedTestProto_pb2.NestedMessage sr = kafka_cluster.async_schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = await AsyncProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) producer = kafka_cluster.async_producer(key_serializer=serializer) @@ -127,7 +127,7 @@ async def test_protobuf_deserializer_type_mismatch(kafka_cluster): pb2_2 = metadata_proto_pb2.HDFSOptions sr = kafka_cluster.async_schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = await AsyncProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) deserializer = await AsyncProtobufDeserializer(pb2_2, {'use.deprecated.format': False}) diff --git a/tests/integration/schema_registry/_sync/test_avro_serializers.py b/tests/integration/schema_registry/_sync/test_avro_serializers.py index 9385c8fa4..87a57eeec 100644 --- a/tests/integration/schema_registry/_sync/test_avro_serializers.py +++ b/tests/integration/schema_registry/_sync/test_avro_serializers.py @@ -151,7 +151,7 @@ def _references_test_common(kafka_cluster, awarded_user, serializer_schema, dese Args: kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic_and_wait_propogation("reference-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("reference-avro") sr = kafka_cluster.schema_registry() value_serializer = AvroSerializer( @@ -223,7 +223,7 @@ def test_avro_record_serialization(kafka_cluster, load_file, avsc, data, record_ data (object): data to be serialized """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro") sr = kafka_cluster.schema_registry() schema_str = load_file(avsc) @@ -267,7 +267,7 @@ def test_delivery_report_serialization(kafka_cluster, load_file, avsc, data, rec data (object): data to be serialized """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro-dr") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro-dr") sr = kafka_cluster.schema_registry() schema_str = load_file(avsc) @@ -314,7 +314,7 @@ def test_avro_record_serialization_custom(kafka_cluster): kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro") sr = 
kafka_cluster.schema_registry() user = User('Bowie', 47, 'purple') diff --git a/tests/integration/schema_registry/_sync/test_json_serializers.py b/tests/integration/schema_registry/_sync/test_json_serializers.py index 0c4a2545d..01344b23e 100644 --- a/tests/integration/schema_registry/_sync/test_json_serializers.py +++ b/tests/integration/schema_registry/_sync/test_json_serializers.py @@ -257,7 +257,7 @@ def test_json_record_serialization(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -305,7 +305,7 @@ def test_json_record_serialization_incompatible(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -332,7 +332,7 @@ def test_json_record_serialization_custom(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -377,7 +377,7 @@ def test_json_record_deserialization_mismatch(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("contractor.json") @@ -418,7 +418,7 @@ def _register_referenced_schemas(sr: SchemaRegistryClient, load_file): def test_json_reference(kafka_cluster, load_file): - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() product = {"productId": 1, @@ -457,7 +457,7 @@ def test_json_reference(kafka_cluster, load_file): def test_json_reference_custom(kafka_cluster, load_file): - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() product = _TestProduct(product_id=1, diff --git a/tests/integration/schema_registry/_sync/test_proto_serializers.py b/tests/integration/schema_registry/_sync/test_proto_serializers.py index 9b3ca3197..20c909761 100644 --- a/tests/integration/schema_registry/_sync/test_proto_serializers.py +++ b/tests/integration/schema_registry/_sync/test_proto_serializers.py @@ -52,7 +52,7 @@ def test_protobuf_message_serialization(kafka_cluster, pb2, data): Validates that we get the same message back that we put in. 
""" - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto") sr = kafka_cluster.schema_registry() value_serializer = ProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) @@ -85,7 +85,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): """ sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = ProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) producer = kafka_cluster.producer(key_serializer=serializer) @@ -106,7 +106,7 @@ def test_protobuf_serializer_type_mismatch(kafka_cluster): pb2_2 = NestedTestProto_pb2.NestedMessage sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) producer = kafka_cluster.producer(key_serializer=serializer) @@ -127,7 +127,7 @@ def test_protobuf_deserializer_type_mismatch(kafka_cluster): pb2_2 = metadata_proto_pb2.HDFSOptions sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) deserializer = ProtobufDeserializer(pb2_2, {'use.deprecated.format': False}) diff --git a/tests/integration/serialization/test_serializers.py b/tests/integration/serialization/test_serializers.py index f8ac10d0e..fe5802b1b 100644 --- a/tests/integration/serialization/test_serializers.py +++ b/tests/integration/serialization/test_serializers.py @@ -45,7 +45,7 @@ def test_numeric_serialization(kafka_cluster, serializer, deserializer, data): data(object): input data """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-numeric") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-numeric") producer = kafka_cluster.producer(value_serializer=serializer) producer.produce(topic, value=data) @@ -77,7 +77,7 @@ def test_string_serialization(kafka_cluster, data, codec): codec (str): encoding type """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-string") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-string") producer = kafka_cluster.producer(value_serializer=StringSerializer(codec)) @@ -119,7 +119,7 @@ def test_mixed_serialization(kafka_cluster, key_serializer, value_serializer, value (object): value data """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-numeric") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-numeric") producer = kafka_cluster.producer(key_serializer=key_serializer, value_serializer=value_serializer) From 76cabdfdf9f1f2028e73ec843478a20bc7cb3590 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 24 Sep 2025 23:42:47 +0530 Subject: [PATCH 10/10] Upgraded clang-format from 10 to 18 --- tools/style-format.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/style-format.sh b/tools/style-format.sh index a686cc6f0..462d80bf1 100755 --- a/tools/style-format.sh +++ b/tools/style-format.sh @@ -27,8 
+27,8 @@ else
 fi
 
 clang_format_version=$(${CLANG_FORMAT} --version | sed -Ee 's/.*version ([[:digit:]]+)\.[[:digit:]]+\.[[:digit:]]+.*/\1/')
 
-if [[ $clang_format_version != "10" ]] ; then
-    echo "$0: clang-format version 10, '$clang_format_version' detected"
+if [[ $clang_format_version != "18" ]] ; then
+    echo "$0: clang-format version 18 required, '$clang_format_version' detected"
     exit 1
 fi
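

Taken together, the patches above exercise a single migration flow: a consumer group starts on the `classic` protocol, upgrades online as soon as a `consumer`-protocol (KIP-848) member joins, and downgrades once the last such member leaves. The sketch below distills that flow outside the test fixtures. It is a minimal illustration under stated assumptions, not part of the patch set: the bootstrap address, group id, and topic name are placeholders, and a broker with the new group protocol enabled is presumed.

# Minimal sketch (assumptions: broker at localhost:9092 with the KIP-848
# 'consumer' protocol enabled, pre-created topic 'demo-topic'; all names
# here are placeholders, not values from the patches).
from confluent_kafka import Consumer
from confluent_kafka.admin import AdminClient

BOOTSTRAP = 'localhost:9092'
common = {'bootstrap.servers': BOOTSTRAP,
          'group.id': 'demo-group',
          'auto.offset.reset': 'earliest'}
admin = AdminClient({'bootstrap.servers': BOOTSTRAP})


def group_type():
    # Same AdminClient call the series' get_group_protocol_type() helper uses.
    return admin.describe_consumer_groups(['demo-group'])['demo-group'].result().type


def drive(*consumers, polls=50):
    # Poll every member so rebalances complete and heartbeats keep flowing.
    for _ in range(polls):
        for c in consumers:
            c.poll(0.1)


# 1. Classic-protocol member; an assignment strategy is only legal here.
classic = Consumer({**common,
                    'group.protocol': 'classic',
                    'partition.assignment.strategy': 'roundrobin'})
classic.subscribe(['demo-topic'])
drive(classic)
print(group_type())   # ConsumerGroupType.CLASSIC

# 2. A consumer-protocol member joins: the whole group upgrades online.
modern = Consumer({**common, 'group.protocol': 'consumer'})
modern.subscribe(['demo-topic'])
drive(classic, modern)
print(group_type())   # ConsumerGroupType.CONSUMER

# 3. The consumer-protocol member leaves: the group downgrades again.
modern.close()
drive(classic)
print(group_type())   # ConsumerGroupType.CLASSIC
classic.close()

A fuller version of this flow, including dropping the original classic member while the group stays on the new protocol, is what the Operation scenario table in PATCH 07 drives through check_consumer().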