Skip to content

Commit 1313065

Browse files
committed
Merge tag '0.0.7' into develop
0.0.7
2 parents 289cec8 + 8694c7d commit 1313065

File tree

6 files changed

+85
-47
lines changed

6 files changed

+85
-47
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.0.6"
3+
version = "0.0.7"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <[email protected]>"]
66
maintainers = ["Pavel Kirilin <[email protected]>"]

taskiq/abc/broker.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
Any,
1010
AsyncGenerator,
1111
Callable,
12+
Coroutine,
1213
Dict,
1314
List,
15+
NoReturn,
1416
Optional,
1517
TypeVar,
1618
Union,
@@ -120,14 +122,17 @@ async def kick(
120122
"""
121123

122124
@abstractmethod
123-
def listen(self) -> AsyncGenerator[BrokerMessage, None]:
125+
async def listen(
126+
self,
127+
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
128+
) -> None:
124129
"""
125130
This function listens to new messages and yields them.
126131
127132
This it the main point for workers.
128133
This function is used to get new tasks from the network.
129134
130-
:yields: taskiq messages.
135+
:param callback: function to call when message received.
131136
:return: nothing.
132137
"""
133138

taskiq/brokers/inmemory_broker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import inspect
22
from collections import OrderedDict
33
from concurrent.futures import ThreadPoolExecutor
4-
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar
4+
from typing import Any, Callable, Coroutine, Optional, TypeVar
55

66
from taskiq.abc.broker import AsyncBroker
77
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
@@ -139,13 +139,17 @@ async def kick(self, message: BrokerMessage) -> None:
139139
except Exception as exc:
140140
raise ResultSetError("Cannot set result.") from exc
141141

142-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
142+
async def listen(
143+
self,
144+
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
145+
) -> None:
143146
"""
144147
Inmemory broker cannot listen.
145148
146149
This method throws RuntimeError if you call it.
147150
Because inmemory broker cannot really listen to any of tasks.
148151
152+
:param callback: message callback.
149153
:raises RuntimeError: if this method is called.
150154
"""
151155
raise RuntimeError("Inmemory brokers cannot listen.")

taskiq/brokers/shared_broker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import AsyncGenerator, Optional, TypeVar
1+
from typing import Any, Callable, Coroutine, Optional, TypeVar
22

33
from taskiq.abc.broker import AsyncBroker
44
from taskiq.decor import AsyncTaskiqDecoratedTask
@@ -59,12 +59,16 @@ async def kick(self, message: BrokerMessage) -> None:
5959
"without setting the default_broker.",
6060
)
6161

62-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
62+
async def listen(
63+
self,
64+
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
65+
) -> None: # type: ignore
6366
"""
6467
Shared broker cannot listen to tasks.
6568
6669
This method will throw an exception.
6770
71+
:param callback: message callback.
6872
:raises TaskiqError: if called.
6973
"""
7074
raise TaskiqError("Shared broker cannot listen")

taskiq/brokers/zmq_broker.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import AsyncGenerator, Callable, Optional, TypeVar
1+
from typing import Any, Callable, Coroutine, Optional, TypeVar
22

33
from taskiq.abc.broker import AsyncBroker
44
from taskiq.abc.result_backend import AsyncResultBackend
@@ -58,12 +58,15 @@ async def kick(self, message: BrokerMessage) -> None:
5858
with self.socket.connect(self.sub_host) as sock:
5959
await sock.send_string(message.json())
6060

61-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
61+
async def listen(
62+
self,
63+
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
64+
) -> None:
6265
"""
6366
Start accepting new messages.
6467
65-
:yield: received broker message
68+
:param callback: function to call when message received.
6669
"""
6770
while True: # noqa: WPS457
6871
with self.socket.connect(self.sub_host) as sock:
69-
yield BrokerMessage.parse_raw(await sock.recv_string())
72+
await callback(BrokerMessage.parse_raw(await sock.recv_string()))

taskiq/cli/async_task_runner.py

Lines changed: 58 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from taskiq.cli.args import TaskiqArgs
1414
from taskiq.cli.log_collector import log_collector
1515
from taskiq.context import Context, context_updater
16-
from taskiq.message import TaskiqMessage
16+
from taskiq.message import BrokerMessage, TaskiqMessage
1717
from taskiq.result import TaskiqResult
1818
from taskiq.utils import maybe_awaitable
1919

@@ -180,52 +180,53 @@ async def run_task( # noqa: C901, WPS210, WPS211
180180
return result
181181

182182

183-
async def async_listen_messages( # noqa: C901, WPS210, WPS213
184-
broker: AsyncBroker,
185-
cli_args: TaskiqArgs,
186-
) -> None:
187-
"""
188-
This function iterates over tasks asynchronously.
183+
class Receiver:
184+
"""Class that uses as a callback handler."""
189185

190-
It uses listen() method of an AsyncBroker
191-
to get new messages from queues.
192-
193-
:param broker: broker to listen to.
194-
:param cli_args: CLI arguments for worker.
195-
"""
196-
logger.info("Runing startup event.")
197-
await broker.startup()
198-
executor = ThreadPoolExecutor(
199-
max_workers=cli_args.max_threadpool_threads,
200-
)
201-
logger.info("Listening started.")
202-
task_signatures: Dict[str, inspect.Signature] = {}
203-
for task in broker.available_tasks.values():
186+
def __init__(self, broker: AsyncBroker, cli_args: TaskiqArgs) -> None:
187+
self.broker = broker
188+
self.cli_args = cli_args
189+
self.task_signatures: Dict[str, inspect.Signature] = {}
204190
if not cli_args.no_parse:
205-
task_signatures[task.task_name] = inspect.signature(task.original_func)
206-
async for message in broker.listen():
191+
for task in self.broker.available_tasks.values():
192+
self.task_signatures[task.task_name] = inspect.signature(
193+
task.original_func,
194+
)
195+
self.executor = ThreadPoolExecutor(
196+
max_workers=cli_args.max_threadpool_threads,
197+
)
198+
199+
async def callback(self, message: BrokerMessage) -> None: # noqa: C901
200+
"""
201+
Receive new message and execute tasks.
202+
203+
This method is used to process message,
204+
that came from brokers.
205+
206+
:param message: received message.
207+
"""
207208
logger.debug(f"Received message: {message}")
208-
if message.task_name not in broker.available_tasks:
209+
if message.task_name not in self.broker.available_tasks:
209210
logger.warning(
210211
'task "%s" is not found. Maybe you forgot to import it?',
211212
message.task_name,
212213
)
213-
continue
214+
return
214215
logger.debug(
215216
"Function for task %s is resolved. Executing...",
216217
message.task_name,
217218
)
218219
try:
219-
taskiq_msg = broker.formatter.loads(message=message)
220+
taskiq_msg = self.broker.formatter.loads(message=message)
220221
except Exception as exc:
221222
logger.warning(
222223
"Cannot parse message: %s. Skipping execution.\n %s",
223224
message,
224225
exc,
225226
exc_info=True,
226227
)
227-
continue
228-
for middleware in broker.middlewares:
228+
return
229+
for middleware in self.broker.middlewares:
229230
if middleware.__class__.pre_execute != TaskiqMiddleware.pre_execute:
230231
taskiq_msg = await maybe_awaitable(
231232
middleware.pre_execute(
@@ -238,23 +239,44 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
238239
taskiq_msg.task_name,
239240
taskiq_msg.task_id,
240241
)
241-
with context_updater(Context(taskiq_msg, broker)):
242+
with context_updater(Context(taskiq_msg, self.broker)):
242243
result = await run_task(
243-
target=broker.available_tasks[message.task_name].original_func,
244-
signature=task_signatures.get(message.task_name),
244+
target=self.broker.available_tasks[message.task_name].original_func,
245+
signature=self.task_signatures.get(message.task_name),
245246
message=taskiq_msg,
246-
log_collector_format=cli_args.log_collector_format,
247-
executor=executor,
248-
middlewares=broker.middlewares,
247+
log_collector_format=self.cli_args.log_collector_format,
248+
executor=self.executor,
249+
middlewares=self.broker.middlewares,
249250
)
250-
for middleware in broker.middlewares:
251+
for middleware in self.broker.middlewares:
251252
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
252253
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
253254
try:
254-
await broker.result_backend.set_result(message.task_id, result)
255+
await self.broker.result_backend.set_result(message.task_id, result)
255256
except Exception as exc:
256257
logger.exception(
257258
"Can't set result in result backend. Cause: %s",
258259
exc,
259260
exc_info=True,
260261
)
262+
263+
264+
async def async_listen_messages(
265+
broker: AsyncBroker,
266+
cli_args: TaskiqArgs,
267+
) -> None:
268+
"""
269+
This function iterates over tasks asynchronously.
270+
271+
It uses listen() method of an AsyncBroker
272+
to get new messages from queues.
273+
274+
:param broker: broker to listen to.
275+
:param cli_args: CLI arguments for worker.
276+
"""
277+
logger.info("Runing startup event.")
278+
await broker.startup()
279+
logger.info("Inicialized receiver.")
280+
receiver = Receiver(broker, cli_args)
281+
logger.info("Listening started.")
282+
await broker.listen(receiver.callback)

0 commit comments

Comments
 (0)