Skip to content

Commit 26d40be

Browse files
slominskir and anchitj
authored
Avro schema references support (#1088)
* Initial attempt at schema references support: #974
* flake8 line length
* Update tests to use Schema
* flake8 whitespace after comma
* Imports should not be from src
* Updating test cases to use Schema object
* Convert Registry Schema to fastavro Schema (dict)
* Updated docs to indicate Schema object, not Schema String arg
* fastavro 1.4.0 renamed _named_schemas
* primitive types must be JSON formatted
* Fixes #974
* Fixed table formatting
* flake8 fixes
* Removed named_schemas from schema_registry_client
* Added support for nested references
* PR Feedback
* PR Feedback
* Remove unneeded changes
* Update unit tests
* PR Feedback
* PR Feedback
* PR Feedback
* Fix unit test
* PR Feedback
* PR Feedback

---------

Co-authored-by: Anchit Jain <[email protected]>
1 parent c771e17 commit 26d40be

File tree

5 files changed

+249
-25
lines changed

5 files changed

+249
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ v2.1.0 is a feature release with the following features, fixes and enhancements:
1111
- Added support for password protected private key in CachedSchemaRegistryClient.
1212
- Add reference support in Schema Registry client. (@RickTalken, #1304)
1313
- Migrated travis jobs to Semaphore CI (#1503)
14-
- Add support for passing schema references in JSONSerializer and JSONDeserializer. (#1514)
14+
- Added support for schema references. (#1514 and @slominskir #1088)
1515

1616
confluent-kafka-python is based on librdkafka v2.1.0, see the
1717
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v2.1.0)

src/confluent_kafka/schema_registry/avro.py

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,24 @@ def _schema_loads(schema_str):
6767
return Schema(schema_str, schema_type='AVRO')
6868

6969

70+
def _resolve_named_schema(schema, schema_registry_client, named_schemas=None):
71+
"""
72+
Resolves named schemas referenced by the provided schema recursively.
73+
:param schema: Schema to resolve named schemas for.
74+
:param schema_registry_client: SchemaRegistryClient to use for retrieval.
75+
:param named_schemas: Dict of named schemas resolved recursively.
76+
:return: named_schemas dict.
77+
"""
78+
if named_schemas is None:
79+
named_schemas = {}
80+
if schema.references is not None:
81+
for ref in schema.references:
82+
referenced_schema = schema_registry_client.get_version(ref.subject, ref.version)
83+
_resolve_named_schema(referenced_schema.schema, schema_registry_client, named_schemas)
84+
parse_schema(loads(referenced_schema.schema.schema_str), named_schemas=named_schemas)
85+
return named_schemas
86+
87+
7088
class AvroSerializer(Serializer):
7189
"""
7290
Serializer that outputs Avro binary encoded data with Confluent Schema Registry framing.
@@ -146,7 +164,7 @@ class AvroSerializer(Serializer):
146164
Args:
147165
schema_registry_client (SchemaRegistryClient): Schema Registry client instance.
148166
149-
schema_str (str): Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_
167+
schema_str (str or Schema): Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_ Accepts either a string or a `Schema`(Schema) instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.
150168
151169
to_dict (callable, optional): Callable(object, SerializationContext) -> dict. Converts object to a dict.
152170
@@ -155,15 +173,21 @@ class AvroSerializer(Serializer):
155173
__slots__ = ['_hash', '_auto_register', '_normalize_schemas', '_use_latest_version',
156174
'_known_subjects', '_parsed_schema',
157175
'_registry', '_schema', '_schema_id', '_schema_name',
158-
'_subject_name_func', '_to_dict']
176+
'_subject_name_func', '_to_dict', '_named_schemas']
159177

160178
_default_conf = {'auto.register.schemas': True,
161179
'normalize.schemas': False,
162180
'use.latest.version': False,
163181
'subject.name.strategy': topic_subject_name_strategy}
164182

165-
def __init__(self, schema_registry_client, schema_str,
166-
to_dict=None, conf=None):
183+
def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
184+
if isinstance(schema_str, str):
185+
schema = _schema_loads(schema_str)
186+
elif isinstance(schema_str, Schema):
187+
schema = schema_str
188+
else:
189+
raise TypeError('You must pass either schema string or schema object')
190+
167191
self._registry = schema_registry_client
168192
self._schema_id = None
169193
self._known_subjects = set()
@@ -200,9 +224,9 @@ def __init__(self, schema_registry_client, schema_str,
200224
raise ValueError("Unrecognized properties: {}"
201225
.format(", ".join(conf_copy.keys())))
202226

203-
schema = _schema_loads(schema_str)
204227
schema_dict = loads(schema.schema_str)
205-
parsed_schema = parse_schema(schema_dict)
228+
self._named_schemas = _resolve_named_schema(schema, schema_registry_client)
229+
parsed_schema = parse_schema(schema_dict, named_schemas=self._named_schemas)
206230

207231
if isinstance(parsed_schema, list):
208232
# if parsed_schema is a list, we have an Avro union and there
@@ -299,8 +323,9 @@ class AvroDeserializer(Deserializer):
299323
schema_registry_client (SchemaRegistryClient): Confluent Schema Registry
300324
client instance.
301325
302-
schema_str (str, optional): The reader schema.
303-
If not provided, the writer schema will be used as the reader schema.
326+
schema_str (str, Schema, optional): Avro reader schema declaration Accepts either a string or a `Schema`(
327+
Schema) instance. If not provided, the writer schema will be used as the reader schema. Note that string
328+
definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.
304329
305330
from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
306331
Converts a dict to an instance of some object.
@@ -315,13 +340,31 @@ class AvroDeserializer(Deserializer):
315340
`Apache Avro Schema Resolution <https://avro.apache.org/docs/1.8.2/spec.html#Schema+Resolution>`_
316341
"""
317342

318-
__slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas', '_return_record_name']
343+
__slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas', '_return_record_name', '_schema',
344+
'_named_schemas']
319345

320346
def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False):
347+
schema = None
348+
if schema_str is not None:
349+
if isinstance(schema_str, str):
350+
schema = _schema_loads(schema_str)
351+
elif isinstance(schema_str, Schema):
352+
schema = schema_str
353+
else:
354+
raise TypeError('You must pass either schema string or schema object')
355+
356+
self._schema = schema
321357
self._registry = schema_registry_client
322358
self._writer_schemas = {}
323359

324-
self._reader_schema = parse_schema(loads(schema_str)) if schema_str else None
360+
if schema:
361+
schema_dict = loads(self._schema.schema_str)
362+
self._named_schemas = _resolve_named_schema(self._schema, schema_registry_client)
363+
self._reader_schema = parse_schema(schema_dict,
364+
named_schemas=self._named_schemas)
365+
else:
366+
self._named_schemas = None
367+
self._reader_schema = None
325368

326369
if from_dict is not None and not callable(from_dict):
327370
raise ValueError("from_dict must be callable with the signature "
@@ -370,10 +413,11 @@ def __call__(self, data, ctx):
370413
writer_schema = self._writer_schemas.get(schema_id, None)
371414

372415
if writer_schema is None:
373-
schema = self._registry.get_schema(schema_id)
374-
prepared_schema = _schema_loads(schema.schema_str)
416+
registered_schema = self._registry.get_schema(schema_id)
417+
self._named_schemas = _resolve_named_schema(registered_schema, self._registry)
418+
prepared_schema = _schema_loads(registered_schema.schema_str)
375419
writer_schema = parse_schema(loads(
376-
prepared_schema.schema_str))
420+
prepared_schema.schema_str), named_schemas=self._named_schemas)
377421
self._writer_schemas[schema_id] = writer_schema
378422

379423
obj_dict = schemaless_reader(payload,

src/confluent_kafka/schema_registry/schema_registry_client.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -686,12 +686,11 @@ class Schema(object):
686686
Args:
687687
schema_str (str): String representation of the schema.
688688
689-
references ([SchemaReference]): SchemaReferences used in this schema.
690-
691689
schema_type (str): The schema type: AVRO, PROTOBUF or JSON.
692-
"""
693690
694-
__slots__ = ['schema_str', 'references', 'schema_type', '_hash']
691+
references ([SchemaReference]): SchemaReferences used in this schema.
692+
"""
693+
__slots__ = ['schema_str', 'schema_type', 'references', '_hash']
695694

696695
def __init__(self, schema_str, schema_type, references=[]):
697696
super(Schema, self).__init__()

tests/integration/schema_registry/test_avro_serializers.py

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
#
18-
1918
import pytest
2019

2120
from confluent_kafka import TopicPartition
2221
from confluent_kafka.serialization import (MessageField,
2322
SerializationContext)
2423
from confluent_kafka.schema_registry.avro import (AvroSerializer,
2524
AvroDeserializer)
25+
from confluent_kafka.schema_registry import Schema, SchemaReference
2626

2727

2828
class User(object):
@@ -51,6 +51,145 @@ def __eq__(self, other):
5151
self.favorite_color == other.favorite_color])
5252

