Skip to content

Commit 0aee8a0

Browse files
committed
Rename SchemaRegistry(De)Serializer
Kafka(De)Serializer describes the interface, SchemaRegistry just idenfities the implementation
1 parent f0955a2 commit 0aee8a0

File tree

6 files changed

+28
-40
lines changed

6 files changed

+28
-40
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,15 @@ from aws_schema_registry.avro import AvroSchema
6767
# In this example we will use kafka-python as our Kafka client,
6868
# so we need to have the `kafka-python` extras installed and use
6969
# the kafka adapter.
70-
from aws_schema_registry.adapter.kafka import SchemaRegistrySerializer
70+
from aws_schema_registry.adapter.kafka import KafkaSerializer
7171
from kafka import KafkaConsumer
7272

7373
# Create the schema registry client, which is a façade around the boto3 glue client
7474
client = SchemaRegistryClient(glue_client,
7575
registry_name='my-registry')
7676

7777
# Create the serializer
78-
serializer = SchemaRegistrySerializer(client)
78+
serializer = KafkaSerializer(client)
7979

8080
# Create the producer
8181
producer = KafkaProducer(value_serializer=serializer)
@@ -91,7 +91,7 @@ data = {
9191
'favorite_number': 6
9292
}
9393
producer.send('my-topic', value=(data, schema))
94-
# the value MUST be a tuple when we're using the SchemaRegistrySerializer
94+
# the value MUST be a tuple when we're using the KafkaSerializer
9595
```
9696

9797
Read Kafka messages with `SchemaRegistryDeserializer`:
@@ -102,15 +102,15 @@ from aws_schema_registry import SchemaRegistryClient
102102
# In this example we will use kafka-python as our Kafka client,
103103
# so we need to have the `kafka-python` extras installed and use
104104
# the kafka adapter.
105-
from aws_schema_registry.adapter.kafka import SchemaRegistryDeserializer
105+
from aws_schema_registry.adapter.kafka import KafkaDeserializer
106106
from kafka import KafkaConsumer
107107

108108
# Create the schema registry client, which is a façade around the boto3 glue client
109109
client = SchemaRegistryClient(glue_client,
110110
registry_name='my-registry')
111111

112112
# Create the deserializer
113-
deserializer = SchemaRegistryDeserializer(client)
113+
deserializer = KafkaDeserializer(client)
114114

115115
# Create the consumer
116116
consumer = KafkaConsumer('my-topic', value_deserializer=deserializer)

src/aws_schema_registry/__init__.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,19 @@
33
from .schema import (
44
CompatibilityMode, DataFormat, Schema, SchemaVersion, ValidationError
55
)
6-
from .serde import (
7-
DataAndSchema, SchemaRegistryDeserializer, SchemaRegistrySerializer
8-
)
6+
from .serde import DataAndSchema, KafkaDeserializer, KafkaSerializer
97

108
__version__ = '1.0.0rc5'
119

1210
__all__ = [
1311
'CompatibilityMode',
1412
'DataAndSchema',
1513
'DataFormat',
14+
'KafkaDeserializer',
15+
'KafkaSerializer',
1616
'Schema',
1717
'SchemaRegistryClient',
18-
'SchemaRegistryDeserializer',
1918
'SchemaRegistryException',
20-
'SchemaRegistrySerializer',
2119
'SchemaVersion',
2220
'ValidationError'
2321
]

src/aws_schema_registry/adapter/kafka.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,22 @@
66
from kafka import Serializer, Deserializer
77

88
from aws_schema_registry import (
9-
SchemaRegistrySerializer as _SchemaRegistrySerializer,
10-
SchemaRegistryDeserializer as _SchemaRegistryDeserializer
9+
KafkaSerializer as _KafkaSerializer,
10+
KafkaDeserializer as _KafkaDeserializer
1111
)
1212

1313

14-
class SchemaRegistrySerializer(Serializer):
14+
class KafkaSerializer(Serializer):
1515
def __init__(self, *args, **kwargs):
16-
self._serializer = _SchemaRegistrySerializer(*args, **kwargs)
16+
self._serializer = _KafkaSerializer(*args, **kwargs)
1717

1818
def serialize(self, topic, value):
1919
return self._serializer.serialize(topic, value)
2020

2121

22-
class SchemaRegistryDeserializer(Deserializer):
22+
class KafkaDeserializer(Deserializer):
2323
def __init__(self, *args, **kwargs):
24-
self._deserializer = _SchemaRegistryDeserializer(*args, **kwargs)
24+
self._deserializer = _KafkaDeserializer(*args, **kwargs)
2525

2626
def deserialize(self, topic, bytes_):
2727
return self._deserializer.deserialize(topic, bytes_)

src/aws_schema_registry/serde.py

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,9 @@
22

33
import functools
44
import logging
5-
import sys
65
from typing import Any, NamedTuple
76
from uuid import UUID
87

9-
if sys.version_info[1] < 8: # for py37
10-
from typing_extensions import Protocol
11-
else:
12-
from typing import Protocol
13-
148
from aws_schema_registry.avro import AvroSchema
159
from aws_schema_registry.client import SchemaRegistryClient
1610
from aws_schema_registry.codec import decode, encode, UnknownEncodingException
@@ -33,15 +27,7 @@ class DataAndSchema(NamedTuple):
3327
schema: Schema
3428

3529

36-
class Serializer(Protocol):
37-
def serialize(self, topic: str, record: DataAndSchema): ...
38-
39-
40-
class Deserializer(Protocol):
41-
def deserialize(self, topic: str, bytes_: bytes): ...
42-
43-
44-
class SchemaRegistrySerializer:
30+
class KafkaSerializer:
4531
"""Kafka serializer that uses the AWS Schema Registry.
4632
4733
Arguments:
@@ -94,7 +80,7 @@ def _get_schema_version(self, topic: str, schema: Schema) -> SchemaVersion:
9480
)
9581

9682

97-
class SchemaRegistryDeserializer:
83+
class KafkaDeserializer:
9884
"""Kafka serializer that uses the AWS Schema Registry.
9985
10086
Arguments:
@@ -106,14 +92,16 @@ class SchemaRegistryDeserializer:
10692
secondary_deserializer: optional deserializer to pass through
10793
to when processing values with an unrecognized encoding.
10894
This is primarily use to migrate from other schema
109-
registries or handle schema-less data.
95+
registries or handle schema-less data. The secondary deserializer
96+
should either be a callable taking the same arguments as
97+
deserialize or an object with a matching deserialize method.
11098
"""
11199

112100
def __init__(
113101
self,
114102
client: SchemaRegistryClient,
115103
return_record_name: bool = False,
116-
secondary_deserializer: Deserializer = None
104+
secondary_deserializer=None
117105
):
118106
self.client = client
119107
self.return_record_name = return_record_name
@@ -126,6 +114,8 @@ def deserialize(self, topic: str, bytes_: bytes):
126114
data_bytes, schema_version_id = decode(bytes_)
127115
except UnknownEncodingException as e:
128116
if self.secondary_deserializer:
117+
if callable(self.secondary_deserializer):
118+
return self.secondary_deserializer(topic, bytes_)
129119
return self.secondary_deserializer.deserialize(topic, bytes_)
130120
else:
131121
raise SchemaRegistryException(

tests/integration/java/test_java_integration.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from aws_schema_registry import DataAndSchema, SchemaRegistryClient
66
from aws_schema_registry.avro import AvroSchema
77
from aws_schema_registry.adapter.kafka import (
8-
SchemaRegistryDeserializer, SchemaRegistrySerializer
8+
KafkaDeserializer, KafkaSerializer
99
)
1010

1111
LOG = logging.getLogger(__name__)
@@ -23,8 +23,8 @@
2323

2424
def test_interop_with_java_library(glue_client, registry, boto_session):
2525
client = SchemaRegistryClient(glue_client, registry_name=registry)
26-
serializer = SchemaRegistrySerializer(client)
27-
deserializer = SchemaRegistryDeserializer(client)
26+
serializer = KafkaSerializer(client)
27+
deserializer = KafkaDeserializer(client)
2828

2929
data = {
3030
'name': 'John Doe',

tests/integration/kafka/test_kafka_integration.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from aws_schema_registry import DataAndSchema, SchemaRegistryClient
99
from aws_schema_registry.avro import AvroSchema
1010
from aws_schema_registry.adapter.kafka import (
11-
SchemaRegistryDeserializer, SchemaRegistrySerializer
11+
KafkaDeserializer, KafkaSerializer
1212
)
1313
from aws_schema_registry.naming import record_name_strategy
1414

@@ -62,10 +62,10 @@ def test_produce_consume_with_ser_de_schema_registry(
6262
client = SchemaRegistryClient(
6363
glue_client, registry_name=registry
6464
)
65-
serializer = SchemaRegistrySerializer(
65+
serializer = KafkaSerializer(
6666
client, schema_naming_strategy=record_name_strategy
6767
)
68-
deserializer = SchemaRegistryDeserializer(client)
68+
deserializer = KafkaDeserializer(client)
6969

7070
producer = KafkaProducer(
7171
value_serializer=serializer,

0 commit comments

Comments
 (0)