Skip to content

Commit 5713a95

Browse files
committed
add
1 parent a3caf39 commit 5713a95

28 files changed

+723
-544
lines changed

pyproject.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,21 @@ Homepage = "https://github.com/confluentinc/confluent-kafka-python"
2727
[tool.mypy]
2828
ignore_missing_imports = true
2929

30+
[[tool.mypy.overrides]]
31+
module = [
32+
"confluent_kafka.schema_registry.avro",
33+
"confluent_kafka.schema_registry.json_schema",
34+
"confluent_kafka.schema_registry.protobuf",
35+
]
36+
disable_error_code = ["assignment", "no-redef"]
37+
38+
[[tool.mypy.overrides]]
39+
module = [
40+
"confluent_kafka.schema_registry.confluent.meta_pb2",
41+
"confluent_kafka.schema_registry.confluent.types.decimal_pb2",
42+
]
43+
ignore_errors = true
44+
3045
[tool.setuptools]
3146
include-package-data = false
3247

requirements/requirements-tests.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# core test requirements
22
urllib3<3
33
flake8
4+
mypy
5+
types-cachetools
46
orjson
57
pytest
68
pytest-timeout

src/confluent_kafka/schema_registry/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def dual_schema_id_deserializer(payload: bytes, ctx: Optional[SerializationConte
205205

206206
# Parse schema ID from determined source and return appropriate payload
207207
if header_value is not None:
208-
schema_id.from_bytes(io.BytesIO(header_value))
208+
schema_id.from_bytes(io.BytesIO(header_value)) # type: ignore[arg-type]
209209
return io.BytesIO(payload) # Return full payload when schema ID is in header
210210
else:
211211
return schema_id.from_bytes(io.BytesIO(payload)) # Parse from payload, return remainder

src/confluent_kafka/schema_registry/_async/avro.py

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@ async def _resolve_named_schema(
5757
named_schemas = {}
5858
if schema.references is not None:
5959
for ref in schema.references:
60-
referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True)
61-
ref_named_schemas = await _resolve_named_schema(referenced_schema.schema, schema_registry_client)
60+
# References in registered schemas are validated by server to be complete
61+
referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True) # type: ignore[arg-type]
62+
ref_named_schemas = await _resolve_named_schema(referenced_schema.schema, schema_registry_client) # type: ignore[arg-type]
6263
parsed_schema = parse_schema_with_repo(
63-
referenced_schema.schema.schema_str, named_schemas=ref_named_schemas)
64+
referenced_schema.schema.schema_str, named_schemas=ref_named_schemas) # type: ignore[union-attr,arg-type]
6465
named_schemas.update(ref_named_schemas)
65-
named_schemas[ref.name] = parsed_schema
66+
named_schemas[ref.name] = parsed_schema # type: ignore[index]
6667
return named_schemas
6768

6869

