2121
2222from confluent_kafka .schema_registry import Schema
2323from confluent_kafka .schema_registry .error import SchemaRegistryError
24+ from tests .integration .conftest import kafka_cluster_fixture
25+
26+
27+ @pytest .fixture (scope = "module" )
28+ def kafka_cluster_cp_7_0_1 ():
29+ """
30+ Returns a Trivup cluster with CP version 7.0.1.
31+ SR version 7.0.1 is the last returning 500 instead of 422
32+ for the invalid schema passed to test_api_get_register_schema_invalid
33+ """
34+ for fixture in kafka_cluster_fixture (
35+ brokers_env = "BROKERS_7_0_1" ,
36+ sr_url_env = "SR_URL_7_0_1" ,
37+ trivup_cluster_conf = {'cp_version' : '7.0.1' }
38+ ):
39+ yield fixture
2440
2541
2642def _subject_name (prefix ):
@@ -157,15 +173,28 @@ def test_api_get_registration_subject_not_found(kafka_cluster, load_file):
157173 assert e .value .error_code == 40401
158174
159175
160- def test_api_get_register_schema_invalid (kafka_cluster , load_file ):
176+ @pytest .mark .parametrize ("kafka_cluster_name, http_status_code, error_code" , [
177+ ["kafka_cluster_cp_7_0_1" , 500 , 500 ],
178+ ["kafka_cluster" , 422 , 42201 ],
179+ ])
180+ def test_api_get_register_schema_invalid (
181+ kafka_cluster_name ,
182+ http_status_code ,
183+ error_code ,
184+ load_file ,
185+ request ):
161186 """
162187 Attempts to obtain registration information with an invalid schema
188+ with different CP versions.
163189
164190 Args:
165- kafka_cluster (KafkaClusterFixture): Kafka Cluster fixture
191+ kafka_cluster_name (str): name of the Kafka Cluster fixture to use
192+ http_status_code (int): HTTP status return code expected in this version
193+ error_code (int): error code expected in this version
166194 load_file (callable(str)): Schema fixture constructor
167-
195+ request (FixtureRequest): PyTest object giving access to the test context
168196 """
197+ kafka_cluster = request .getfixturevalue (kafka_cluster_name )
169198 sr = kafka_cluster .schema_registry ()
170199 subject = _subject_name ("registration_invalid_schema" )
171200 schema = Schema (load_file ('basic_schema.avsc' ), schema_type = 'AVRO' )
@@ -177,8 +206,8 @@ def test_api_get_register_schema_invalid(kafka_cluster, load_file):
177206 with pytest .raises (SchemaRegistryError , match = "Invalid schema" ) as e :
178207 sr .lookup_schema (subject , schema2 )
179208
180- assert e .value .http_status_code == 422
181- assert e .value .error_code == 42201
209+ assert e .value .http_status_code == http_status_code
210+ assert e .value .error_code == error_code
182211
183212
184213def test_api_get_subjects (kafka_cluster , load_file ):
0 commit comments