Skip to content

Commit 77dceb1

Browse files
authored
Enhance serializers with use.latest.version config (#1133)
* Add configs for use.latest.version and ignore.known.types * Add use.latest.version test * Check that both use.latest.version and auto.register.schemas are not enabled * Add warnings to use.latest.version Also add additional checks that both use.latest.version and auto.register.schemas are not set
1 parent e1c3227 commit 77dceb1

File tree

5 files changed

+136
-40
lines changed

5 files changed

+136
-40
lines changed

src/confluent_kafka/schema_registry/avro.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ class AvroSerializer(Serializer):
8383
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
8484
| | | Defaults to True. |
8585
+---------------------------+----------+--------------------------------------------------+
86+
| | | Whether to use the latest subject version for |
87+
| ``use.latest.version`` | bool | serialization. |
88+
| | | WARNING: There is no check that the latest |
89+
| | | schema is backwards compatible with the object |
90+
| | | being serialized. |
91+
| | | Defaults to False. |
92+
+-------------------------------------+----------+----------------------------------------+
8693
| | | Callable(SerializationContext, str) -> str |
8794
| | | |
8895
| ``subject.name.strategy`` | callable | Instructs the AvroSerializer on how to construct |
@@ -131,12 +138,13 @@ class AvroSerializer(Serializer):
131138
conf (dict): AvroSerializer configuration.
132139
133140
""" # noqa: E501
134-
__slots__ = ['_hash', '_auto_register', '_known_subjects', '_parsed_schema',
141+
__slots__ = ['_hash', '_auto_register', '_use_latest_version', '_known_subjects', '_parsed_schema',
135142
'_registry', '_schema', '_schema_id', '_schema_name',
136143
'_subject_name_func', '_to_dict']
137144

138145
# default configuration
139146
_default_conf = {'auto.register.schemas': True,
147+
'use.latest.version': False,
140148
'subject.name.strategy': topic_subject_name_strategy}
141149

142150
def __init__(self, schema_registry_client, schema_str,
@@ -161,6 +169,12 @@ def __init__(self, schema_registry_client, schema_str,
161169
if not isinstance(self._auto_register, bool):
162170
raise ValueError("auto.register.schemas must be a boolean value")
163171

172+
self._use_latest_version = conf_copy.pop('use.latest.version')
173+
if not isinstance(self._use_latest_version, bool):
174+
raise ValueError("use.latest.version must be a boolean value")
175+
if self._use_latest_version and self._auto_register:
176+
raise ValueError("cannot enable both use.latest.version and auto.register.schemas")
177+
164178
self._subject_name_func = conf_copy.pop('subject.name.strategy')
165179
if not callable(self._subject_name_func):
166180
raise ValueError("subject.name.strategy must be callable")
@@ -208,18 +222,23 @@ def __call__(self, obj, ctx):
208222

209223
subject = self._subject_name_func(ctx, self._schema_name)
210224

211-
# Check to ensure this schema has been registered under subject_name.
212-
if self._auto_register and subject not in self._known_subjects:
213-
# The schema name will always be the same. We can't however register
214-
# a schema without a subject so we set the schema_id here to handle
215-
# the initial registration.
216-
self._schema_id = self._registry.register_schema(subject,
217-
self._schema)
218-
self._known_subjects.add(subject)
219-
elif not self._auto_register and subject not in self._known_subjects:
220-
registered_schema = self._registry.lookup_schema(subject,
221-
self._schema)
222-
self._schema_id = registered_schema.schema_id
225+
if subject not in self._known_subjects:
226+
if self._use_latest_version:
227+
latest_schema = self._registry.get_latest_version(subject)
228+
self._schema_id = latest_schema.schema_id
229+
230+
else:
231+
# Check to ensure this schema has been registered under subject_name.
232+
if self._auto_register:
233+
# The schema name will always be the same. We can't however register
234+
# a schema without a subject so we set the schema_id here to handle
235+
# the initial registration.
236+
self._schema_id = self._registry.register_schema(subject,
237+
self._schema)
238+
else:
239+
registered_schema = self._registry.lookup_schema(subject,
240+
self._schema)
241+
self._schema_id = registered_schema.schema_id
223242
self._known_subjects.add(subject)
224243

225244
if self._to_dict is not None:

src/confluent_kafka/schema_registry/json_schema.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ class JSONSerializer(Serializer):
5959
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
6060
| | | Defaults to True. |
6161
+---------------------------+----------+--------------------------------------------------+
62+
| | | Whether to use the latest subject version for |
63+
| ``use.latest.version`` | bool | serialization. |
64+
| | | WARNING: There is no check that the latest |
65+
| | | schema is backwards compatible with the object |
66+
| | | being serialized. |
67+
| | | Defaults to False. |
68+
+-------------------------------------+----------+----------------------------------------+
6269
| | | Callable(SerializationContext, str) -> str |
6370
| | | |
6471
| ``subject.name.strategy`` | callable | Instructs the JsonSerializer on how to construct |
@@ -109,12 +116,13 @@ class JSONSerializer(Serializer):
109116
conf (dict): JsonSerializer configuration.
110117
111118
""" # noqa: E501
112-
__slots__ = ['_hash', '_auto_register', '_known_subjects', '_parsed_schema',
119+
__slots__ = ['_hash', '_auto_register', '_use_latest_version', '_known_subjects', '_parsed_schema',
113120
'_registry', '_schema', '_schema_id', '_schema_name',
114121
'_subject_name_func', '_to_dict']
115122

116123
# default configuration
117124
_default_conf = {'auto.register.schemas': True,
125+
'use.latest.version': False,
118126
'subject.name.strategy': topic_subject_name_strategy}
119127

120128
def __init__(self, schema_str, schema_registry_client, to_dict=None,
@@ -139,6 +147,12 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None,
139147
if not isinstance(self._auto_register, bool):
140148
raise ValueError("auto.register.schemas must be a boolean value")
141149

150+
self._use_latest_version = conf_copy.pop('use.latest.version')
151+
if not isinstance(self._use_latest_version, bool):
152+
raise ValueError("use.latest.version must be a boolean value")
153+
if self._use_latest_version and self._auto_register:
154+
raise ValueError("cannot enable both use.latest.version and auto.register.schemas")
155+
142156
self._subject_name_func = conf_copy.pop('subject.name.strategy')
143157
if not callable(self._subject_name_func):
144158
raise ValueError("subject.name.strategy must be callable")
@@ -182,18 +196,23 @@ def __call__(self, obj, ctx):
182196

183197
subject = self._subject_name_func(ctx, self._schema_name)
184198

185-
# Check to ensure this schema has been registered under subject_name.
186-
if self._auto_register and subject not in self._known_subjects:
187-
# The schema name will always be the same. We can't however register
188-
# a schema without a subject so we set the schema_id here to handle
189-
# the initial registration.
190-
self._schema_id = self._registry.register_schema(subject,
191-
self._schema)
192-
self._known_subjects.add(subject)
193-
elif not self._auto_register and subject not in self._known_subjects:
194-
registered_schema = self._registry.lookup_schema(subject,
195-
self._schema)
196-
self._schema_id = registered_schema.schema_id
199+
if subject not in self._known_subjects:
200+
if self._use_latest_version:
201+
latest_schema = self._registry.get_latest_version(subject)
202+
self._schema_id = latest_schema.schema_id
203+
204+
else:
205+
# Check to ensure this schema has been registered under subject_name.
206+
if self._auto_register:
207+
# The schema name will always be the same. We can't however register
208+
# a schema without a subject so we set the schema_id here to handle
209+
# the initial registration.
210+
self._schema_id = self._registry.register_schema(subject,
211+
self._schema)
212+
else:
213+
registered_schema = self._registry.lookup_schema(subject,
214+
self._schema)
215+
self._schema_id = registered_schema.schema_id
197216
self._known_subjects.add(subject)
198217

199218
if self._to_dict is not None:

src/confluent_kafka/schema_registry/protobuf.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,17 @@ class ProtobufSerializer(object):
146146
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
147147
| | | Defaults to True. |
148148
+-------------------------------------+----------+------------------------------------------------------+
149+
| | | Whether to use the latest subject version for |
150+
| ``use.latest.version`` | bool | serialization. |
151+
| | | WARNING: There is no check that the latest |
152+
| | | schema is backwards compatible with the object |
153+
| | | being serialized. |
154+
| | | Defaults to False. |
155+
+-------------------------------------+----------+------------------------------------------------------+
156+
| | | Whether to skip known types when resolving schema |
157+
| ``skip.known.types`` | bool | dependencies. |
158+
| | | Defaults to False. |
159+
+-------------------------------------+----------+------------------------------------------------------+
149160
| | | Callable(SerializationContext, str) -> str |
150161
| | | |
151162
| ``subject.name.strategy`` | callable | Instructs the ProtobufSerializer on how to construct |
@@ -194,12 +205,15 @@ class ProtobufSerializer(object):
194205
`Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_
195206
196207
""" # noqa: E501
197-
__slots__ = ['_auto_register', '_registry', '_known_subjects',
208+
__slots__ = ['_auto_register', '_use_latest_version', '_skip_known_types',
209+
'_registry', '_known_subjects',
198210
'_msg_class', '_msg_index', '_schema', '_schema_id',
199211
'_ref_reference_subject_func', '_subject_name_func']
200212
# default configuration
201213
_default_conf = {
202214
'auto.register.schemas': True,
215+
'use.latest.version': False,
216+
'skip.known.types': False,
203217
'subject.name.strategy': topic_subject_name_strategy,
204218
'reference.subject.name.strategy': reference_subject_name_strategy
205219
}
@@ -214,6 +228,16 @@ def __init__(self, msg_type, schema_registry_client, conf=None):
214228
if not isinstance(self._auto_register, bool):
215229
raise ValueError("auto.register.schemas must be a boolean value")
216230

231+
self._use_latest_version = conf_copy.pop('use.latest.version')
232+
if not isinstance(self._use_latest_version, bool):
233+
raise ValueError("use.latest.version must be a boolean value")
234+
if self._use_latest_version and self._auto_register:
235+
raise ValueError("cannot enable both use.latest.version and auto.register.schemas")
236+
237+
self._skip_known_types = conf_copy.pop('skip.known.types')
238+
if not isinstance(self._skip_known_types, bool):
239+
raise ValueError("skip.known.types must be a boolean value")
240+
217241
self._subject_name_func = conf_copy.pop('subject.name.strategy')
218242
if not callable(self._subject_name_func):
219243
raise ValueError("subject.name.strategy must be callable")
@@ -266,6 +290,8 @@ def _resolve_dependencies(self, ctx, file_desc):
266290
"""
267291
schema_refs = []
268292
for dep in file_desc.dependencies:
293+
if self._skip_known_types and dep.name.startswith("google/protobuf/"):
294+
continue
269295
dep_refs = self._resolve_dependencies(ctx, dep)
270296
subject = self._ref_reference_subject_func(ctx, dep)
271297
schema = Schema(_schema_to_str(dep),
@@ -313,15 +339,22 @@ def __call__(self, message_type, ctx):
313339
message_type.DESCRIPTOR.full_name)
314340

315341
if subject not in self._known_subjects:
316-
self._schema.references = self._resolve_dependencies(
317-
ctx, message_type.DESCRIPTOR.file)
342+
if self._use_latest_version:
343+
latest_schema = self._registry.get_latest_version(subject)
344+
self._schema_id = latest_schema.schema_id
318345

319-
if self._auto_register:
320-
self._schema_id = self._registry.register_schema(subject,
321-
self._schema)
322346
else:
323-
self._schema_id = self._registry.lookup_schema(
324-
subject, self._schema).schema_id
347+
self._schema.references = self._resolve_dependencies(
348+
ctx, message_type.DESCRIPTOR.file)
349+
350+
if self._auto_register:
351+
self._schema_id = self._registry.register_schema(subject,
352+
self._schema)
353+
else:
354+
self._schema_id = self._registry.lookup_schema(
355+
subject, self._schema).schema_id
356+
357+
self._known_subjects.add(subject)
325358

326359
with _ContextStringIO() as fo:
327360
# Write the magic byte and schema ID in network byte order

tests/schema_registry/conftest.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,12 +297,13 @@ def get_subject_version_callback(self, request, context):
297297
path_match = re.match(self.subject_versions, request.path)
298298
subject = path_match.group(1)
299299
version = path_match.group(2)
300+
version_num = -1 if version == 'latest' else int(version)
300301

301-
if int(version) == 404:
302+
if version_num == 404:
302303
context.status_code = 404
303304
return {'error_code': 40402,
304305
'message': "Version not found"}
305-
if int(version) == 422:
306+
if version_num == 422:
306307
context.status_code = 422
307308
return {'error_code': 42202,
308309
'message': "Invalid version"}
@@ -313,7 +314,7 @@ def get_subject_version_callback(self, request, context):
313314
context.status_code = 200
314315
return {'subject': subject,
315316
'id': self.SCHEMA_ID,
316-
'version': int(version),
317+
'version': version_num,
317318
'schema': self._load_avsc(self.SCHEMA)}
318319

319320
def delete_subject_version_callback(self, request, context):
@@ -322,13 +323,14 @@ def delete_subject_version_callback(self, request, context):
322323
path_match = re.match(self.subject_versions, request.path)
323324
subject = path_match.group(1)
324325
version = path_match.group(2)
326+
version_num = -1 if version == 'latest' else int(version)
325327

326-
if int(version) == 404:
328+
if version_num == 404:
327329
context.status_code = 404
328330
return {"error_code": 40402,
329331
"message": "Version not found"}
330332

331-
if int(version) == 422:
333+
if version_num == 422:
332334
context.status_code = 422
333335
return {"error_code": 42202,
334336
"message": "Invalid version"}
@@ -339,7 +341,7 @@ def delete_subject_version_callback(self, request, context):
339341
"message": "Subject not found"}
340342

341343
context.status_code = 200
342-
return int(version)
344+
return version_num
343345

344346
def post_subject_version_callback(self, request, context):
345347
self.counter['POST'][request.path] += 1

tests/schema_registry/test_avro_serializer.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,29 @@ def test_avro_serializer_config_auto_register_schemas_false(mock_schema_registry
7474
assert test_client.counter['POST'].get('/subjects/{}'.format(subject)) == 1
7575

7676

77+
def test_avro_serializer_config_use_latest_version(mock_schema_registry):
78+
"""
79+
Ensures auto.register.schemas=False does not register schema
80+
"""
81+
conf = {'url': TEST_URL}
82+
test_client = mock_schema_registry(conf)
83+
topic = "test-use-latest-version"
84+
subject = topic + '-key'
85+
86+
test_serializer = AvroSerializer(test_client, 'string',
87+
conf={'auto.register.schemas': False, 'use.latest.version': True})
88+
89+
test_serializer("test",
90+
SerializationContext("test-use-latest-version",
91+
MessageField.KEY))
92+
93+
register_count = test_client.counter['POST'].get('/subjects/{}/versions'
94+
.format(subject), 0)
95+
assert register_count == 0
96+
# Ensure latest was requested
97+
assert test_client.counter['GET'].get('/subjects/{}/versions/latest'.format(subject)) == 1
98+
99+
77100
def test_avro_serializer_config_subject_name_strategy():
78101
"""
79102
Ensures subject.name.strategy is applied

0 commit comments

Comments
 (0)