13
13
from taskiq .cli .args import TaskiqArgs
14
14
from taskiq .cli .log_collector import log_collector
15
15
from taskiq .context import Context , context_updater
16
- from taskiq .message import TaskiqMessage
16
+ from taskiq .message import BrokerMessage , TaskiqMessage
17
17
from taskiq .result import TaskiqResult
18
18
from taskiq .utils import maybe_awaitable
19
19
@@ -180,52 +180,53 @@ async def run_task( # noqa: C901, WPS210, WPS211
180
180
return result
181
181
182
182
183
- async def async_listen_messages ( # noqa: C901, WPS210, WPS213
184
- broker : AsyncBroker ,
185
- cli_args : TaskiqArgs ,
186
- ) -> None :
187
- """
188
- This function iterates over tasks asynchronously.
183
+ class Receiver :
184
+ """Class that uses as a callback handler."""
189
185
190
- It uses listen() method of an AsyncBroker
191
- to get new messages from queues.
192
-
193
- :param broker: broker to listen to.
194
- :param cli_args: CLI arguments for worker.
195
- """
196
- logger .info ("Runing startup event." )
197
- await broker .startup ()
198
- executor = ThreadPoolExecutor (
199
- max_workers = cli_args .max_threadpool_threads ,
200
- )
201
- logger .info ("Listening started." )
202
- task_signatures : Dict [str , inspect .Signature ] = {}
203
- for task in broker .available_tasks .values ():
186
+ def __init__ (self , broker : AsyncBroker , cli_args : TaskiqArgs ) -> None :
187
+ self .broker = broker
188
+ self .cli_args = cli_args
189
+ self .task_signatures : Dict [str , inspect .Signature ] = {}
204
190
if not cli_args .no_parse :
205
- task_signatures [task .task_name ] = inspect .signature (task .original_func )
206
- async for message in broker .listen ():
191
+ for task in self .broker .available_tasks .values ():
192
+ self .task_signatures [task .task_name ] = inspect .signature (
193
+ task .original_func ,
194
+ )
195
+ self .executor = ThreadPoolExecutor (
196
+ max_workers = cli_args .max_threadpool_threads ,
197
+ )
198
+
199
+ async def callback (self , message : BrokerMessage ) -> None : # noqa: C901
200
+ """
201
+ Receive new message and execute tasks.
202
+
203
+ This method is used to process message,
204
+ that came from brokers.
205
+
206
+ :param message: received message.
207
+ """
207
208
logger .debug (f"Received message: { message } " )
208
- if message .task_name not in broker .available_tasks :
209
+ if message .task_name not in self . broker .available_tasks :
209
210
logger .warning (
210
211
'task "%s" is not found. Maybe you forgot to import it?' ,
211
212
message .task_name ,
212
213
)
213
- continue
214
+ return
214
215
logger .debug (
215
216
"Function for task %s is resolved. Executing..." ,
216
217
message .task_name ,
217
218
)
218
219
try :
219
- taskiq_msg = broker .formatter .loads (message = message )
220
+ taskiq_msg = self . broker .formatter .loads (message = message )
220
221
except Exception as exc :
221
222
logger .warning (
222
223
"Cannot parse message: %s. Skipping execution.\n %s" ,
223
224
message ,
224
225
exc ,
225
226
exc_info = True ,
226
227
)
227
- continue
228
- for middleware in broker .middlewares :
228
+ return
229
+ for middleware in self . broker .middlewares :
229
230
if middleware .__class__ .pre_execute != TaskiqMiddleware .pre_execute :
230
231
taskiq_msg = await maybe_awaitable (
231
232
middleware .pre_execute (
@@ -238,23 +239,44 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
238
239
taskiq_msg .task_name ,
239
240
taskiq_msg .task_id ,
240
241
)
241
- with context_updater (Context (taskiq_msg , broker )):
242
+ with context_updater (Context (taskiq_msg , self . broker )):
242
243
result = await run_task (
243
- target = broker .available_tasks [message .task_name ].original_func ,
244
- signature = task_signatures .get (message .task_name ),
244
+ target = self . broker .available_tasks [message .task_name ].original_func ,
245
+ signature = self . task_signatures .get (message .task_name ),
245
246
message = taskiq_msg ,
246
- log_collector_format = cli_args .log_collector_format ,
247
- executor = executor ,
248
- middlewares = broker .middlewares ,
247
+ log_collector_format = self . cli_args .log_collector_format ,
248
+ executor = self . executor ,
249
+ middlewares = self . broker .middlewares ,
249
250
)
250
- for middleware in broker .middlewares :
251
+ for middleware in self . broker .middlewares :
251
252
if middleware .__class__ .post_execute != TaskiqMiddleware .post_execute :
252
253
await maybe_awaitable (middleware .post_execute (taskiq_msg , result ))
253
254
try :
254
- await broker .result_backend .set_result (message .task_id , result )
255
+ await self . broker .result_backend .set_result (message .task_id , result )
255
256
except Exception as exc :
256
257
logger .exception (
257
258
"Can't set result in result backend. Cause: %s" ,
258
259
exc ,
259
260
exc_info = True ,
260
261
)
262
+
263
+
264
+ async def async_listen_messages (
265
+ broker : AsyncBroker ,
266
+ cli_args : TaskiqArgs ,
267
+ ) -> None :
268
+ """
269
+ This function iterates over tasks asynchronously.
270
+
271
+ It uses listen() method of an AsyncBroker
272
+ to get new messages from queues.
273
+
274
+ :param broker: broker to listen to.
275
+ :param cli_args: CLI arguments for worker.
276
+ """
277
+ logger .info ("Runing startup event." )
278
+ await broker .startup ()
279
+ logger .info ("Inicialized receiver." )
280
+ receiver = Receiver (broker , cli_args )
281
+ logger .info ("Listening started." )
282
+ await broker .listen (receiver .callback )
0 commit comments