Skip to content

Commit 8c5976e

Browse files
authored
Fix use.schema.id, broken by signature change to sr.lookup_schema(). (confluentinc#2104)
* Fix use.schema.id, broken by signature change to sr.lookup_schema(). * Fix PEP-8 violations.
1 parent d4b8041 commit 8c5976e

File tree

15 files changed

+41
-33
lines changed

15 files changed

+41
-33
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
106106

107107
def delivery_report(err, msg):
108108
""" Called once for each message produced to indicate delivery result.
109-
Triggered by poll() or flush(). """
109+
Triggered by poll() or flush()."""
110110
if err is not None:
111111
print('Message delivery failed: {}'.format(err))
112112
else:

src/confluent_kafka/schema_registry/_async/avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,11 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
323323
# a schema without a subject so we set the schema_id here to handle
324324
# the initial registration.
325325
registered_schema = await self._registry.register_schema_full_response(
326-
subject, self._schema, self._normalize_schemas)
326+
subject, self._schema, normalize_schemas=self._normalize_schemas)
327327
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
328328
else:
329329
registered_schema = await self._registry.lookup_schema(
330-
subject, self._schema, self._normalize_schemas)
330+
subject, self._schema, normalize_schemas=self._normalize_schemas)
331331
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
332332

333333
self._known_subjects.add(subject)

src/confluent_kafka/schema_registry/_async/json_schema.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,11 +332,11 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
332332
# a schema without a subject so we set the schema_id here to handle
333333
# the initial registration.
334334
registered_schema = await self._registry.register_schema_full_response(
335-
subject, self._schema, self._normalize_schemas)
335+
subject, self._schema, normalize_schemas=self._normalize_schemas)
336336
self._schema_id = SchemaId(JSON_TYPE, registered_schema.schema_id, registered_schema.guid)
337337
else:
338338
registered_schema = await self._registry.lookup_schema(
339-
subject, self._schema, self._normalize_schemas)
339+
subject, self._schema, normalize_schemas=self._normalize_schemas)
340340
self._schema_id = SchemaId(JSON_TYPE, registered_schema.schema_id, registered_schema.guid)
341341

342342
self._known_subjects.add(subject)

src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ def get_latest_version(self, subject_name: str) -> Optional[RegisteredSchema]:
9494
return None
9595

9696
def get_latest_with_metadata(
97-
self, subject_name: str,
98-
metadata: Dict[str, str]
97+
self, subject_name: str, metadata: Dict[str, str],
98+
deleted: bool = False, fmt: Optional[str] = None
9999
) -> Optional[RegisteredSchema]:
100100
with self.lock:
101101
if subject_name in self.subject_schemas:
@@ -155,7 +155,8 @@ async def register_schema(
155155
self, subject_name: str, schema: 'Schema',
156156
normalize_schemas: bool = False
157157
) -> int:
158-
registered_schema = await self.register_schema_full_response(subject_name, schema, normalize_schemas)
158+
registered_schema = await self.register_schema_full_response(
159+
subject_name, schema, normalize_schemas=normalize_schemas)
159160
return registered_schema.schema_id
160161

161162
async def register_schema_full_response(
@@ -202,7 +203,7 @@ async def get_schema_by_guid(
202203

203204
async def lookup_schema(
204205
self, subject_name: str, schema: 'Schema',
205-
normalize_schemas: bool = False, deleted: bool = False
206+
normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False
206207
) -> 'RegisteredSchema':
207208

208209
registered_schema = self._store.get_registered_schema_by_schema(subject_name, schema)

src/confluent_kafka/schema_registry/_async/protobuf.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,11 +407,11 @@ async def __serialize(self, message: Message, ctx: Optional[SerializationContext
407407

408408
if self._auto_register:
409409
registered_schema = await self._registry.register_schema_full_response(
410-
subject, self._schema, self._normalize_schemas)
410+
subject, self._schema, normalize_schemas=self._normalize_schemas)
411411
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid)
412412
else:
413413
registered_schema = await self._registry.lookup_schema(
414-
subject, self._schema, self._normalize_schemas)
414+
subject, self._schema, normalize_schemas=self._normalize_schemas)
415415
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid)
416416

417417
self._known_subjects.add(subject)

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,8 @@ async def register_schema(
637637
`POST Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
638638
""" # noqa: E501
639639

640-
registered_schema = await self.register_schema_full_response(subject_name, schema, normalize_schemas)
640+
registered_schema = await self.register_schema_full_response(
641+
subject_name, schema, normalize_schemas=normalize_schemas)
641642
return registered_schema.schema_id
642643

643644
async def register_schema_full_response(

src/confluent_kafka/schema_registry/_async/serde.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ class AsyncBaseSerde(object):
4747
async def _get_reader_schema(self, subject: str, fmt: Optional[str] = None) -> Optional[RegisteredSchema]:
4848
if self._use_schema_id is not None:
4949
schema = await self._registry.get_schema(self._use_schema_id, subject, fmt)
50-
return await self._registry.lookup_schema(subject, schema, False, True)
50+
return await self._registry.lookup_schema(
51+
subject, schema, normalize_schemas=False, deleted=True)
5152
if self._use_latest_with_metadata is not None:
5253
return await self._registry.get_latest_with_metadata(
53-
subject, self._use_latest_with_metadata, True, fmt)
54+
subject, self._use_latest_with_metadata, deleted=True, fmt=fmt)
5455
if self._use_latest_version:
5556
return await self._registry.get_latest_version(subject, fmt)
5657
return None
@@ -238,7 +239,8 @@ async def _get_migrations(
238239
self, subject: str, source_info: Schema,
239240
target: RegisteredSchema, fmt: Optional[str]
240241
) -> List[Migration]:
241-
source = await self._registry.lookup_schema(subject, source_info, False, True)
242+
source = await self._registry.lookup_schema(
243+
subject, source_info, normalize_schemas=False, deleted=True)
242244
migrations = []
243245
if source.version < target.version:
244246
migration_mode = RuleMode.UPGRADE

src/confluent_kafka/schema_registry/_sync/avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,11 @@ def __serialize(self, obj: object, ctx: Optional[SerializationContext] = None) -
323323
# a schema without a subject so we set the schema_id here to handle
324324
# the initial registration.
325325
registered_schema = self._registry.register_schema_full_response(
326-
subject, self._schema, self._normalize_schemas)
326+
subject, self._schema, normalize_schemas=self._normalize_schemas)
327327
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
328328
else:
329329
registered_schema = self._registry.lookup_schema(
330-
subject, self._schema, self._normalize_schemas)
330+
subject, self._schema, normalize_schemas=self._normalize_schemas)
331331
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
332332

