diff --git a/tests/common/__init__.py b/tests/common/__init__.py index 3d9ec5c7a..0bf9c54ea 100644 --- a/tests/common/__init__.py +++ b/tests/common/__init__.py @@ -55,22 +55,28 @@ def use_group_protocol_consumer(): @staticmethod def update_conf_group_protocol(conf=None): - if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): + if TestUtils.can_upgrade_group_protocol_to_consumer(conf): conf['group.protocol'] = 'consumer' + @staticmethod + def can_upgrade_group_protocol_to_consumer(conf): + return (conf is not None and 'group.id' in conf and + 'group.protocol' not in conf and TestUtils.use_group_protocol_consumer()) + @staticmethod def remove_forbidden_conf_group_protocol_consumer(conf): - if conf is None: + if (conf is None or + not TestUtils.use_group_protocol_consumer() or + conf.get('group.protocol', 'consumer') != 'consumer'): return - if TestUtils.use_group_protocol_consumer(): - forbidden_conf_properties = ["session.timeout.ms", - "partition.assignment.strategy", - "heartbeat.interval.ms", - "group.protocol.type"] - for prop in forbidden_conf_properties: - if prop in conf: - print("Skipping setting forbidden configuration {prop} for `CONSUMER` protocol") - del conf[prop] + forbidden_conf_properties = ["session.timeout.ms", + "partition.assignment.strategy", + "heartbeat.interval.ms", + "group.protocol.type"] + for prop in forbidden_conf_properties: + if prop in conf: + print(f"Skipping setting forbidden configuration {prop} for `CONSUMER` protocol") + del conf[prop] class TestConsumer(Consumer): diff --git a/tests/integration/admin/test_basic_operations.py b/tests/integration/admin/test_basic_operations.py index 9889409de..7efa92855 100644 --- a/tests/integration/admin/test_basic_operations.py +++ b/tests/integration/admin/test_basic_operations.py @@ -208,7 +208,7 @@ def test_basic_operations(kafka_cluster): # Second iteration: create topic. 
# for validate in (True, False): - our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + our_topic = kafka_cluster.create_topic_and_wait_propagation(topic_prefix, { "num_partitions": num_partitions, "config": topic_config, diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py index 2c58e36c4..2717013a6 100644 --- a/tests/integration/admin/test_delete_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -25,7 +25,7 @@ def test_delete_records(kafka_cluster): admin_client = kafka_cluster.admin() # Create a topic with a single partition - topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records", + topic = kafka_cluster.create_topic_and_wait_propagation("test-del-records", { "num_partitions": 1, "replication_factor": 1, @@ -73,12 +73,12 @@ def test_delete_records_multiple_topics_and_partitions(kafka_cluster): admin_client = kafka_cluster.admin() num_partitions = 3 # Create two topics with a single partition - topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records", + topic = kafka_cluster.create_topic_and_wait_propagation("test-del-records", { "num_partitions": num_partitions, "replication_factor": 1, }) - topic2 = kafka_cluster.create_topic_and_wait_propogation("test-del-records2", + topic2 = kafka_cluster.create_topic_and_wait_propagation("test-del-records2", { "num_partitions": num_partitions, "replication_factor": 1, diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 3bfbfd88a..12f366eeb 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -204,7 +204,7 @@ def test_describe_operations(sasl_cluster): # Create Topic topic_config = {"compression.type": "gzip"} - our_topic = sasl_cluster.create_topic_and_wait_propogation(topic_prefix, + our_topic = sasl_cluster.create_topic_and_wait_propagation(topic_prefix, { "num_partitions": 1, "config": topic_config, diff --git a/tests/integration/admin/test_incremental_alter_configs.py b/tests/integration/admin/test_incremental_alter_configs.py index 43e43ebee..84ca8d4bb 100644 --- a/tests/integration/admin/test_incremental_alter_configs.py +++ b/tests/integration/admin/test_incremental_alter_configs.py @@ -56,13 +56,13 @@ def test_incremental_alter_configs(kafka_cluster): num_partitions = 2 topic_config = {"compression.type": "gzip"} - our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + our_topic = kafka_cluster.create_topic_and_wait_propagation(topic_prefix, { "num_partitions": num_partitions, "config": topic_config, "replication_factor": 1, }) - our_topic2 = kafka_cluster.create_topic_and_wait_propogation(topic_prefix2, + our_topic2 = kafka_cluster.create_topic_and_wait_propagation(topic_prefix2, { "num_partitions": num_partitions, "config": topic_config, diff --git a/tests/integration/admin/test_list_offsets.py b/tests/integration/admin/test_list_offsets.py index f1448b267..b1189aa27 100644 --- a/tests/integration/admin/test_list_offsets.py +++ b/tests/integration/admin/test_list_offsets.py @@ -27,7 +27,7 @@ def test_list_offsets(kafka_cluster): admin_client = kafka_cluster.admin() # Create a topic with a single partition - topic = kafka_cluster.create_topic_and_wait_propogation("test-topic-verify-list-offsets", + topic = kafka_cluster.create_topic_and_wait_propagation("test-topic-verify-list-offsets", { "num_partitions": 1, "replication_factor": 1, diff 
--git a/tests/integration/cluster_fixture.py b/tests/integration/cluster_fixture.py
index 0d441ca1d..a57cd6562 100644
--- a/tests/integration/cluster_fixture.py
+++ b/tests/integration/cluster_fixture.py
@@ -234,9 +234,22 @@ def create_topic(self, prefix, conf=None, **create_topic_kwargs):
         future_topic.get(name).result()
         return name
 
-    def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kwargs):
+    def delete_topic(self, topic):
         """
-        Creates a new topic with this cluster. Wait for the topic to be propogated to all brokers.
+        Deletes a topic from this cluster.
+
+        :param str topic: topic name
+        """
+        futures = self.admin().delete_topics([topic])
+        try:
+            futures.get(topic).result()
+            print("Topic {} deleted".format(topic))
+        except Exception as e:
+            print("Failed to delete topic {}: {}".format(topic, e))
+
+    def create_topic_and_wait_propagation(self, prefix, conf=None, **create_topic_kwargs):
+        """
+        Creates a new topic with this cluster and waits for the topic to be propagated to all brokers.
 
         :param str prefix: topic name
         :param dict conf: additions/overrides to topic configuration.
@@ -245,7 +258,7 @@ def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kw
         """
         name = self.create_topic(prefix, conf, **create_topic_kwargs)
 
-        # wait for topic propogation across all the brokers.
+        # wait for topic propagation across all the brokers.
         # FIXME: find a better way to wait for topic creation
         # for all languages, given option to send request to
         # a specific broker isn't present everywhere.
diff --git a/tests/integration/consumer/test_consumer_error.py b/tests/integration/consumer/test_consumer_error.py
index 6376b32d2..2510315ea 100644
--- a/tests/integration/consumer/test_consumer_error.py
+++ b/tests/integration/consumer/test_consumer_error.py
@@ -13,7 +13,7 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limit
+# limitations under the License.
 #
 
 import pytest
@@ -29,7 +29,7 @@ def test_consume_error(kafka_cluster):
     Tests to ensure librdkafka errors are propagated as an instance of
     ConsumeError.
     """
-    topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction")
+    topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction")
     consumer_conf = {'group.id': 'pytest',
                      'enable.partition.eof': True}
 
     producer = kafka_cluster.producer()
@@ -59,7 +59,7 @@ def test_consume_error_commit(kafka_cluster):
     """
     Tests to ensure that we handle messages with errors when commiting.
     """
-    topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction")
+    topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction")
     consumer_conf = {'group.id': 'pytest',
                      'session.timeout.ms': 100}
 
@@ -91,7 +91,7 @@ def test_consume_error_store_offsets(kafka_cluster):
     """
     Tests to ensure that we handle messages with errors when storing offsets.
""" - topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction") + topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction") consumer_conf = {'group.id': 'pytest', 'session.timeout.ms': 100, 'enable.auto.offset.store': True, diff --git a/tests/integration/consumer/test_consumer_memberid.py b/tests/integration/consumer/test_consumer_memberid.py index 68d545fcf..445b420bb 100644 --- a/tests/integration/consumer/test_consumer_memberid.py +++ b/tests/integration/consumer/test_consumer_memberid.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. import pytest from tests.common import TestUtils @@ -28,7 +28,7 @@ def test_consumer_memberid(kafka_cluster): topic = "testmemberid" - kafka_cluster.create_topic_and_wait_propogation(topic) + kafka_cluster.create_topic_and_wait_propagation(topic) consumer = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_consumer_topicpartition_metadata.py b/tests/integration/consumer/test_consumer_topicpartition_metadata.py index a8c3734c4..923a13ab6 100644 --- a/tests/integration/consumer/test_consumer_topicpartition_metadata.py +++ b/tests/integration/consumer/test_consumer_topicpartition_metadata.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. from confluent_kafka import TopicPartition @@ -30,7 +30,7 @@ def commit_and_check(consumer, topic, metadata): def test_consumer_topicpartition_metadata(kafka_cluster): - topic = kafka_cluster.create_topic_and_wait_propogation("test_topicpartition") + topic = kafka_cluster.create_topic_and_wait_propagation("test_topicpartition") consumer_conf = {'group.id': 'test_topicpartition'} c = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_consumer_upgrade_downgrade.py b/tests/integration/consumer/test_consumer_upgrade_downgrade.py new file mode 100644 index 000000000..2be96cf22 --- /dev/null +++ b/tests/integration/consumer/test_consumer_upgrade_downgrade.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import pytest
+from enum import Enum
+from confluent_kafka import ConsumerGroupType, KafkaException
+from tests.common import TestUtils
+
+topic_prefix = "test_consumer_upgrade_downgrade_"
+number_of_partitions = 10
+
+
+def get_group_protocol_type(admin_client, group_id):
+    future_map = admin_client.describe_consumer_groups([group_id])
+    try:
+        future = future_map[group_id]
+        g = future.result()
+        return g.type
+    except KafkaException as e:
+        print("Error while describing group id '{}': {}".format(group_id, e))
+
+
+def check_consumer(kafka_cluster, consumers, admin_client, group_id, topic, expected_protocol):
+    no_of_messages = 100
+    total_msg_read = 0
+    expected_partitions_per_consumer = number_of_partitions // len(consumers)
+    # Poll until the rebalance completes and the newest member owns its share of partitions.
+    while len(consumers[-1].assignment()) != expected_partitions_per_consumer:
+        for consumer in consumers:
+            consumer.poll(0.1)
+
+    all_assignments = set()
+    for consumer in consumers:
+        assignment = consumer.assignment()
+        all_assignments.update(assignment)
+        assert len(assignment) == expected_partitions_per_consumer
+    assert len(all_assignments) == number_of_partitions
+
+    assert get_group_protocol_type(admin_client, group_id) == expected_protocol
+
+    # Produce some messages to the topic
+    test_data = ['test-data{}'.format(i) for i in range(0, no_of_messages)]
+    test_keys = ['test-key{}'.format(i) for i in range(0, no_of_messages)]  # we want each partition to have data
+    kafka_cluster.seed_topic(topic, test_data, test_keys)
+
+    while total_msg_read < no_of_messages:
+        for consumer in consumers:
+            # Poll for messages
+            msg = consumer.poll(0.1)
+            if msg is not None:
+                total_msg_read += 1
+
+    assert total_msg_read == no_of_messages, f"Expected to read {no_of_messages} messages, but read {total_msg_read}"
+
+
+class Operation(Enum):
+    ADD = 0
+    REMOVE = 1
+
+
+def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(
+        kafka_cluster, partition_assignment_strategy):
+    """
+    Test consumer upgrade and downgrade.
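+
+    Members are added and removed using both the `classic` and `consumer`
+    group protocols; after every membership change the group type reported
+    by describe_consumer_groups() is asserted: the group upgrades to
+    CONSUMER as soon as a consumer-protocol member joins and downgrades to
+    CLASSIC once the last consumer-protocol member leaves.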
+ """ + topic_name_prefix = f"{topic_prefix}_{partition_assignment_strategy}" + topic = kafka_cluster.create_topic_and_wait_propagation(topic_name_prefix, + { + "num_partitions": number_of_partitions + }) + admin_client = kafka_cluster.admin() + + consumer_conf = {'group.id': topic, + 'auto.offset.reset': 'earliest'} + consumer_conf_classic = { + 'group.protocol': 'classic', + 'partition.assignment.strategy': partition_assignment_strategy, + **consumer_conf + } + consumer_conf_consumer = { + 'group.protocol': 'consumer', + **consumer_conf + } + + test_scenarios = [(Operation.ADD, consumer_conf_classic, ConsumerGroupType.CLASSIC), + (Operation.ADD, consumer_conf_consumer, ConsumerGroupType.CONSUMER), + (Operation.REMOVE, None, ConsumerGroupType.CONSUMER), + (Operation.ADD, consumer_conf_classic, ConsumerGroupType.CONSUMER), + (Operation.REMOVE, None, ConsumerGroupType.CLASSIC)] + consumers = [] + + for operation, conf, expected_protocol in test_scenarios: + if operation == Operation.ADD: + consumer = kafka_cluster.consumer(conf) + assert consumer is not None + consumer.subscribe([topic]) + consumers.append(consumer) + elif operation == Operation.REMOVE: + consumer_to_remove = consumers.pop(0) + consumer_to_remove.close() + check_consumer(kafka_cluster, consumers, admin_client, topic, topic, expected_protocol) + + assert len(consumers) == 1 + consumers[0].close() + kafka_cluster.delete_topic(topic) + + +@pytest.mark.skipif(not TestUtils.use_group_protocol_consumer(), + reason="Skipping test as group protocol consumer is not enabled") +def test_consumer_upgrade_downgrade(kafka_cluster): + perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'roundrobin') + perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'range') + perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'cooperative-sticky') diff --git a/tests/integration/consumer/test_cooperative_rebalance_1.py b/tests/integration/consumer/test_cooperative_rebalance_1.py index 2645c7028..f3ffb4bfb 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_1.py +++ b/tests/integration/consumer/test_cooperative_rebalance_1.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. import pytest import time @@ -60,8 +60,8 @@ def on_lost(self, consumer, partitions): reb = RebalanceState() - topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") - topic2 = kafka_cluster.create_topic_and_wait_propogation("topic2") + topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1") + topic2 = kafka_cluster.create_topic_and_wait_propagation("topic2") consumer = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_cooperative_rebalance_2.py b/tests/integration/consumer/test_cooperative_rebalance_2.py index f3a6df91b..be9fd490b 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_2.py +++ b/tests/integration/consumer/test_cooperative_rebalance_2.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. 
import pytest import time @@ -52,7 +52,7 @@ def on_revoke(self, consumer, partitions): reb = RebalanceState() - topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") + topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1") consumer = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_incremental_assign.py b/tests/integration/consumer/test_incremental_assign.py index e2d9e8941..94e0839b8 100644 --- a/tests/integration/consumer/test_incremental_assign.py +++ b/tests/integration/consumer/test_incremental_assign.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. import pytest from uuid import uuid1 @@ -30,8 +30,8 @@ def test_incremental_assign(kafka_cluster): 'enable.auto.commit': 'false', 'auto.offset.reset': 'error'} - topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") - topic2 = kafka_cluster.create_topic_and_wait_propogation("topic2") + topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1") + topic2 = kafka_cluster.create_topic_and_wait_propagation("topic2") kafka_cluster.seed_topic(topic1, value_source=[b'a']) kafka_cluster.seed_topic(topic2, value_source=[b'b']) diff --git a/tests/integration/producer/test_transactions.py b/tests/integration/producer/test_transactions.py index ff83fea70..60f272c0b 100644 --- a/tests/integration/producer/test_transactions.py +++ b/tests/integration/producer/test_transactions.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. 
# import inspect import sys @@ -51,7 +51,7 @@ def delivery_err(err, msg): def test_commit_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propagation("output_topic") producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -66,7 +66,7 @@ def test_commit_transaction(kafka_cluster): def test_abort_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propagation("output_topic") producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -81,7 +81,7 @@ def test_abort_transaction(kafka_cluster): def test_abort_retry_commit_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propagation("output_topic") producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -99,8 +99,8 @@ def test_abort_retry_commit_transaction(kafka_cluster): def test_send_offsets_committed_transaction(kafka_cluster): - input_topic = kafka_cluster.create_topic_and_wait_propogation("input_topic") - output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") + input_topic = kafka_cluster.create_topic_and_wait_propagation("input_topic") + output_topic = kafka_cluster.create_topic_and_wait_propagation("output_topic") error_cb = prefixed_error_cb('test_send_offsets_committed_transaction') producer = kafka_cluster.producer({ 'client.id': 'producer1', diff --git a/tests/integration/schema_registry/_async/test_avro_serializers.py b/tests/integration/schema_registry/_async/test_avro_serializers.py index 9cc871277..e6ba25489 100644 --- a/tests/integration/schema_registry/_async/test_avro_serializers.py +++ b/tests/integration/schema_registry/_async/test_avro_serializers.py @@ -151,7 +151,7 @@ async def _references_test_common(kafka_cluster, awarded_user, serializer_schema Args: kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic_and_wait_propogation("reference-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("reference-avro") sr = kafka_cluster.async_schema_registry() value_serializer = await AsyncAvroSerializer( @@ -223,7 +223,7 @@ async def test_avro_record_serialization(kafka_cluster, load_file, avsc, data, r data (object): data to be serialized """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro") sr = kafka_cluster.async_schema_registry() schema_str = load_file(avsc) @@ -267,7 +267,7 @@ async def test_delivery_report_serialization(kafka_cluster, load_file, avsc, dat data (object): data to be serialized """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro-dr") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro-dr") sr = kafka_cluster.async_schema_registry() schema_str = load_file(avsc) @@ -314,7 +314,7 @@ async def test_avro_record_serialization_custom(kafka_cluster): kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro") sr = kafka_cluster.async_schema_registry() user = User('Bowie', 47, 'purple') diff --git 
a/tests/integration/schema_registry/_async/test_json_serializers.py b/tests/integration/schema_registry/_async/test_json_serializers.py index 464b41836..ab1f015a3 100644 --- a/tests/integration/schema_registry/_async/test_json_serializers.py +++ b/tests/integration/schema_registry/_async/test_json_serializers.py @@ -257,7 +257,7 @@ async def test_json_record_serialization(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() schema_str = load_file("product.json") @@ -305,7 +305,7 @@ async def test_json_record_serialization_incompatible(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() schema_str = load_file("product.json") @@ -332,7 +332,7 @@ async def test_json_record_serialization_custom(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() schema_str = load_file("product.json") @@ -377,7 +377,7 @@ async def test_json_record_deserialization_mismatch(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() schema_str = load_file("contractor.json") @@ -418,7 +418,7 @@ async def _register_referenced_schemas(sr: AsyncSchemaRegistryClient, load_file) async def test_json_reference(kafka_cluster, load_file): - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() product = {"productId": 1, @@ -457,7 +457,7 @@ async def test_json_reference(kafka_cluster, load_file): async def test_json_reference_custom(kafka_cluster, load_file): - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.async_schema_registry() product = _TestProduct(product_id=1, diff --git a/tests/integration/schema_registry/_async/test_proto_serializers.py b/tests/integration/schema_registry/_async/test_proto_serializers.py index 0e65686e2..5dade6f81 100644 --- a/tests/integration/schema_registry/_async/test_proto_serializers.py +++ b/tests/integration/schema_registry/_async/test_proto_serializers.py @@ -52,7 +52,7 @@ async def test_protobuf_message_serialization(kafka_cluster, pb2, data): Validates that we get the same message back that we put in. 
""" - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto") sr = kafka_cluster.async_schema_registry() value_serializer = await AsyncProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) @@ -85,7 +85,7 @@ async def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs """ sr = kafka_cluster.async_schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = await AsyncProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) producer = kafka_cluster.async_producer(key_serializer=serializer) @@ -106,7 +106,7 @@ async def test_protobuf_serializer_type_mismatch(kafka_cluster): pb2_2 = NestedTestProto_pb2.NestedMessage sr = kafka_cluster.async_schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = await AsyncProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) producer = kafka_cluster.async_producer(key_serializer=serializer) @@ -127,7 +127,7 @@ async def test_protobuf_deserializer_type_mismatch(kafka_cluster): pb2_2 = metadata_proto_pb2.HDFSOptions sr = kafka_cluster.async_schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = await AsyncProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) deserializer = await AsyncProtobufDeserializer(pb2_2, {'use.deprecated.format': False}) diff --git a/tests/integration/schema_registry/_sync/test_avro_serializers.py b/tests/integration/schema_registry/_sync/test_avro_serializers.py index 9385c8fa4..87a57eeec 100644 --- a/tests/integration/schema_registry/_sync/test_avro_serializers.py +++ b/tests/integration/schema_registry/_sync/test_avro_serializers.py @@ -151,7 +151,7 @@ def _references_test_common(kafka_cluster, awarded_user, serializer_schema, dese Args: kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic_and_wait_propogation("reference-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("reference-avro") sr = kafka_cluster.schema_registry() value_serializer = AvroSerializer( @@ -223,7 +223,7 @@ def test_avro_record_serialization(kafka_cluster, load_file, avsc, data, record_ data (object): data to be serialized """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro") sr = kafka_cluster.schema_registry() schema_str = load_file(avsc) @@ -267,7 +267,7 @@ def test_delivery_report_serialization(kafka_cluster, load_file, avsc, data, rec data (object): data to be serialized """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro-dr") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro-dr") sr = kafka_cluster.schema_registry() schema_str = load_file(avsc) @@ -314,7 +314,7 @@ def test_avro_record_serialization_custom(kafka_cluster): kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-avro") sr = 
kafka_cluster.schema_registry() user = User('Bowie', 47, 'purple') diff --git a/tests/integration/schema_registry/_sync/test_json_serializers.py b/tests/integration/schema_registry/_sync/test_json_serializers.py index 0c4a2545d..01344b23e 100644 --- a/tests/integration/schema_registry/_sync/test_json_serializers.py +++ b/tests/integration/schema_registry/_sync/test_json_serializers.py @@ -257,7 +257,7 @@ def test_json_record_serialization(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -305,7 +305,7 @@ def test_json_record_serialization_incompatible(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -332,7 +332,7 @@ def test_json_record_serialization_custom(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -377,7 +377,7 @@ def test_json_record_deserialization_mismatch(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("contractor.json") @@ -418,7 +418,7 @@ def _register_referenced_schemas(sr: SchemaRegistryClient, load_file): def test_json_reference(kafka_cluster, load_file): - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() product = {"productId": 1, @@ -457,7 +457,7 @@ def test_json_reference(kafka_cluster, load_file): def test_json_reference_custom(kafka_cluster, load_file): - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-json") sr = kafka_cluster.schema_registry() product = _TestProduct(product_id=1, diff --git a/tests/integration/schema_registry/_sync/test_proto_serializers.py b/tests/integration/schema_registry/_sync/test_proto_serializers.py index 9b3ca3197..20c909761 100644 --- a/tests/integration/schema_registry/_sync/test_proto_serializers.py +++ b/tests/integration/schema_registry/_sync/test_proto_serializers.py @@ -52,7 +52,7 @@ def test_protobuf_message_serialization(kafka_cluster, pb2, data): Validates that we get the same message back that we put in. 
""" - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto") sr = kafka_cluster.schema_registry() value_serializer = ProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) @@ -85,7 +85,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): """ sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = ProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) producer = kafka_cluster.producer(key_serializer=serializer) @@ -106,7 +106,7 @@ def test_protobuf_serializer_type_mismatch(kafka_cluster): pb2_2 = NestedTestProto_pb2.NestedMessage sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) producer = kafka_cluster.producer(key_serializer=serializer) @@ -127,7 +127,7 @@ def test_protobuf_deserializer_type_mismatch(kafka_cluster): pb2_2 = metadata_proto_pb2.HDFSOptions sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) deserializer = ProtobufDeserializer(pb2_2, {'use.deprecated.format': False}) diff --git a/tests/integration/serialization/test_serializers.py b/tests/integration/serialization/test_serializers.py index f8ac10d0e..fe5802b1b 100644 --- a/tests/integration/serialization/test_serializers.py +++ b/tests/integration/serialization/test_serializers.py @@ -45,7 +45,7 @@ def test_numeric_serialization(kafka_cluster, serializer, deserializer, data): data(object): input data """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-numeric") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-numeric") producer = kafka_cluster.producer(value_serializer=serializer) producer.produce(topic, value=data) @@ -77,7 +77,7 @@ def test_string_serialization(kafka_cluster, data, codec): codec (str): encoding type """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-string") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-string") producer = kafka_cluster.producer(value_serializer=StringSerializer(codec)) @@ -119,7 +119,7 @@ def test_mixed_serialization(kafka_cluster, key_serializer, value_serializer, value (object): value data """ - topic = kafka_cluster.create_topic_and_wait_propogation("serialization-numeric") + topic = kafka_cluster.create_topic_and_wait_propagation("serialization-numeric") producer = kafka_cluster.producer(key_serializer=key_serializer, value_serializer=value_serializer) diff --git a/tests/test_error.py b/tests/test_error.py index a06dfc0ad..7a502c9d6 100644 --- a/tests/test_error.py +++ b/tests/test_error.py @@ -13,7 +13,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limit +# limitations under the License. 
 #
 
 from confluent_kafka import KafkaError
diff --git a/tools/style-format.sh b/tools/style-format.sh
index a686cc6f0..462d80bf1 100755
--- a/tools/style-format.sh
+++ b/tools/style-format.sh
@@ -27,8 +27,8 @@ else
 fi
 
 clang_format_version=$(${CLANG_FORMAT} --version | sed -Ee 's/.*version ([[:digit:]]+)\.[[:digit:]]+\.[[:digit:]]+.*/\1/')
 
-if [[ $clang_format_version != "10" ]] ; then
-    echo "$0: clang-format version 10, '$clang_format_version' detected"
+if [[ $clang_format_version != "18" ]] ; then
+    echo "$0: clang-format version 18 required, '$clang_format_version' detected"
     exit 1
 fi
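
A minimal usage sketch for the renamed create_topic_and_wait_propagation helper
and the new delete_topic cleanup (illustrative only, assuming the kafka_cluster
fixture from tests/integration/cluster_fixture.py above):

    # hypothetical test body; topic name and seed values are arbitrary
    topic = kafka_cluster.create_topic_and_wait_propagation(
        "example-topic", {"num_partitions": 3, "replication_factor": 1})
    try:
        kafka_cluster.seed_topic(topic, value_source=[b'a', b'b', b'c'])
    finally:
        # delete_topic logs rather than raises on failure (best-effort cleanup)
        kafka_cluster.delete_topic(topic)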