Skip to content

Commit 4606799

Browse files
authored
Add support for passing schema ID during serialization (#1949)
* Add support for passing schema ID during serialization * Fix formatting
1 parent 09772d5 commit 4606799

File tree

5 files changed

+77
-7
lines changed

5 files changed

+77
-7
lines changed

src/confluent_kafka/schema_registry/avro.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ class AvroSerializer(BaseSerializer):
133133
| ``normalize.schemas`` | bool | transform schemas to have a consistent format, |
134134
| | | including ordering properties and references. |
135135
+-----------------------------+----------+--------------------------------------------------+
136+
| | | Whether to use the given schema ID for |
137+
| ``use.schema.id`` | int | serialization. |
138+
| | | |
139+
+-----------------------------+----------+--------------------------------------------------+
136140
| | | Whether to use the latest subject version for |
137141
| ``use.latest.version`` | bool | serialization. |
138142
| | | |
@@ -143,7 +147,7 @@ class AvroSerializer(BaseSerializer):
143147
| | | Defaults to False. |
144148
+-----------------------------+----------+--------------------------------------------------+
145149
| | | Whether to use the latest subject version with |
146-
| ``use.latest.with.metadata``| bool | the given metadata. |
150+
| ``use.latest.with.metadata``| dict | the given metadata. |
147151
| | | |
148152
| | | WARNING: There is no check that the latest |
149153
| | | schema is backwards compatible with the object |
@@ -216,6 +220,7 @@ class AvroSerializer(BaseSerializer):
216220

217221
_default_conf = {'auto.register.schemas': True,
218222
'normalize.schemas': False,
223+
'use.schema.id': None,
219224
'use.latest.version': False,
220225
'use.latest.with.metadata': None,
221226
'subject.name.strategy': topic_subject_name_strategy}
@@ -261,6 +266,11 @@ def __init__(
261266
if not isinstance(self._normalize_schemas, bool):
262267
raise ValueError("normalize.schemas must be a boolean value")
263268

269+
self._use_schema_id = conf_copy.pop('use.schema.id')
270+
if (self._use_schema_id is not None and
271+
not isinstance(self._use_schema_id, int)):
272+
raise ValueError("use.schema.id must be an int value")
273+
264274
self._use_latest_version = conf_copy.pop('use.latest.version')
265275
if not isinstance(self._use_latest_version, bool):
266276
raise ValueError("use.latest.version must be a boolean value")
@@ -402,7 +412,7 @@ class AvroDeserializer(BaseDeserializer):
402412
| | | Defaults to False. |
403413
+-----------------------------+----------+--------------------------------------------------+
404414
| | | Whether to use the latest subject version with |
405-
| ``use.latest.with.metadata``| bool | the given metadata. |
415+
| ``use.latest.with.metadata``| dict | the given metadata. |
406416
| | | |
407417
| | | Defaults to None. |
408418
+-----------------------------+----------+--------------------------------------------------+
@@ -478,6 +488,7 @@ def __init__(
478488
self._registry = schema_registry_client
479489
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
480490
self._parsed_schemas = ParsedSchemaCache()
491+
self._use_schema_id = None
481492

482493
conf_copy = self._default_conf.copy()
483494
if conf is not None:

src/confluent_kafka/schema_registry/json_schema.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ class JSONSerializer(BaseSerializer):
128128
| ``normalize.schemas`` | bool | transform schemas to have a consistent format, |
129129
| | | including ordering properties and references. |
130130
+-----------------------------+----------+----------------------------------------------------+
131+
| | | Whether to use the given schema ID for |
132+
| ``use.schema.id`` | int | serialization. |
133+
| | | |
134+
+-----------------------------+----------+--------------------------------------------------+
131135
| | | Whether to use the latest subject version for |
132136
| ``use.latest.version`` | bool | serialization. |
133137
| | | |
@@ -138,7 +142,7 @@ class JSONSerializer(BaseSerializer):
138142
| | | Defaults to False. |
139143
+-----------------------------+----------+----------------------------------------------------+
140144
| | | Whether to use the latest subject version with |
141-
| ``use.latest.with.metadata``| bool | the given metadata. |
145+
| ``use.latest.with.metadata``| dict | the given metadata. |
142146
| | | |
143147
| | | WARNING: There is no check that the latest |
144148
| | | schema is backwards compatible with the object |
@@ -216,6 +220,7 @@ class JSONSerializer(BaseSerializer):
216220

217221
_default_conf = {'auto.register.schemas': True,
218222
'normalize.schemas': False,
223+
'use.schema.id': None,
219224
'use.latest.version': False,
220225
'use.latest.with.metadata': None,
221226
'subject.name.strategy': topic_subject_name_strategy,
@@ -267,6 +272,11 @@ def __init__(
267272
if not isinstance(self._normalize_schemas, bool):
268273
raise ValueError("normalize.schemas must be a boolean value")
269274

275+
self._use_schema_id = conf_copy.pop('use.schema.id')
276+
if (self._use_schema_id is not None and
277+
not isinstance(self._use_schema_id, int)):
278+
raise ValueError("use.schema.id must be an int value")
279+
270280
self._use_latest_version = conf_copy.pop('use.latest.version')
271281
if not isinstance(self._use_latest_version, bool):
272282
raise ValueError("use.latest.version must be a boolean value")
@@ -430,7 +440,7 @@ class JSONDeserializer(BaseDeserializer):
430440
| | | Defaults to False. |
431441
+-----------------------------+----------+----------------------------------------------------+
432442
| | | Whether to use the latest subject version with |
433-
| ``use.latest.with.metadata``| bool | the given metadata. |
443+
| ``use.latest.with.metadata``| dict | the given metadata. |
434444
| | | |
435445
| | | Defaults to None. |
436446
+-----------------------------+----------+----------------------------------------------------+
@@ -504,6 +514,7 @@ def __init__(
504514
self._parsed_schemas = ParsedSchemaCache()
505515
self._validators = LRUCache(1000)
506516
self._json_decode = json_decode or json.loads
517+
self._use_schema_id = None
507518

508519
conf_copy = self._default_conf.copy()
509520
if conf is not None:

src/confluent_kafka/schema_registry/protobuf.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,10 @@ class ProtobufSerializer(BaseSerializer):
268268
| ``normalize.schemas`` | bool | transform schemas to have a consistent format, |
269269
| | | including ordering properties and references. |
270270
+-------------------------------------+----------+------------------------------------------------------+
271+
| | | Whether to use the given schema ID for |
272+
| ``use.schema.id`` | int | serialization. |
273+
| | | |
274+
+-----------------------------------------+----------+--------------------------------------------------+
271275
| | | Whether to use the latest subject version for |
272276
| ``use.latest.version`` | bool | serialization. |
273277
| | | |
@@ -278,7 +282,7 @@ class ProtobufSerializer(BaseSerializer):
278282
| | | Defaults to False. |
279283
+-------------------------------------+----------+------------------------------------------------------+
280284
| | | Whether to use the latest subject version with |
281-
| ``use.latest.with.metadata`` | bool | the given metadata. |
285+
| ``use.latest.with.metadata`` | dict | the given metadata. |
282286
| | | |
283287
| | | WARNING: There is no check that the latest |
284288
| | | schema is backwards compatible with the object |
@@ -362,6 +366,7 @@ class ProtobufSerializer(BaseSerializer):
362366
_default_conf = {
363367
'auto.register.schemas': True,
364368
'normalize.schemas': False,
369+
'use.schema.id': None,
365370
'use.latest.version': False,
366371
'use.latest.with.metadata': None,
367372
'skip.known.types': True,
@@ -399,6 +404,11 @@ def __init__(
399404
if not isinstance(self._normalize_schemas, bool):
400405
raise ValueError("normalize.schemas must be a boolean value")
401406

407+
self._use_schema_id = conf_copy.pop('use.schema.id')
408+
if (self._use_schema_id is not None and
409+
not isinstance(self._use_schema_id, int)):
410+
raise ValueError("use.schema.id must be an int value")
411+
402412
self._use_latest_version = conf_copy.pop('use.latest.version')
403413
if not isinstance(self._use_latest_version, bool):
404414
raise ValueError("use.latest.version must be a boolean value")
@@ -634,7 +644,7 @@ class ProtobufDeserializer(BaseDeserializer):
634644
| | | Defaults to False. |
635645
+-------------------------------------+----------+------------------------------------------------------+
636646
| | | Whether to use the latest subject version with |
637-
| ``use.latest.with.metadata`` | bool | the given metadata. |
647+
| ``use.latest.with.metadata`` | dict | the given metadata. |
638648
| | | |
639649
| | | Defaults to None. |
640650
+-------------------------------------+----------+------------------------------------------------------+
@@ -686,6 +696,7 @@ def __init__(
686696
self._registry = schema_registry_client
687697
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
688698
self._parsed_schemas = ParsedSchemaCache()
699+
self._use_schema_id = None
689700

690701
# Require use.deprecated.format to be explicitly configured
691702
# during a transitionary period since old/new format are

src/confluent_kafka/schema_registry/serde.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,14 @@ def __init__(
268268

269269

270270
class BaseSerde(object):
271-
__slots__ = ['_use_latest_version', '_use_latest_with_metadata',
271+
__slots__ = ['_use_schema_id', '_use_latest_version', '_use_latest_with_metadata',
272272
'_registry', '_rule_registry', '_subject_name_func',
273273
'_field_transformer']
274274

275275
def _get_reader_schema(self, subject: str, fmt: Optional[str] = None) -> Optional[RegisteredSchema]:
276+
if self._use_schema_id is not None:
277+
schema = self._registry.get_schema(self._use_schema_id, subject, fmt)
278+
return self._registry.lookup_schema(subject, schema, False, True)
276279
if self._use_latest_with_metadata is not None:
277280
return self._registry.get_latest_with_metadata(
278281
subject, self._use_latest_with_metadata, True, fmt)

tests/schema_registry/test_avro_serdes.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,40 @@ def test_avro_basic_serialization():
130130
assert obj == obj2
131131

132132

133+
def test_avro_serialize_use_schema_id():
134+
conf = {'url': _BASE_URL}
135+
client = SchemaRegistryClient.new_client(conf)
136+
ser_conf = {'auto.register.schemas': False, 'use.schema.id': 1}
137+
138+
obj = {
139+
'intField': 123,
140+
'doubleField': 45.67,
141+
'stringField': 'hi',
142+
'booleanField': True,
143+
'bytesField': b'foobar',
144+
}
145+
schema = {
146+
'type': 'record',
147+
'name': 'ref',
148+
'fields': [
149+
{'name': 'intField', 'type': 'int'},
150+
{'name': 'doubleField', 'type': 'double'},
151+
{'name': 'stringField', 'type': 'string'},
152+
{'name': 'booleanField', 'type': 'boolean'},
153+
{'name': 'bytesField', 'type': 'bytes'},
154+
]
155+
}
156+
client.register_schema(_SUBJECT, Schema(json.dumps(schema), 'AVRO'))
157+
158+
ser = AvroSerializer(client, schema_str=None, conf=ser_conf)
159+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
160+
obj_bytes = ser(obj, ser_ctx)
161+
162+
deser = AvroDeserializer(client)
163+
obj2 = deser(obj_bytes, ser_ctx)
164+
assert obj == obj2
165+
166+
133167
def test_avro_serialize_nested():
134168
conf = {'url': _BASE_URL}
135169
client = SchemaRegistryClient.new_client(conf)

0 commit comments

Comments
 (0)