Skip to content

Commit 7d71dda

Browse files
committed
Deserialize to DataAndSchema instead of just data
1 parent 6978f18 commit 7d71dda

File tree

5 files changed

+33
-27
lines changed

5 files changed

+33
-27
lines changed

README.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ from kafka import KafkaConsumer
6666
# Create the schema registry client, which is a façade around the boto3 glue client
6767
client = SchemaRegistryClient(glue_client,
6868
registry_name='my-registry')
69-
69+
7070
# Create the serializer
7171
serializer = SchemaRegistrySerializer(client)
7272

@@ -77,7 +77,7 @@ producer = KafkaProducer(value_serializer=serializer)
7777
# In this example we're using Avro, so we'll load an .avsc file.
7878
with open('user.avsc', 'r') as schema_file:
7979
schema = AvroSchema(schema_file.read())
80-
80+
8181
# Send message data along with schema
8282
data = {
8383
'name': 'John Doe',
@@ -101,7 +101,7 @@ from kafka import KafkaConsumer
101101
# Create the schema registry client, which is a façade around the boto3 glue client
102102
client = SchemaRegistryClient(glue_client,
103103
registry_name='my-registry')
104-
104+
105105
# Create the deserializer
106106
deserializer = SchemaRegistryDeserializer(client)
107107

@@ -110,7 +110,10 @@ consumer = KafkaConsumer('my-topic', value_deserializer=deserializer)
110110

111111
# Now use the consumer normally
112112
for message in consumer:
113-
# `message.value` will be the deserialized value
113+
# The deserializer produces DataAndSchema instances
114+
value: DataAndSchema = message.value
115+
value.data
116+
value.schema
114117
```
115118

116119
## Contributing

src/aws_schema_registry/schema.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,6 @@ class SchemaVersion:
6464
data_format: DataFormat
6565
status: SchemaVersionStatus
6666
version_number: Optional[int] = None
67+
68+
def __hash__(self):
69+
return hash(self.definition)

src/aws_schema_registry/serde.py

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import functools
44
import logging
55
import sys
6-
from typing import Any, Dict, NamedTuple
6+
from typing import Any, NamedTuple
77
from uuid import UUID
88

99
if sys.version_info[1] < 8: # for py37
@@ -109,8 +109,6 @@ class SchemaRegistryDeserializer:
109109
registries or handle schema-less data.
110110
"""
111111

112-
_writer_schemas: Dict[UUID, Schema]
113-
114112
def __init__(
115113
self,
116114
client: SchemaRegistryClient,
@@ -120,7 +118,6 @@ def __init__(
120118
self.client = client
121119
self.return_record_name = return_record_name
122120
self.secondary_deserializer = secondary_deserializer
123-
self._writer_schemas = {}
124121

125122
def deserialize(self, topic: str, bytes_: bytes):
126123
if bytes_ is None:
@@ -135,20 +132,18 @@ def deserialize(self, topic: str, bytes_: bytes):
135132
'no secondary deserializer provided to handle'
136133
' unrecognized data encoding'
137134
) from e
138-
writer_schema = self._writer_schemas.get(schema_version_id)
139-
if not writer_schema:
140-
LOG.info('Schema version %s not found locally, fetching from'
141-
' registry...', schema_version_id)
142-
schema_version = self.client.get_schema_version(
143-
version_id=schema_version_id
144-
)
145-
writer_schema = self._create_writer_schema(schema_version)
146-
self._writer_schemas[schema_version_id] = writer_schema
147-
LOG.info('Schema version %s fetched', schema_version_id)
148-
return writer_schema.read(data_bytes)
149-
150-
def _create_writer_schema(self, schema_version: SchemaVersion) -> Schema:
151-
if schema_version.data_format == 'AVRO':
152-
return AvroSchema(schema_version.definition)
153-
elif schema_version.data_format == 'JSON':
135+
writer_schema_version = self._get_schema_version(schema_version_id)
136+
writer_schema = self._schema_for_version(writer_schema_version)
137+
return DataAndSchema(writer_schema.read(data_bytes), writer_schema)
138+
139+
@functools.lru_cache(maxsize=None)
140+
def _get_schema_version(self, version_id: UUID):
141+
LOG.info('Fetching schema version %s...', version_id)
142+
return self.client.get_schema_version(version_id)
143+
144+
@functools.lru_cache(maxsize=None)
145+
def _schema_for_version(self, version: SchemaVersion) -> Schema:
146+
if version.data_format == 'AVRO':
147+
return AvroSchema(version.definition)
148+
elif version.data_format == 'JSON':
154149
raise NotImplementedError('JSON schema not supported')

tests/integration/java/test_java_integration.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ def test_interop_with_java_library(glue_client, registry, boto_session):
5454
}
5555
)
5656
proc.check_returncode()
57-
assert deserializer.deserialize('test', proc.stdout) == data
57+
deserialized = deserializer.deserialize('test', proc.stdout)
58+
assert deserialized
59+
assert deserialized.data == data
60+
assert deserialized.schema == SCHEMA
5861

5962

6063
def compile_java():

tests/integration/kafka/test_kafka_integration.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,7 @@ def test_produce_consume_with_ser_de_schema_registry(
9494
assert len(batch) == 1
9595
messages = batch[list(batch.keys())[0]]
9696
assert len(messages) == 2
97-
assert messages[0].value == data1
98-
assert messages[1].value == data2
97+
assert messages[0].value.data == data1
98+
assert messages[0].value.schema == SCHEMA_V1
99+
assert messages[1].value.data == data2
100+
assert messages[1].value.schema == SCHEMA_V2

0 commit comments

Comments
 (0)