|
1 | | -import inspect |
2 | | -import sys |
| 1 | +from typing import Any, Iterable, Type, TypeVar |
3 | 2 |
|
4 | 3 | __all__ = [ |
5 | 4 | # aiokafka custom errors |
@@ -83,7 +82,7 @@ class KafkaError(RuntimeError): |
83 | 82 | # whether metadata should be refreshed on error |
84 | 83 | invalid_metadata = False |
85 | 84 |
|
86 | | - def __str__(self): |
| 85 | + def __str__(self) -> str: |
87 | 86 | if not self.args: |
88 | 87 | return self.__class__.__name__ |
89 | 88 | return f"{self.__class__.__name__}: {super().__str__()}" |
@@ -140,7 +139,7 @@ class IncompatibleBrokerVersion(KafkaError): |
140 | 139 |
|
141 | 140 |
|
142 | 141 | class CommitFailedError(KafkaError): |
143 | | - def __init__(self, *args, **kwargs): |
| 142 | + def __init__(self, *args: Any, **kwargs: Any) -> None: |
144 | 143 | super().__init__( |
145 | 144 | """Commit cannot be completed since the group has already |
146 | 145 | rebalanced and assigned the partitions to another member. |
@@ -223,19 +222,21 @@ class ProducerFenced(KafkaError): |
223 | 222 |
|
224 | 223 | def __init__( |
225 | 224 | self, |
226 | | - msg="There is a newer producer using the same transactional_id or" |
227 | | - "transaction timeout occurred (check that processing time is " |
228 | | - "below transaction_timeout_ms)", |
229 | | - ): |
| 225 | + msg: str = ( |
| 226 | + "There is a newer producer using the same transactional_id or" |
| 227 | + "transaction timeout occurred (check that processing time is " |
| 228 | + "below transaction_timeout_ms)" |
| 229 | + ), |
| 230 | + ) -> None: |
230 | 231 | super().__init__(msg) |
231 | 232 |
|
232 | 233 |
|
233 | 234 | class BrokerResponseError(KafkaError): |
234 | | - errno = None |
235 | | - message = None |
236 | | - description = None |
| 235 | + errno: int |
| 236 | + message: str |
| 237 | + description: str = "" |
237 | 238 |
|
238 | | - def __str__(self): |
| 239 | + def __str__(self) -> str: |
239 | 240 | """Add errno to standard KafkaError str""" |
240 | 241 | return f"[Error {self.errno}] {super().__str__()}" |
241 | 242 |
|
@@ -859,18 +860,17 @@ class MemberIdRequired(BrokerResponseError): |
859 | 860 | ) |
860 | 861 |
|
861 | 862 |
|
862 | | -def _iter_broker_errors(): |
863 | | - for _, obj in inspect.getmembers(sys.modules[__name__]): |
864 | | - if ( |
865 | | - inspect.isclass(obj) |
866 | | - and issubclass(obj, BrokerResponseError) |
867 | | - and obj != BrokerResponseError |
868 | | - ): |
869 | | - yield obj |
| 863 | +_T = TypeVar("_T", bound=type) |
870 | 864 |
|
871 | 865 |
|
872 | | -kafka_errors = {x.errno: x for x in _iter_broker_errors()} |
| 866 | +def _iter_subclasses(cls: _T) -> Iterable[_T]: |
| 867 | + for subclass in cls.__subclasses__(): |
| 868 | + yield subclass |
| 869 | + yield from _iter_subclasses(subclass) |
873 | 870 |
|
874 | 871 |
|
875 | | -def for_code(error_code): |
| 872 | +kafka_errors = {x.errno: x for x in _iter_subclasses(BrokerResponseError)} |
| 873 | + |
| 874 | + |
| 875 | +def for_code(error_code: int) -> Type[BrokerResponseError]: |
876 | 876 | return kafka_errors.get(error_code, UnknownError) |
0 commit comments