@@ -64,21 +64,20 @@ def __init__(
64
64
key_schema : str | None = None ,
65
65
key_output_serializer : Any | None = None ,
66
66
):
67
- # Validate schema requirements for value
68
- if value_schema_type in ["AVRO" , "PROTOBUF" ] and value_schema is None :
69
- raise KafkaConsumerMissingSchemaError (
70
- f"value_schema must be provided when value_schema_type is { value_schema_type } " ,
71
- )
72
-
73
- # Validate schema requirements for key
74
- if key_schema_type in ["AVRO" , "PROTOBUF" ] and key_schema is None :
75
- raise KafkaConsumerMissingSchemaError (
76
- f"value_schema must be provided when key_schema_type is { key_schema_type } " ,
77
- )
67
+ # Validate schema requirements
68
+ self ._validate_schema_requirements (value_schema_type , value_schema , "value" )
69
+ self ._validate_schema_requirements (key_schema_type , key_schema , "key" )
78
70
79
71
self .value_schema_type = value_schema_type
80
72
self .value_schema = value_schema
81
73
self .value_output_serializer = value_output_serializer
82
74
self .key_schema_type = key_schema_type
83
75
self .key_schema = key_schema
84
76
self .key_output_serializer = key_output_serializer
77
+
78
+ def _validate_schema_requirements (self , schema_type : str | None , schema : str | None , prefix : str ) -> None :
79
+ """Validate that schema is provided when required by schema_type."""
80
+ if schema_type in ["AVRO" , "PROTOBUF" ] and schema is None :
81
+ raise KafkaConsumerMissingSchemaError (
82
+ f"{ prefix } _schema must be provided when { prefix } _schema_type is { schema_type } " ,
83
+ )
0 commit comments