@@ -207,9 +207,6 @@ def set(self, schema_id, schema, subject_name=None):
207207 schema (Schema): Schema instance
208208
209209 subject_name(str): Optional, subject schema is registered under
210-
211- Returns:
212- int: The schema_id
213210 """
214211
215212 with self .lock :
@@ -229,7 +226,8 @@ def get_schema(self, schema_id):
229226 Schema: The schema if known; else None
230227 """
231228
232- return self .schema_id_index .get (schema_id , None )
229+ with self .lock :
230+ return self .schema_id_index .get (schema_id , None )
233231
234232 def get_schema_id_by_subject (self , subject , schema ):
235233 """
@@ -249,6 +247,73 @@ def get_schema_id_by_subject(self, subject, schema):
249247 return self .schema_index .get (schema , None )
250248
251249
250+ class _RegisteredSchemaCache (object ):
251+ """
252+ Thread-safe cache for use with the Schema Registry Client.
253+
254+ This cache may be used to retrieve registered schemas based on subject_name/version/schema
255+ - Get registered schema based on subject name + version
256+ - Get registered schema based on subject name + schema
257+ """
258+
259+ def __init__ (self ):
260+ self .lock = Lock ()
261+ self .schema_version_index = defaultdict (dict )
262+ self .schema_index = defaultdict (dict )
263+
264+ def set (self , subject_name , schema , version , registered_schema ):
265+ """
266+ Add a Schema identified by schema_id to the cache.
267+
268+ Args:
269+ subject_name (str): The subject name this registered schema is associated with
270+
271+ schema (Schema): The schema this registered schema is associated with
272+
273+ version (int): The version this registered schema is associated with
274+
275+ registered_schema (RegisteredSchema): The registered schema instance
276+ """
277+
278+ with self .lock :
279+ if schema is not None :
280+ self .schema_index [subject_name ][schema ] = registered_schema
281+ elif version is not None :
282+ self .schema_version_index [subject_name ][version ] = registered_schema
283+
284+ def get_registered_schema_by_version (self , subject_name , version ):
285+ """
286+ Get the registered schema instance associated with version from the cache.
287+
288+ Args:
289+ subject_name (str): The subject name this registered schema is associated with
290+
291+ version (int): The version this registered schema is associated with
292+
293+ Returns:
294+ RegisteredSchema: The registered schema if known; else None
295+ """
296+
297+ with self .lock :
298+ return self .schema_version_index .get (subject_name , {}).get (version , None )
299+
300+ def get_registered_schema_by_schema (self , subject_name , schema ):
301+ """
302+ Get the registered schema instance associated with schema from the cache.
303+
304+ Args:
305+ subject_name (str): The subject name this registered schema is associated with
306+
307+ schema (Schema): The schema this registered schema is associated with
308+
309+ Returns:
310+ RegisteredSchema: The registered schema if known; else None
311+ """
312+
313+ with self .lock :
314+ return self .schema_index .get (subject_name , {}).get (schema , None )
315+
316+
252317class SchemaRegistryClient (object ):
253318 """
254319 A Confluent Schema Registry client.
@@ -292,6 +357,7 @@ class SchemaRegistryClient(object):
292357 def __init__ (self , conf ):
293358 self ._rest_client = _RestClient (conf )
294359 self ._cache = _SchemaCache ()
360+ self ._metadata_cache = _RegisteredSchemaCache ()
295361
296362 def __enter__ (self ):
297363 return self
@@ -398,6 +464,10 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False):
398464 `POST Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
399465 """ # noqa: E501
400466
467+ registered_schema = self ._metadata_cache .get_registered_schema_by_schema (subject_name , schema )
468+ if registered_schema is not None :
469+ return registered_schema
470+
401471 request = {'schema' : schema .schema_str }
402472
403473 # CP 5.5 adds new fields (for JSON and Protobuf).
@@ -414,17 +484,25 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False):
414484
415485 schema_type = response .get ('schemaType' , 'AVRO' )
416486
417- return RegisteredSchema (schema_id = response ['id' ],
418- schema = Schema (response ['schema' ],
419- schema_type ,
420- [
421- SchemaReference (name = ref ['name' ],
422- subject = ref ['subject' ],
423- version = ref ['version' ])
424- for ref in response .get ('references' , [])
425- ]),
426- subject = response ['subject' ],
427- version = response ['version' ])
487+ registered_schema = RegisteredSchema (
488+ schema_id = response ['id' ],
489+ schema = Schema (
490+ response ['schema' ],
491+ schema_type ,
492+ [
493+ SchemaReference (
494+ name = ref ['name' ],
495+ subject = ref ['subject' ],
496+ version = ref ['version' ]
497+ ) for ref in response .get ('references' , [])
498+ ]
499+ ),
500+ subject = response ['subject' ],
501+ version = response ['version' ]
502+ )
503+ self ._metadata_cache .set (subject_name , schema , None , registered_schema )
504+
505+ return registered_schema
428506
429507 def get_subjects (self ):
430508 """
@@ -524,22 +602,34 @@ def get_version(self, subject_name, version):
524602 `GET Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
525603 """ # noqa: E501
526604
605+ registered_schema = self ._metadata_cache .get_registered_schema_by_version (subject_name , version )
606+ if registered_schema is not None :
607+ return registered_schema
608+
527609 response = self ._rest_client .get ('subjects/{}/versions/{}'
528610 .format (_urlencode (subject_name ),
529611 version ))
530612
531613 schema_type = response .get ('schemaType' , 'AVRO' )
532- return RegisteredSchema (schema_id = response ['id' ],
533- schema = Schema (response ['schema' ],
534- schema_type ,
535- [
536- SchemaReference (name = ref ['name' ],
537- subject = ref ['subject' ],
538- version = ref ['version' ])
539- for ref in response .get ('references' , [])
540- ]),
541- subject = response ['subject' ],
542- version = response ['version' ])
614+ registered_schema = RegisteredSchema (
615+ schema_id = response ['id' ],
616+ schema = Schema (
617+ response ['schema' ],
618+ schema_type ,
619+ [
620+ SchemaReference (
621+ name = ref ['name' ],
622+ subject = ref ['subject' ],
623+ version = ref ['version' ]
624+ ) for ref in response .get ('references' , [])
625+ ]
626+ ),
627+ subject = response ['subject' ],
628+ version = response ['version' ]
629+ )
630+ self ._metadata_cache .set (subject_name , None , version , registered_schema )
631+
632+ return registered_schema
543633
544634 def get_versions (self , subject_name ):
545635 """
0 commit comments