2020import json
2121import struct
2222
23- from jsonschema import validate , ValidationError
23+ from jsonschema import validate , ValidationError , RefResolver
2424
2525from confluent_kafka .schema_registry import (_MAGIC_BYTE ,
2626 Schema ,
@@ -43,6 +43,25 @@ def __exit__(self, *args):
4343 return False
4444
4545
46+ def _resolve_named_schema (schema , schema_registry_client , named_schemas = None ):
47+ """
48+ Resolves named schemas referenced by the provided schema recursively.
49+ :param schema: Schema to resolve named schemas for.
50+ :param schema_registry_client: SchemaRegistryClient to use for retrieval.
51+ :param named_schemas: Dict of named schemas resolved recursively.
52+ :return: named_schemas dict.
53+ """
54+ if named_schemas is None :
55+ named_schemas = {}
56+ if schema .references is not None :
57+ for ref in schema .references :
58+ referenced_schema = schema_registry_client .get_version (ref .subject , ref .version )
59+ _resolve_named_schema (referenced_schema .schema , schema_registry_client , named_schemas )
60+ referenced_schema_dict = json .loads (referenced_schema .schema .schema_str )
61+ named_schemas [ref .name ] = referenced_schema_dict
62+ return named_schemas
63+
64+
4665class JSONSerializer (Serializer ):
4766 """
4867 Serializer that outputs JSON encoded data with Confluent Schema Registry framing.
@@ -122,7 +141,7 @@ class JSONSerializer(Serializer):
122141 callable with JSONSerializer.
123142
124143 Args:
125- schema_str (str): `JSON Schema definition. <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
144+ schema_str (str, Schema ): `JSON Schema definition. <https://json-schema.org/understanding-json-schema/reference/generic.html>`_ Accepts schema as either a string or a `Schema`(Schema) instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.
126145
127146 schema_registry_client (SchemaRegistryClient): Schema Registry
128147 client instance.
@@ -134,14 +153,23 @@ class JSONSerializer(Serializer):
134153 """ # noqa: E501
135154 __slots__ = ['_hash' , '_auto_register' , '_normalize_schemas' , '_use_latest_version' ,
136155 '_known_subjects' , '_parsed_schema' , '_registry' , '_schema' , '_schema_id' ,
137- '_schema_name' , '_subject_name_func' , '_to_dict' ]
156+ '_schema_name' , '_subject_name_func' , '_to_dict' , '_are_references_provided' ]
138157
139158 _default_conf = {'auto.register.schemas' : True ,
140159 'normalize.schemas' : False ,
141160 'use.latest.version' : False ,
142161 'subject.name.strategy' : topic_subject_name_strategy }
143162
144163 def __init__ (self , schema_str , schema_registry_client , to_dict = None , conf = None ):
164+ self ._are_references_provided = False
165+ if isinstance (schema_str , str ):
166+ self ._schema = Schema (schema_str , schema_type = "JSON" )
167+ elif isinstance (schema_str , Schema ):
168+ self ._schema = schema_str
169+ self ._are_references_provided = bool (schema_str .references )
170+ else :
171+ raise TypeError ('You must pass either str or Schema' )
172+
145173 self ._registry = schema_registry_client
146174 self ._schema_id = None
147175 self ._known_subjects = set ()
@@ -178,14 +206,13 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None):
178206 raise ValueError ("Unrecognized properties: {}"
179207 .format (", " .join (conf_copy .keys ())))
180208
181- schema_dict = json .loads (schema_str )
209+ schema_dict = json .loads (self . _schema . schema_str )
182210 schema_name = schema_dict .get ('title' , None )
183211 if schema_name is None :
184212 raise ValueError ("Missing required JSON schema annotation title" )
185213
186214 self ._schema_name = schema_name
187215 self ._parsed_schema = schema_dict
188- self ._schema = Schema (schema_str , schema_type = "JSON" )
189216
190217 def __call__ (self , obj , ctx ):
191218 """
@@ -238,7 +265,14 @@ def __call__(self, obj, ctx):
238265 value = obj
239266
240267 try :
241- validate (instance = value , schema = self ._parsed_schema )
268+ if self ._are_references_provided :
269+ named_schemas = _resolve_named_schema (self ._schema , self ._registry )
270+ validate (instance = value , schema = self ._parsed_schema ,
271+ resolver = RefResolver (self ._parsed_schema .get ('$id' ),
272+ self ._parsed_schema ,
273+ store = named_schemas ))
274+ else :
275+ validate (instance = value , schema = self ._parsed_schema )
242276 except ValidationError as ve :
243277 raise SerializationError (ve .message )
244278
@@ -258,16 +292,32 @@ class JSONDeserializer(Deserializer):
258292 framing.
259293
260294 Args:
261- schema_str (str): `JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_ use for validating records .
295+ schema_str (str, Schema ): `JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_ Accepts schema as either a string or a `Schema`(Schema) instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance .
262296
263297 from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
264298 Converts a dict to a Python object instance.
299+
300+ schema_registry_client (SchemaRegistryClient, optional): Schema Registry client instance. Needed if ``schema_str`` is a schema referencing other schemas.
265301 """ # noqa: E501
266302
267- __slots__ = ['_parsed_schema' , '_from_dict' ]
303+ __slots__ = ['_parsed_schema' , '_from_dict' , '_registry' , '_are_references_provided' , '_schema' ]
304+
305+ def __init__ (self , schema_str , from_dict = None , schema_registry_client = None ):
306+ self ._are_references_provided = False
307+ if isinstance (schema_str , str ):
308+ schema = Schema (schema_str , schema_type = "JSON" )
309+ elif isinstance (schema_str , Schema ):
310+ schema = schema_str
311+ self ._are_references_provided = bool (schema_str .references )
312+ if self ._are_references_provided and schema_registry_client is None :
313+ raise ValueError (
314+ """schema_registry_client must be provided if "schema_str" is a Schema instance with references""" )
315+ else :
316+ raise TypeError ('You must pass either str or Schema' )
268317
269- def __init__ (self , schema_str , from_dict = None ):
270- self ._parsed_schema = json .loads (schema_str )
318+ self ._parsed_schema = json .loads (schema .schema_str )
319+ self ._schema = schema
320+ self ._registry = schema_registry_client
271321
272322 if from_dict is not None and not callable (from_dict ):
273323 raise ValueError ("from_dict must be callable with the signature"
@@ -313,7 +363,14 @@ def __call__(self, data, ctx):
313363 obj_dict = json .loads (payload .read ())
314364
315365 try :
316- validate (instance = obj_dict , schema = self ._parsed_schema )
366+ if self ._are_references_provided :
367+ named_schemas = _resolve_named_schema (self ._schema , self ._registry )
368+ validate (instance = obj_dict ,
369+ schema = self ._parsed_schema , resolver = RefResolver (self ._parsed_schema .get ('$id' ),
370+ self ._parsed_schema ,
371+ store = named_schemas ))
372+ else :
373+ validate (instance = obj_dict , schema = self ._parsed_schema )
317374 except ValidationError as ve :
318375 raise SerializationError (ve .message )
319376
0 commit comments