5353

54+
class AwardProperties(object):
    """
    Test model matching the ``AwardProperties`` Avro record.

    Note that the constructor takes ``points`` before ``year``, while the
    schema lists ``year`` before ``points``.
    """

    schema_str = """
        {
            "namespace": "confluent.io.examples.serialization.avro",
            "name": "AwardProperties",
            "type": "record",
            "fields": [
                {"name": "year", "type": "int"},
                {"name": "points", "type": "int"}
            ]
        }
    """

    def __init__(self, points, year):
        self.points = points
        self.year = year

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of raising AttributeError on a missing attribute.
        if not isinstance(other, AwardProperties):
            return NotImplemented
        return all([
            self.points == other.points,
            self.year == other.year
        ])
76+
77+
78+
class Award(object):
    """
    Test model matching the ``Award`` Avro record.

    The schema refers to ``AwardProperties`` by name only; that type is not
    defined inside this schema string.
    """

    schema_str = """
        {
            "namespace": "confluent.io.examples.serialization.avro",
            "name": "Award",
            "type": "record",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "properties", "type": "AwardProperties"}
            ]
        }
    """

    def __init__(self, name, properties):
        self.name = name
        self.properties = properties

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of raising AttributeError on a missing attribute.
        if not isinstance(other, Award):
            return NotImplemented
        return all([
            self.name == other.name,
            self.properties == other.properties
        ])
