33import functools
44import logging
55import sys
6- from typing import Any , Dict , NamedTuple
6+ from typing import Any , NamedTuple
77from uuid import UUID
88
99if sys .version_info [1 ] < 8 : # for py37
@@ -109,8 +109,6 @@ class SchemaRegistryDeserializer:
109109 registries or handle schema-less data.
110110 """
111111
112- _writer_schemas : Dict [UUID , Schema ]
113-
114112 def __init__ (
115113 self ,
116114 client : SchemaRegistryClient ,
@@ -120,7 +118,6 @@ def __init__(
120118 self .client = client
121119 self .return_record_name = return_record_name
122120 self .secondary_deserializer = secondary_deserializer
123- self ._writer_schemas = {}
124121
125122 def deserialize (self , topic : str , bytes_ : bytes ):
126123 if bytes_ is None :
@@ -135,20 +132,18 @@ def deserialize(self, topic: str, bytes_: bytes):
135132 'no secondary deserializer provided to handle'
136133 ' unrecognized data encoding'
137134 ) from e
138- writer_schema = self ._writer_schemas .get (schema_version_id )
139- if not writer_schema :
140- LOG .info ('Schema version %s not found locally, fetching from'
141- ' registry...' , schema_version_id )
142- schema_version = self .client .get_schema_version (
143- version_id = schema_version_id
144- )
145- writer_schema = self ._create_writer_schema (schema_version )
146- self ._writer_schemas [schema_version_id ] = writer_schema
147- LOG .info ('Schema version %s fetched' , schema_version_id )
148- return writer_schema .read (data_bytes )
149-
150- def _create_writer_schema (self , schema_version : SchemaVersion ) -> Schema :
151- if schema_version .data_format == 'AVRO' :
152- return AvroSchema (schema_version .definition )
153- elif schema_version .data_format == 'JSON' :
135+ writer_schema_version = self ._get_schema_version (schema_version_id )
136+ writer_schema = self ._schema_for_version (writer_schema_version )
137+ return DataAndSchema (writer_schema .read (data_bytes ), writer_schema )
138+
139+ @functools .lru_cache (maxsize = None )
140+ def _get_schema_version (self , version_id : UUID ):
141+ LOG .info ('Fetching schema version %s...' , version_id )
142+ return self .client .get_schema_version (version_id )
143+
144+ @functools .lru_cache (maxsize = None )
145+ def _schema_for_version (self , version : SchemaVersion ) -> Schema :
146+ if version .data_format == 'AVRO' :
147+ return AvroSchema (version .definition )
148+ elif version .data_format == 'JSON' :
154149 raise NotImplementedError ('JSON schema not supported' )
0 commit comments