2
2
import logging
3
3
import signal
4
4
from concurrent .futures import ThreadPoolExecutor
5
- from typing import Any
5
+ from typing import Any , Type
6
6
7
7
from taskiq .abc .broker import AsyncBroker
8
8
from taskiq .cli .utils import import_object , import_tasks
@@ -51,7 +51,21 @@ async def shutdown_broker(broker: AsyncBroker, timeout: float) -> None:
51
51
)
52
52
53
53
54
- def start_listen (args : WorkerArgs ) -> None : # noqa: WPS213
54
+ def get_receiver_type (args : WorkerArgs ) -> Type [Receiver ]:
55
+ """
56
+ Import Receiver from args.
57
+
58
+ :param args: CLI arguments.
59
+ :raises ValueError: if receiver is not a Receiver type.
60
+ :return: Receiver type.
61
+ """
62
+ receiver_type = import_object (args .receiver )
63
+ if not (isinstance (receiver_type , type ) and issubclass (receiver_type , Receiver )):
64
+ raise ValueError ("Unknown receiver type. Please use Receiver class." )
65
+ return receiver_type
66
+
67
+
68
+ def start_listen (args : WorkerArgs ) -> None : # noqa: WPS210, WPS213
55
69
"""
56
70
This function starts actual listening process.
57
71
@@ -63,6 +77,7 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS213
63
77
64
78
:param args: CLI arguments.
65
79
:raises ValueError: if broker is not an AsyncBroker instance.
80
+ :raises ValueError: if receiver is not a Receiver type.
66
81
"""
67
82
if uvloop is not None :
68
83
logger .debug ("UVLOOP found. Installing policy." )
@@ -77,6 +92,8 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS213
77
92
if not isinstance (broker , AsyncBroker ):
78
93
raise ValueError ("Unknown broker type. Please use AsyncBroker instance." )
79
94
95
+ receiver_type = get_receiver_type (args )
96
+
80
97
# Here how we manage interruptions.
81
98
# We have to remember shutting_down state,
82
99
# because KeyboardInterrupt can be send multiple
@@ -105,10 +122,11 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
105
122
signal .signal (signal .SIGTERM , interrupt_handler )
106
123
107
124
loop = asyncio .get_event_loop ()
125
+
108
126
try :
109
127
logger .debug ("Initialize receiver." )
110
128
with ThreadPoolExecutor (args .max_threadpool_threads ) as pool :
111
- receiver = Receiver (
129
+ receiver = receiver_type (
112
130
broker = broker ,
113
131
executor = pool ,
114
132
validate_params = not args .no_parse ,
0 commit comments