100+
101+
102+
class AwardedUser(object):
    """
    Test model matching the ``AwardedUser`` Avro record.

    The schema refers to ``Award`` and ``User`` by name only; neither type is
    defined inside this schema string.
    """

    schema_str = """
        {
            "namespace": "confluent.io.examples.serialization.avro",
            "name": "AwardedUser",
            "type": "record",
            "fields": [
                {"name": "award", "type": "Award"},
                {"name": "user", "type": "User"}
            ]
        }
    """

    def __init__(self, award, user):
        self.award = award
        self.user = user

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of raising AttributeError on a missing attribute.
        if not isinstance(other, AwardedUser):
            return NotImplemented
        return all([
            self.award == other.award,
            self.user == other.user
        ])
124+
125+
126+
def _register_avro_schemas_and_build_awarded_user_schema(kafka_cluster):
    """
    Registers the User, AwardProperties and Award schemas and builds an
    AwardedUser schema that references User and Award.

    Args:
        kafka_cluster (KafkaClusterFixture): cluster fixture

    Returns:
        tuple: the sample AwardedUser instance and the AwardedUser Schema
        carrying its references.
    """
    registry = kafka_cluster.schema_registry()

    bowie = User('Bowie', 47, 'purple')
    properties = AwardProperties(10, 2023)
    best_in_show = Award("Best In Show", properties)
    winner = AwardedUser(best_in_show, bowie)

    user_ref = SchemaReference("confluent.io.examples.serialization.avro.User", "user", 1)
    properties_ref = SchemaReference("confluent.io.examples.serialization.avro.AwardProperties",
                                     "award_properties", 1)
    award_ref = SchemaReference("confluent.io.examples.serialization.avro.Award", "award", 1)

    # award_properties is registered ahead of award, which references it.
    registry.register_schema("user", Schema(User.schema_str, 'AVRO'))
    registry.register_schema("award_properties", Schema(AwardProperties.schema_str, 'AVRO'))
    registry.register_schema("award", Schema(Award.schema_str, 'AVRO', [properties_ref]))

    awarded_user_schema = Schema(AwardedUser.schema_str, 'AVRO', [user_ref, award_ref])
    return winner, awarded_user_schema
