Skip to content

Commit e65febf

Browse files
authored
Add get_by_version to CachedSchemaRegistryClient (#867)
* Add get_by_version to cached schema registry client * update docstring * Fix tests and docker context
1 parent d9669df commit e65febf

File tree

4 files changed

+64
-2
lines changed

4 files changed

+64
-2
lines changed

src/confluent_kafka/avro/cached_schema_registry_client.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ def get_by_id(self, schema_id):
321321

322322
def get_latest_schema(self, subject):
323323
"""
324-
GET /subjects/(string: subject)/versions/(versionId: version)
324+
GET /subjects/(string: subject)/versions/latest
325325
326326
Return the latest 3-tuple of:
327327
(the schema id, the parsed avro schema, the schema version)
@@ -334,7 +334,25 @@ def get_latest_schema(self, subject):
334334
:returns: (schema_id, schema, version)
335335
:rtype: (string, schema, int)
336336
"""
337-
url = '/'.join([self.url, 'subjects', subject, 'versions', 'latest'])
337+
return self.get_by_version(subject, 'latest')
338+
339+
def get_by_version(self, subject, version):
340+
"""
341+
GET /subjects/(string: subject)/versions/(versionId: version)
342+
343+
Return the 3-tuple of:
344+
(the schema id, the parsed avro schema, the schema version)
345+
for a particular subject and version.
346+
347+
This call always contacts the registry.
348+
349+
If the subject is not found, (None,None,None) is returned.
350+
:param str subject: subject name
351+
:param int version: version number
352+
:returns: (schema_id, schema, version)
353+
:rtype: (string, schema, int)
354+
"""
355+
url = '/'.join([self.url, 'subjects', subject, 'versions', str(version)])
338356

339357
result, code = self._send_request(url)
340358
if code == 404:

tests/avro/mock_registry.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def __init__(self, *args, **kwargs):
5959
self.all_routes = {
6060
'GET': [
6161
(r"/schemas/ids/(\d+)", 'get_schema_by_id'),
62+
(r"/subjects/(\w+)/versions/(\d+)", 'get_by_version'),
6263
(r"/subjects/(\w+)/versions/latest", 'get_latest')
6364
],
6465
'POST': [
@@ -170,6 +171,21 @@ def get_latest(self, req, groups):
170171
}
171172
return (200, result)
172173

174+
def get_by_version(self, req, groups):
175+
subject = groups[0]
176+
version = int(groups[1])
177+
178+
schema_id, avro_schema, version = self.registry.get_by_version(subject, version)
179+
if schema_id is None:
180+
return self._create_error("Not found", 404)
181+
result = {
182+
"schema": json.dumps(avro_schema.to_json()),
183+
"subject": subject,
184+
"id": schema_id,
185+
"version": version
186+
}
187+
return (200, result)
188+
173189
def add_count(self, path):
174190
if path not in self.counts:
175191
self.counts[path] = 0

tests/avro/mock_schema_registry_client.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,28 @@ def get_by_id(self, schema_id):
135135
"""Retrieve a parsed avro schema by id or None if not found"""
136136
return self.id_to_schema.get(schema_id, None)
137137

138+
def get_by_version(self, subject, version):
139+
"""
140+
Return the 3-tuple of:
141+
(the schema id, the parsed avro schema, the schema version)
142+
for a particular subject and version.
143+
144+
If the subject or version are not found, (None,None,None) is returned.
145+
"""
146+
schema_versions = self.subject_to_schema_versions.get(subject) or []
147+
# get schema+version by version
148+
schema_versions = [x for x in schema_versions.items() if x[1] == version]
149+
schema = None
150+
schema_version = None
151+
if len(schema_versions) > 0:
152+
schema = schema_versions[0][0]
153+
schema_version = schema_versions[0][1]
154+
155+
# get schema_id by schema
156+
schema_id = self.subject_to_schema_ids.get(subject).get(schema)
157+
158+
return (schema_id, schema, schema_version)
159+
138160
def get_latest_schema(self, subject):
139161
"""
140162
Return the latest 3-tuple of:

tests/avro/test_cached_client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ def test_multi_register(self):
129129
# latest should not change with a re-reg
130130
self.assertEqual(latest2, latest3)
131131

132+
schema1 = client.get_by_version(subject, v1)
133+
schema2 = client.get_by_version(subject, v2)
134+
self.assertEqual(schema1, latest1)
135+
self.assertEqual(schema2, latest2)
136+
self.assertEqual(schema2, latest3)
137+
132138
def hash_func(self):
133139
return hash(str(self))
134140

0 commit comments

Comments
 (0)