22
33import functools
44import logging
5- import sys
65from typing import Any , NamedTuple
76from uuid import UUID
87
9- if sys .version_info [1 ] < 8 : # for py37
10- from typing_extensions import Protocol
11- else :
12- from typing import Protocol
13-
148from aws_schema_registry .avro import AvroSchema
159from aws_schema_registry .client import SchemaRegistryClient
1610from aws_schema_registry .codec import decode , encode , UnknownEncodingException
@@ -33,15 +27,7 @@ class DataAndSchema(NamedTuple):
3327 schema : Schema
3428
3529
36- class Serializer (Protocol ):
37- def serialize (self , topic : str , record : DataAndSchema ): ...
38-
39-
40- class Deserializer (Protocol ):
41- def deserialize (self , topic : str , bytes_ : bytes ): ...
42-
43-
44- class SchemaRegistrySerializer :
30+ class KafkaSerializer :
4531 """Kafka serializer that uses the AWS Schema Registry.
4632
4733 Arguments:
@@ -94,7 +80,7 @@ def _get_schema_version(self, topic: str, schema: Schema) -> SchemaVersion:
9480 )
9581
9682
97- class SchemaRegistryDeserializer :
83+ class KafkaDeserializer :
9884 """Kafka serializer that uses the AWS Schema Registry.
9985
10086 Arguments:
@@ -106,14 +92,16 @@ class SchemaRegistryDeserializer:
10692 secondary_deserializer: optional deserializer to pass through
10793 to when processing values with an unrecognized encoding.
10894 This is primarily use to migrate from other schema
109- registries or handle schema-less data.
95+ registries or handle schema-less data. The secondary deserializer
96+ should either be a callable taking the same arguments as
97+ deserialize or an object with a matching deserialize method.
11098 """
11199
112100 def __init__ (
113101 self ,
114102 client : SchemaRegistryClient ,
115103 return_record_name : bool = False ,
116- secondary_deserializer : Deserializer = None
104+ secondary_deserializer = None
117105 ):
118106 self .client = client
119107 self .return_record_name = return_record_name
@@ -126,6 +114,8 @@ def deserialize(self, topic: str, bytes_: bytes):
126114 data_bytes , schema_version_id = decode (bytes_ )
127115 except UnknownEncodingException as e :
128116 if self .secondary_deserializer :
117+ if callable (self .secondary_deserializer ):
118+ return self .secondary_deserializer (topic , bytes_ )
129119 return self .secondary_deserializer .deserialize (topic , bytes_ )
130120 else :
131121 raise SchemaRegistryException (
0 commit comments