333333
self._known_subjects.add(subject)

src/confluent_kafka/schema_registry/_sync/json_schema.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,11 +332,11 @@ def __serialize(self, obj: object, ctx: Optional[SerializationContext] = None) -
332332
# a schema without a subject so we set the schema_id here to handle
333333
# the initial registration.
334334
registered_schema = self._registry.register_schema_full_response(
335-
subject, self._schema, self._normalize_schemas)
335+
subject, self._schema, normalize_schemas=self._normalize_schemas)
336336
self._schema_id = SchemaId(JSON_TYPE, registered_schema.schema_id, registered_schema.guid)
337337
else:
338338
registered_schema = self._registry.lookup_schema(
339-
subject, self._schema, self._normalize_schemas)
339+
subject, self._schema, normalize_schemas=self._normalize_schemas)
340340
self._schema_id = SchemaId(JSON_TYPE, registered_schema.schema_id, registered_schema.guid)
341341

342342
self._known_subjects.add(subject)

src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ def get_latest_version(self, subject_name: str) -> Optional[RegisteredSchema]:
9494
return None
9595

9696
def get_latest_with_metadata(
97-
self, subject_name: str,
98-
metadata: Dict[str, str]
97+
self, subject_name: str, metadata: Dict[str, str],
98+
deleted: bool = False, fmt: Optional[str] = None
9999
) -> Optional[RegisteredSchema]:
100100
with self.lock:
101101
if subject_name in self.subject_schemas:
@@ -155,7 +155,8 @@ def register_schema(
155155
self, subject_name: str, schema: 'Schema',
156156
normalize_schemas: bool = False
157157
) -> int:
158-
registered_schema = self.register_schema_full_response(subject_name, schema, normalize_schemas)
158+
registered_schema = self.register_schema_full_response(
159+
subject_name, schema, normalize_schemas=normalize_schemas)
159160
return registered_schema.schema_id
160161

161162
def register_schema_full_response(
@@ -202,7 +203,7 @@ def get_schema_by_guid(
202203

203204
def lookup_schema(
204205
self, subject_name: str, schema: 'Schema',
205-
normalize_schemas: bool = False, deleted: bool = False
206+
normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False
206207
) -> 'RegisteredSchema':
207208

208209
registered_schema = self._store.get_registered_schema_by_schema(subject_name, schema)

0 commit comments

Comments
 (0)