@@ -204,7 +205,7 @@ async def __init_impl(
204205
schema = None
205206

206207
self._registry = schema_registry_client
207-
self._schema_id = None
208+
self._schema_id: Optional[SchemaId] = None
208209
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
209210
self._known_subjects: set[str] = set()
210211
self._parsed_schemas = ParsedSchemaCache()
@@ -219,26 +220,26 @@ async def __init_impl(
219220
if conf is not None:
220221
conf_copy.update(conf)
221222

222-
self._auto_register = conf_copy.pop('auto.register.schemas')
223+
self._auto_register = cast(bool, conf_copy.pop('auto.register.schemas'))
223224
if not isinstance(self._auto_register, bool):
224225
raise ValueError("auto.register.schemas must be a boolean value")
225226

226-
self._normalize_schemas = conf_copy.pop('normalize.schemas')
227+
self._normalize_schemas = cast(bool, conf_copy.pop('normalize.schemas'))
227228
if not isinstance(self._normalize_schemas, bool):
228229
raise ValueError("normalize.schemas must be a boolean value")
229230

230-
self._use_schema_id = conf_copy.pop('use.schema.id')
231+
self._use_schema_id = cast(Optional[int], conf_copy.pop('use.schema.id'))
231232
if (self._use_schema_id is not None and
232233
not isinstance(self._use_schema_id, int)):
233234
raise ValueError("use.schema.id must be an int value")
234235

235-
self._use_latest_version = conf_copy.pop('use.latest.version')
236+
self._use_latest_version = cast(bool, conf_copy.pop('use.latest.version'))
236237
if not isinstance(self._use_latest_version, bool):
237238
raise ValueError("use.latest.version must be a boolean value")
238239
if self._use_latest_version and self._auto_register:
239240
raise ValueError("cannot enable both use.latest.version and auto.register.schemas")
240241

241-
self._use_latest_with_metadata = conf_copy.pop('use.latest.with.metadata')
242+
self._use_latest_with_metadata = cast(Optional[dict], conf_copy.pop('use.latest.with.metadata'))
242243
if (self._use_latest_with_metadata is not None and
243244
not isinstance(self._use_latest_with_metadata, dict)):
244245
raise ValueError("use.latest.with.metadata must be a dict value")
@@ -276,8 +277,11 @@ async def __init_impl(
276277
# i.e. {"type": "string"} has a name of string.
277278
# This function does not comply.
278279
# https://github.com/fastavro/fastavro/issues/415
279-
schema_dict = json.loads(schema.schema_str)
280-
schema_name = parsed_schema.get("name", schema_dict.get("type"))
280+
if schema.schema_str is not None:
281+
schema_dict = json.loads(schema.schema_str)
282+
schema_name = parsed_schema.get("name", schema_dict.get("type")) # type: ignore[union-attr]
283+
else:
284+
schema_name = None
281285
else:
282286
schema_name = None
283287
parsed_schema = None
@@ -292,7 +296,7 @@ async def __init_impl(
292296

293297
__init__ = __init_impl
294298

295-
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
299+
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]: # type: ignore[override]
296300
return self.__serialize(obj, ctx)
297301

298302
async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
@@ -319,10 +323,10 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
319323
return None
320324

321325
subject = self._subject_name_func(ctx, self._schema_name)
322-
latest_schema = await self._get_reader_schema(subject)
326+
latest_schema = await self._get_reader_schema(subject) if subject else None # type: ignore[arg-type]
323327
if latest_schema is not None:
324328
self._schema_id = SchemaId(AVRO_TYPE, latest_schema.schema_id, latest_schema.guid)
325-
elif subject not in self._known_subjects:
329+
elif subject is not None and subject not in self._known_subjects:
326330
# Check to ensure this schema has been registered under subject_name.
327331
if self._auto_register:
328332
# The schema name will always be the same. We can't however register
@@ -339,26 +343,26 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
339343
self._known_subjects.add(subject)
340344

341345
if self._to_dict is not None:
342-
value = self._to_dict(obj, ctx)
346+
value = self._to_dict(obj, ctx) # type: ignore[arg-type]
343347
else:
344-
value = obj
348+
value = obj # type: ignore[assignment]
345349

346350
if latest_schema is not None:
347-
parsed_schema = await self._get_parsed_schema(latest_schema.schema)
351+
parsed_schema = await self._get_parsed_schema(latest_schema.schema) # type: ignore[arg-type]
348352
def field_transformer(rule_ctx, field_transform, msg): return ( # noqa: E731
349353
transform(rule_ctx, parsed_schema, msg, field_transform))
350-
value = self._execute_rules(ctx, subject, RuleMode.WRITE, None,
354+
value = self._execute_rules(ctx, subject, RuleMode.WRITE, None, # type: ignore[arg-type]
351355
latest_schema.schema, value, get_inline_tags(parsed_schema),
352356
field_transformer)
353357
else:
354-
parsed_schema = self._parsed_schema
358+
parsed_schema = self._parsed_schema # type: ignore[assignment]
355359

356360
with _ContextStringIO() as fo:
357361
# write the record to the rest of the buffer
358362
schemaless_writer(fo, parsed_schema, value)
359363
buffer = fo.getvalue()
360364

361-
if latest_schema is not None:
365+
if latest_schema is not None and ctx is not None and subject is not None:
362366
buffer = self._execute_rules_with_phase(
363367
ctx, subject, RulePhase.ENCODING, RuleMode.WRITE,
364368
None, latest_schema.schema, buffer, None, None)
@@ -371,7 +375,11 @@ async def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
371375
return parsed_schema
372376

373377
named_schemas = await _resolve_named_schema(schema, self._registry)
378+
if schema.schema_str is None:
379+
raise ValueError("Schema string cannot be None")
374380
prepared_schema = _schema_loads(schema.schema_str)
381+
if prepared_schema.schema_str is None:
382+
raise ValueError("Prepared schema string cannot be None")
375383
parsed_schema = parse_schema_with_repo(
376384
prepared_schema.schema_str, named_schemas=named_schemas)
377385

@@ -482,11 +490,11 @@ async def __init_impl(
482490
if conf is not None:
483491
conf_copy.update(conf)
484492

485-
self._use_latest_version = conf_copy.pop('use.latest.version')
493+
self._use_latest_version = cast(bool, conf_copy.pop('use.latest.version'))
486494
if not isinstance(self._use_latest_version, bool):
487495
raise ValueError("use.latest.version must be a boolean value")
488496

489-
self._use_latest_with_metadata = conf_copy.pop('use.latest.with.metadata')
497+
self._use_latest_with_metadata = cast(Optional[dict], conf_copy.pop('use.latest.with.metadata'))
490498
if (self._use_latest_with_metadata is not None and
491499
not isinstance(self._use_latest_with_metadata, dict)):
492500
raise ValueError("use.latest.with.metadata must be a dict value")
@@ -510,9 +518,9 @@ async def __init_impl(
510518
.format(", ".join(conf_copy.keys())))
511519

512520
if schema:
513-
self._reader_schema = await self._get_parsed_schema(self._schema)
521+
self._reader_schema = await self._get_parsed_schema(self._schema) # type: ignore[arg-type]
514522
else:
515-
self._reader_schema = None
523+
self._reader_schema = None # type: ignore[assignment]
516524

517525
if from_dict is not None and not callable(from_dict):
518526
raise ValueError("from_dict must be callable with the signature "
@@ -570,23 +578,24 @@ async def __deserialize(
570578
payload = self._schema_id_deserializer(data, ctx, schema_id)
571579

572580
writer_schema_raw = await self._get_writer_schema(schema_id, subject)
573-
writer_schema = await self._get_parsed_schema(writer_schema_raw)
581+
writer_schema = await self._get_parsed_schema(writer_schema_raw) # type: ignore[arg-type]
574582

575583
if subject is None:
576-
subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None
584+
subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None # type: ignore[union-attr]
577585
if subject is not None:
578586
latest_schema = await self._get_reader_schema(subject)
579587

580-
payload = self._execute_rules_with_phase(
581-
ctx, subject, RulePhase.ENCODING, RuleMode.READ,
582-
None, writer_schema_raw, payload, None, None)
588+
if ctx is not None and subject is not None:
589+
payload = self._execute_rules_with_phase(
590+
ctx, subject, RulePhase.ENCODING, RuleMode.READ,
591+
None, writer_schema_raw, payload, None, None)
583592
if isinstance(payload, bytes):
584593
payload = io.BytesIO(payload)
585594

586-
if latest_schema is not None:
587-
migrations = await self._get_migrations(subject, writer_schema_raw, latest_schema, None)
595+
if latest_schema is not None and subject is not None:
596+
migrations = await self._get_migrations(subject, writer_schema_raw, latest_schema, None) # type: ignore[arg-type]
588597
reader_schema_raw = latest_schema.schema
589-
reader_schema = await self._get_parsed_schema(latest_schema.schema)
598+
reader_schema = await self._get_parsed_schema(latest_schema.schema) # type: ignore[arg-type]
590599
elif self._schema is not None:
591600
migrations = None
592601
reader_schema_raw = self._schema
@@ -596,7 +605,7 @@ async def __deserialize(
596605
reader_schema_raw = writer_schema_raw
597606
reader_schema = writer_schema
598607

599-
if migrations:
608+
if migrations and ctx is not None and subject is not None:
600609
obj_dict = schemaless_reader(payload,
601610
writer_schema,
602611
None,
@@ -610,12 +619,13 @@ async def __deserialize(
610619

611620
def field_transformer(rule_ctx, field_transform, message): return ( # noqa: E731
612621
transform(rule_ctx, reader_schema, message, field_transform))
613-
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
614-
reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
615-
field_transformer)
622+
if ctx is not None and subject is not None:
623+
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None, # type: ignore[arg-type]
624+
reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
625+
field_transformer)
616626

617627
if self._from_dict is not None:
618-
return self._from_dict(obj_dict, ctx)
628+
return self._from_dict(obj_dict, ctx) # type: ignore[arg-type]
619629

620630
return obj_dict
621631

@@ -625,7 +635,11 @@ async def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
625635
return parsed_schema
626636

627637
named_schemas = await _resolve_named_schema(schema, self._registry)
638+
if schema.schema_str is None:
639+
raise ValueError("Schema string cannot be None")
628640
prepared_schema = _schema_loads(schema.schema_str)
641+
if prepared_schema.schema_str is None:
642+
raise ValueError("Prepared schema string cannot be None")
629643
parsed_schema = parse_schema_with_repo(
630644
prepared_schema.schema_str, named_schemas=named_schemas)
631645

0 commit comments

Comments
 (0)