Skip to content

Commit d04f861

Browse files
committed
fix
1 parent 62f5c5b commit d04f861

File tree

6 files changed

+113
-53
lines changed

6 files changed

+113
-53
lines changed

src/confluent_kafka/schema_registry/_async/avro.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
import io
1818
import json
19-
from typing import Dict, Union, Optional, Callable
19+
from typing import Any, Coroutine, Dict, Union, Optional, Callable, cast
2020

2121
from fastavro import schemaless_reader, schemaless_writer
2222
from confluent_kafka.schema_registry.common import asyncinit
@@ -206,7 +206,7 @@ async def __init_impl(
206206
self._registry = schema_registry_client
207207
self._schema_id = None
208208
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
209-
self._known_subjects = set()
209+
self._known_subjects: set[str] = set()
210210
self._parsed_schemas = ParsedSchemaCache()
211211

212212
if to_dict is not None and not callable(to_dict):
@@ -243,11 +243,17 @@ async def __init_impl(
243243
not isinstance(self._use_latest_with_metadata, dict)):
244244
raise ValueError("use.latest.with.metadata must be a dict value")
245245

246-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
246+
self._subject_name_func = cast(
247+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
248+
conf_copy.pop('subject.name.strategy')
249+
)
247250
if not callable(self._subject_name_func):
248251
raise ValueError("subject.name.strategy must be callable")
249252

250-
self._schema_id_serializer = conf_copy.pop('schema.id.serializer')
253+
self._schema_id_serializer = cast(
254+
Callable[[bytes, Optional[SerializationContext], Any], bytes],
255+
conf_copy.pop('schema.id.serializer')
256+
)
251257
if not callable(self._schema_id_serializer):
252258
raise ValueError("schema.id.serializer must be callable")
253259

@@ -286,7 +292,7 @@ async def __init_impl(
286292

287293
__init__ = __init_impl
288294

289-
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
295+
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
290296
return self.__serialize(obj, ctx)
291297

292298
async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
@@ -486,11 +492,17 @@ async def __init_impl(
486492
not isinstance(self._use_latest_with_metadata, dict)):
487493
raise ValueError("use.latest.with.metadata must be a dict value")
488494

489-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
495+
self._subject_name_func = cast(
496+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
497+
conf_copy.pop('subject.name.strategy')
498+
)
490499
if not callable(self._subject_name_func):
491500
raise ValueError("subject.name.strategy must be callable")
492501

493-
self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
502+
self._schema_id_deserializer = cast(
503+
Callable[[bytes, Optional[SerializationContext], Any], io.BytesIO],
504+
conf_copy.pop('schema.id.deserializer')
505+
)
494506
if not callable(self._schema_id_deserializer):
495507
raise ValueError("schema.id.deserializer must be callable")
496508

@@ -518,11 +530,11 @@ async def __init_impl(
518530

519531
__init__ = __init_impl
520532

521-
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Union[dict, object, None]:
533+
def __call__(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Union[dict, object, None]]:
522534
return self.__deserialize(data, ctx)
523535

524536
async def __deserialize(
525-
self, data: bytes, ctx: Optional[SerializationContext] = None) -> Union[dict, object, None]:
537+
self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Union[dict, object, None]:
526538
"""
527539
Deserialize Avro binary encoded data with Confluent Schema Registry framing to
528540
a dict, or object instance according to from_dict, if specified.

src/confluent_kafka/schema_registry/_async/json_schema.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
import io
1818
import orjson
19-
from typing import Union, Optional, Tuple, Callable
19+
from typing import Any, Coroutine, Union, Optional, Tuple, Callable, cast
2020

2121
from cachetools import LRUCache
2222
from jsonschema import ValidationError
@@ -226,7 +226,7 @@ async def __init_impl(
226226
rule_registry if rule_registry else RuleRegistry.get_global_instance()
227227
)
228228
self._schema_id = None
229-
self._known_subjects = set()
229+
self._known_subjects: set[str] = set()
230230
self._parsed_schemas = ParsedSchemaCache()
231231
self._validators = LRUCache(1000)
232232

@@ -264,11 +264,17 @@ async def __init_impl(
264264
not isinstance(self._use_latest_with_metadata, dict)):
265265
raise ValueError("use.latest.with.metadata must be a dict value")
266266

267-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
267+
self._subject_name_func = cast(
268+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
269+
conf_copy.pop('subject.name.strategy')
270+
)
268271
if not callable(self._subject_name_func):
269272
raise ValueError("subject.name.strategy must be callable")
270273

271-
self._schema_id_serializer = conf_copy.pop('schema.id.serializer')
274+
self._schema_id_serializer = cast(
275+
Callable[[bytes, Optional[SerializationContext], Any], bytes],
276+
conf_copy.pop('schema.id.serializer')
277+
)
272278
if not callable(self._schema_id_serializer):
273279
raise ValueError("schema.id.serializer must be callable")
274280

@@ -296,7 +302,7 @@ async def __init_impl(
296302

297303
__init__ = __init_impl
298304

299-
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
305+
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
300306
return self.__serialize(obj, ctx)
301307

302308
async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
@@ -525,12 +531,18 @@ async def __init_impl(
525531
not isinstance(self._use_latest_with_metadata, dict)):
526532
raise ValueError("use.latest.with.metadata must be a dict value")
527533

528-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
534+
self._subject_name_func = cast(
535+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
536+
conf_copy.pop('subject.name.strategy')
537+
)
529538
if not callable(self._subject_name_func):
530539
raise ValueError("subject.name.strategy must be callable")
531540

532-
self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
533-
if not callable(self._subject_name_func):
541+
self._schema_id_deserializer = cast(
542+
Callable[[bytes, Optional[SerializationContext], Any], io.BytesIO],
543+
conf_copy.pop('schema.id.deserializer')
544+
)
545+
if not callable(self._schema_id_deserializer):
534546
raise ValueError("schema.id.deserializer must be callable")
535547

536548
self._validate = conf_copy.pop('validate')
@@ -558,10 +570,10 @@ async def __init_impl(
558570

559571
__init__ = __init_impl
560572

561-
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
573+
def __call__(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
562574
return self.__deserialize(data, ctx)
563575

564-
async def __deserialize(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
576+
async def __deserialize(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
565577
"""
566578
Deserialize a JSON encoded record with Confluent Schema Registry framing to
567579
a dict, or object instance according to from_dict if from_dict is specified.

src/confluent_kafka/schema_registry/_async/protobuf.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717

1818
import io
19-
from typing import Set, List, Union, Optional, Tuple
19+
from typing import Any, Coroutine, Set, List, Union, Optional, Tuple, Callable, cast
2020

2121
from google.protobuf import json_format, descriptor_pb2
2222
from google.protobuf.descriptor_pool import DescriptorPool
@@ -272,7 +272,7 @@ async def __init_impl(
272272
self._registry = schema_registry_client
273273
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
274274
self._schema_id = None
275-
self._known_subjects = set()
275+
self._known_subjects: set[str] = set()
276276
self._msg_class = msg_type
277277
self._parsed_schemas = ParsedSchemaCache()
278278

@@ -360,7 +360,7 @@ async def _resolve_dependencies(
360360
reference.version))
361361
return schema_refs
362362

363-
def __call__(self, message: Message, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
363+
def __call__(self, message: Message, ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
364364
return self.__serialize(message, ctx)
365365

366366
async def __serialize(self, message: Message, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
@@ -535,11 +535,17 @@ async def __init_impl(
535535
not isinstance(self._use_latest_with_metadata, dict)):
536536
raise ValueError("use.latest.with.metadata must be a dict value")
537537

538-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
538+
self._subject_name_func = cast(
539+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
540+
conf_copy.pop('subject.name.strategy')
541+
)
539542
if not callable(self._subject_name_func):
540543
raise ValueError("subject.name.strategy must be callable")
541544

542-
self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
545+
self._schema_id_deserializer = cast(
546+
Callable[[bytes, Optional[SerializationContext], Any], io.BytesIO],
547+
conf_copy.pop('schema.id.deserializer')
548+
)
543549
if not callable(self._schema_id_deserializer):
544550
raise ValueError("schema.id.deserializer must be callable")
545551

@@ -558,10 +564,10 @@ async def __init_impl(
558564

559565
__init__ = __init_impl
560566

561-
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
567+
def __call__(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
562568
return self.__deserialize(data, ctx)
563569

564-
async def __deserialize(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
570+
async def __deserialize(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
565571
"""
566572
Deserialize a serialized protobuf message with Confluent Schema Registry
567573
framing.
@@ -601,7 +607,7 @@ async def __deserialize(self, data: bytes, ctx: Optional[SerializationContext] =
601607
if subject is None:
602608
subject = self._subject_name_func(ctx, writer_desc.full_name)
603609
if subject is not None:
604-
latest_schema = self._get_reader_schema(subject, fmt='serialized')
610+
latest_schema = await self._get_reader_schema(subject, fmt='serialized')
605611
else:
606612
writer_schema_raw = None
607613
writer_schema = None

src/confluent_kafka/schema_registry/_sync/avro.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
import io
1818
import json
19-
from typing import Dict, Union, Optional, Callable
19+
from typing import Any, Coroutine, Dict, Union, Optional, Callable, cast
2020

2121
from fastavro import schemaless_reader, schemaless_writer
2222

@@ -206,7 +206,7 @@ def __init_impl(
206206
self._registry = schema_registry_client
207207
self._schema_id = None
208208
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
209-
self._known_subjects = set()
209+
self._known_subjects: set[str] = set()
210210
self._parsed_schemas = ParsedSchemaCache()
211211

212212
if to_dict is not None and not callable(to_dict):
@@ -243,11 +243,17 @@ def __init_impl(
243243
not isinstance(self._use_latest_with_metadata, dict)):
244244
raise ValueError("use.latest.with.metadata must be a dict value")
245245

246-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
246+
self._subject_name_func = cast(
247+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
248+
conf_copy.pop('subject.name.strategy')
249+
)
247250
if not callable(self._subject_name_func):
248251
raise ValueError("subject.name.strategy must be callable")
249252

250-
self._schema_id_serializer = conf_copy.pop('schema.id.serializer')
253+
self._schema_id_serializer = cast(
254+
Callable[[bytes, Optional[SerializationContext], Any], bytes],
255+
conf_copy.pop('schema.id.serializer')
256+
)
251257
if not callable(self._schema_id_serializer):
252258
raise ValueError("schema.id.serializer must be callable")
253259

@@ -286,7 +292,7 @@ def __init_impl(
286292

287293
__init__ = __init_impl
288294

289-
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
295+
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
290296
return self.__serialize(obj, ctx)
291297

292298
def __serialize(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
@@ -486,11 +492,17 @@ def __init_impl(
486492
not isinstance(self._use_latest_with_metadata, dict)):
487493
raise ValueError("use.latest.with.metadata must be a dict value")
488494

489-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
495+
self._subject_name_func = cast(
496+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
497+
conf_copy.pop('subject.name.strategy')
498+
)
490499
if not callable(self._subject_name_func):
491500
raise ValueError("subject.name.strategy must be callable")
492501

493-
self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
502+
self._schema_id_deserializer = cast(
503+
Callable[[bytes, Optional[SerializationContext], Any], io.BytesIO],
504+
conf_copy.pop('schema.id.deserializer')
505+
)
494506
if not callable(self._schema_id_deserializer):
495507
raise ValueError("schema.id.deserializer must be callable")
496508

@@ -518,11 +530,11 @@ def __init_impl(
518530

519531
__init__ = __init_impl
520532

521-
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Union[dict, object, None]:
533+
def __call__(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Union[dict, object, None]]:
522534
return self.__deserialize(data, ctx)
523535

524536
def __deserialize(
525-
self, data: bytes, ctx: Optional[SerializationContext] = None) -> Union[dict, object, None]:
537+
self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Union[dict, object, None]:
526538
"""
527539
Deserialize Avro binary encoded data with Confluent Schema Registry framing to
528540
a dict, or object instance according to from_dict, if specified.

src/confluent_kafka/schema_registry/_sync/json_schema.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
import io
1818
import orjson
19-
from typing import Union, Optional, Tuple, Callable
19+
from typing import Any, Coroutine, Union, Optional, Tuple, Callable, cast
2020

2121
from cachetools import LRUCache
2222
from jsonschema import ValidationError
@@ -226,7 +226,7 @@ def __init_impl(
226226
rule_registry if rule_registry else RuleRegistry.get_global_instance()
227227
)
228228
self._schema_id = None
229-
self._known_subjects = set()
229+
self._known_subjects: set[str] = set()
230230
self._parsed_schemas = ParsedSchemaCache()
231231
self._validators = LRUCache(1000)
232232

@@ -264,11 +264,17 @@ def __init_impl(
264264
not isinstance(self._use_latest_with_metadata, dict)):
265265
raise ValueError("use.latest.with.metadata must be a dict value")
266266

267-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
267+
self._subject_name_func = cast(
268+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
269+
conf_copy.pop('subject.name.strategy')
270+
)
268271
if not callable(self._subject_name_func):
269272
raise ValueError("subject.name.strategy must be callable")
270273

271-
self._schema_id_serializer = conf_copy.pop('schema.id.serializer')
274+
self._schema_id_serializer = cast(
275+
Callable[[bytes, Optional[SerializationContext], Any], bytes],
276+
conf_copy.pop('schema.id.serializer')
277+
)
272278
if not callable(self._schema_id_serializer):
273279
raise ValueError("schema.id.serializer must be callable")
274280

@@ -296,7 +302,7 @@ def __init_impl(
296302

297303
__init__ = __init_impl
298304

299-
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
305+
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
300306
return self.__serialize(obj, ctx)
301307

302308
def __serialize(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
@@ -525,12 +531,18 @@ def __init_impl(
525531
not isinstance(self._use_latest_with_metadata, dict)):
526532
raise ValueError("use.latest.with.metadata must be a dict value")
527533

528-
self._subject_name_func = conf_copy.pop('subject.name.strategy')
534+
self._subject_name_func = cast(
535+
Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
536+
conf_copy.pop('subject.name.strategy')
537+
)
529538
if not callable(self._subject_name_func):
530539
raise ValueError("subject.name.strategy must be callable")
531540

532-
self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
533-
if not callable(self._subject_name_func):
541+
self._schema_id_deserializer = cast(
542+
Callable[[bytes, Optional[SerializationContext], Any], io.BytesIO],
543+
conf_copy.pop('schema.id.deserializer')
544+
)
545+
if not callable(self._schema_id_deserializer):
534546
raise ValueError("schema.id.deserializer must be callable")
535547

536548
self._validate = conf_copy.pop('validate')
@@ -558,10 +570,10 @@ def __init_impl(
558570

559571
__init__ = __init_impl
560572

561-
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
573+
def __call__(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Coroutine[Any, Any, Optional[bytes]]:
562574
return self.__deserialize(data, ctx)
563575

564-
def __deserialize(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
576+
def __deserialize(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
565577
"""
566578
Deserialize a JSON encoded record with Confluent Schema Registry framing to
567579
a dict, or object instance according to from_dict if from_dict is specified.

0 commit comments

Comments
 (0)