146+
147+
148+
def _references_test_common(kafka_cluster, awarded_user, serializer_schema, deserializer_schema):
    """
    Common (both reader and writer) avro schema reference test.

    Produces ``awarded_user`` to partition 0 of a topic with an AvroSerializer,
    consumes it back with an AvroDeserializer and asserts that the consumed
    value equals the produced one.

    Args:
        kafka_cluster (KafkaClusterFixture): cluster fixture
        awarded_user (AwardedUser): object to round-trip
        serializer_schema: schema handed to the AvroSerializer
        deserializer_schema: schema handed to the AvroDeserializer (may be None)
    """
    def awarded_user_to_dict(obj, ctx):
        # Flatten the object graph into nested dicts keyed like the Avro fields.
        award_dict = dict(name=obj.award.name,
                          properties=dict(year=obj.award.properties.year,
                                          points=obj.award.properties.points))
        user_dict = dict(name=obj.user.name,
                         favorite_number=obj.user.favorite_number,
                         favorite_color=obj.user.favorite_color)
        return dict(award=award_dict, user=user_dict)

    def dict_to_awarded_user(record, ctx):
        # Rebuild the object graph from the decoded nested dicts.
        rebuilt_award = Award(name=record.get('award').get('name'),
                              properties=AwardProperties(
                                  year=record.get('award').get('properties').get('year'),
                                  points=record.get('award').get('properties').get('points')))
        rebuilt_user = User(name=record.get('user').get('name'),
                            favorite_number=record.get('user').get('favorite_number'),
                            favorite_color=record.get('user').get('favorite_color'))
        return AwardedUser(award=rebuilt_award, user=rebuilt_user)

    topic = kafka_cluster.create_topic("reference-avro")
    registry = kafka_cluster.schema_registry()

    value_serializer = AvroSerializer(registry, serializer_schema, awarded_user_to_dict)
    value_deserializer = AvroDeserializer(registry, deserializer_schema, dict_to_awarded_user)

    producer = kafka_cluster.producer(value_serializer=value_serializer)
    producer.produce(topic, value=awarded_user, partition=0)
    producer.flush()

    consumer = kafka_cluster.consumer(value_deserializer=value_deserializer)
    consumer.assign([TopicPartition(topic, 0)])

    received = consumer.poll()
    round_tripped = received.value()

    assert round_tripped == awarded_user
191+
192+
54193
@pytest.mark.parametrize("avsc, data, record_type",
55194
[('basic_schema.avsc', {'name': 'abc'}, "record"),
56195
('primitive_string.avsc', u'Jämtland', "string"),
@@ -185,3 +324,25 @@ def test_avro_record_serialization_custom(kafka_cluster):
185324
user2 = msg.value()
186325

187326
assert user2 == user
327+
328+
329+
def test_avro_reference(kafka_cluster):
    """
    Round-trips an AwardedUser when the same referencing schema is given to
    both the serializer and the deserializer.

    Args:
        kafka_cluster (KafkaClusterFixture): cluster fixture
    """
    sample, referencing_schema = _register_avro_schemas_and_build_awarded_user_schema(kafka_cluster)

    _references_test_common(kafka_cluster, sample, referencing_schema, referencing_schema)
338+
339+
340+
def test_avro_reference_deserializer_none(kafka_cluster):
    """
    Round-trips an AwardedUser when only the serializer is given the
    referencing schema and the deserializer schema is None.

    Args:
        kafka_cluster (KafkaClusterFixture): cluster fixture
    """
    sample, referencing_schema = _register_avro_schemas_and_build_awarded_user_schema(kafka_cluster)

    _references_test_common(kafka_cluster, sample, referencing_schema, None)

0 commit comments

Comments
 (0)