
Commit 66623fe

Merge pull request #25 from taskiq-python/feature/zmq
Added ZeroMQ broker, fixed multiprocessing.
2 parents 7830ce4 + 2aeb77a commit 66623fe

File tree

8 files changed (+417, -142 lines changed)


README.md

Lines changed: 47 additions & 0 deletions
@@ -154,6 +154,8 @@ Or you can let taskiq find all python modules named tasks in current directory r
 taskiq test_project.broker:broker -fsd
 ```

+If you have uvloop installed, taskiq will automatically install the uvloop event loop policy.
+
 You can always run `--help` to see all possible options.


@@ -262,3 +264,48 @@ async def main():
 asyncio.run(main())

 ```
+
+
+# Available Brokers
+
+Taskiq has several brokers out of the box:
+* InMemoryBroker
+* ZeroMQBroker
+
+## InMemoryBroker
+
+This broker is created for development purposes.
+You can easily use it without setting up workers for your project.
+
+It works the same as real brokers, but with some limitations:
+
+1. It cannot use `pre_execute` and `post_execute` hooks in middlewares.
+2. You cannot use it in multiprocessing applications without a real result_backend.
+
+This broker is suitable for local development only.
+
+InMemoryBroker parameters:
+* `sync_tasks_pool_size` - number of threads for the threadpool executor.
+  All sync functions are executed in the threadpool.
+* `logs_format` - format used to collect logs from task execution.
+* `max_stored_results` - maximum number of results stored in memory. This number is used only if no custom result backend is provided.
+* `cast_types` - whether to use aggressive type casting.
+* `result_backend` - custom result backend. By default
+  it uses `InmemoryResultBackend`.
+* `task_id_generator` - custom function to generate task ids.
+
+## ZeroMQBroker
+
+ZeroMQ is not available by default. To enable it, please install [pyzmq](https://pypi.org/project/pyzmq/),
+or install `taskiq[zmq]`.
+
+This broker doesn't have the limitations above, but it requires you to set up a worker using the taskiq CLI.
+Also, please ensure that the worker process is started after your application is ready.
+
+This is because ZMQ publishes messages to a socket and all worker processes connect to it.
+
+ZeroMQBroker parameters:
+* `zmq_pub_host` - host used to publish messages.
+* `zmq_sub_host` - host to subscribe to the publisher.
+* `result_backend` - custom result backend.
+* `task_id_generator` - custom function to generate task ids.
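
To make the broker descriptions above concrete, here is a minimal sketch of how the documented `InMemoryBroker` parameters could be used. The task body and the parameter values are illustrative assumptions, not part of this commit.

```python
import asyncio

from taskiq import InMemoryBroker

# Illustrative values for the parameters documented above.
broker = InMemoryBroker(
    sync_tasks_pool_size=4,    # threads for sync task execution
    max_stored_results=100,    # only used without a custom result backend
    cast_types=True,           # aggressive type casting
)


@broker.task
def add(a: int, b: int) -> int:
    # Sync tasks are executed in the broker's thread pool.
    return a + b


async def main() -> None:
    task = await add.kiq(1, 2)
    result = await task.wait_result()
    print(result.return_value)


asyncio.run(main())
```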

poetry.lock

Lines changed: 239 additions & 44 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 7 additions & 0 deletions
@@ -27,6 +27,8 @@ keywords = ["taskiq", "tasks", "distributed", "async"]
 python = "^3.7"
 typing-extensions = ">=3.10.0.0"
 pydantic = "^1.6.2"
+pyzmq = { version = "^23.2.0", optional = true }
+uvloop = { version = "^0.16.0", optional = true }

 [tool.poetry.dev-dependencies]
 pytest = "^7.1.2"
@@ -41,6 +43,10 @@ wemake-python-styleguide = "^0.16.1"
 coverage = "^6.4.2"
 pytest-cov = "^3.0.0"

+[tool.poetry.extras]
+zmq = ["pyzmq"]
+uv = ["uvloop"]
+
 [tool.poetry.scripts]
 taskiq = "taskiq.__main__:main"

@@ -54,6 +60,7 @@ show_error_codes = true
 implicit_reexport = true
 allow_untyped_decorators = true
 warn_return_any = false
+warn_unused_ignores = false

 [tool.isort]
 profile = "black"
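
The `uv` extra above only declares uvloop as an optional dependency; the README note about the event loop policy presumably corresponds to an opt-in check along these lines. This is a hedged sketch of the pattern, not the actual taskiq CLI code.

```python
# Enable uvloop only when it is importable; otherwise keep the default loop.
try:
    import uvloop
except ImportError:
    uvloop = None  # optional dependency, installed via taskiq[uv]

if uvloop is not None:
    uvloop.install()  # replaces the global asyncio event loop policy
```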

taskiq/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -3,7 +3,9 @@
 from taskiq.abc.formatter import TaskiqFormatter
 from taskiq.abc.middleware import TaskiqMiddleware
 from taskiq.abc.result_backend import AsyncResultBackend
+from taskiq.brokers.inmemory_broker import InMemoryBroker
 from taskiq.brokers.shared_broker import async_shared_broker
+from taskiq.brokers.zmq_broker import ZeroMQBroker
 from taskiq.exceptions import TaskiqError
 from taskiq.message import BrokerMessage, TaskiqMessage
 from taskiq.result import TaskiqResult
@@ -13,8 +15,10 @@
     "AsyncBroker",
     "TaskiqError",
     "TaskiqResult",
+    "ZeroMQBroker",
     "TaskiqMessage",
     "BrokerMessage",
+    "InMemoryBroker",
     "TaskiqFormatter",
     "AsyncTaskiqTask",
     "TaskiqMiddleware",

taskiq/abc/broker.py

Lines changed: 2 additions & 1 deletion
@@ -57,6 +57,7 @@ class AsyncBroker(ABC):
     """

     available_tasks: Dict[str, AsyncTaskiqDecoratedTask[Any, Any]] = {}
+    is_worker_process: bool = False

     def __init__(
         self,
@@ -69,7 +70,6 @@ def __init__(
             task_id_generator = default_id_generator
         self.middlewares: "List[TaskiqMiddleware]" = []
         self.result_backend = result_backend
-        self.is_worker_process = False
         self.decorator_class = AsyncTaskiqDecoratedTask
         self.formatter: "TaskiqFormatter" = JSONFormatter()
         self.id_generator = task_id_generator
@@ -101,6 +101,7 @@ async def shutdown(self) -> None:
             middleware_shutdown = middleware.shutdown()
             if inspect.isawaitable(middleware_shutdown):
                 await middleware_shutdown
+        await self.result_backend.shutdown()

     @abstractmethod
     async def kick(
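
Turning `is_worker_process` into a class attribute lets the worker CLI flip the flag once, before user code constructs any broker, so every broker created afterwards knows it is running inside a worker (for example, `ZeroMQBroker` then opens a SUB socket instead of binding a PUB socket). The helper below is a hedged sketch of that pattern; the function name is hypothetical, not taskiq API. The other change makes `shutdown()` also await the result backend's shutdown, so a single `await broker.shutdown()` now cleans up middlewares and the result backend together.

```python
from taskiq.abc.broker import AsyncBroker


def mark_as_worker_process() -> None:
    # Hypothetical worker bootstrap: set the class-level flag before the
    # user's broker module is imported, so brokers instantiated later
    # configure themselves for the worker side.
    AsyncBroker.is_worker_process = True
```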

taskiq/brokers/zmq_broker.py

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+from typing import AsyncGenerator, Callable, Optional, TypeVar
+
+from taskiq.abc.broker import AsyncBroker
+from taskiq.abc.result_backend import AsyncResultBackend
+from taskiq.message import BrokerMessage
+
+try:
+    import zmq  # noqa: WPS433
+    from zmq.asyncio import Context  # noqa: WPS433
+except ImportError:
+    zmq = None  # type: ignore
+
+_T = TypeVar("_T")  # noqa: WPS111
+
+
+class ZeroMQBroker(AsyncBroker):
+    """
+    ZeroMQ broker.
+
+    This broker starts a socket ON A CLIENT SIDE,
+    and all workers connect to this socket using sub_host.
+
+    If you're using this broker, make sure
+    that your workers start after the client is ready.
+    """
+
+    def __init__(
+        self,
+        zmq_pub_host: str = "tcp://0.0.0.0:5555",
+        zmq_sub_host: str = "tcp://localhost:5555",
+        result_backend: "Optional[AsyncResultBackend[_T]]" = None,
+        task_id_generator: Optional[Callable[[], str]] = None,
+    ) -> None:
+        if zmq is None:
+            raise RuntimeError(
+                "To use ZMQ broker please install pyzmq lib or taskiq[zmq].",
+            )
+        super().__init__(result_backend, task_id_generator)
+        self.context = Context()
+        self.pub_host = zmq_pub_host
+        self.sub_host = zmq_sub_host
+        if self.is_worker_process:
+            self.socket = self.context.socket(zmq.SUB)
+            self.socket.setsockopt(zmq.SUBSCRIBE, b"")
+        else:
+            self.socket = self.context.socket(zmq.PUB)
+            self.socket.bind(self.pub_host)
+
+    async def kick(self, message: BrokerMessage) -> None:
+        """
+        Kick a message.
+
+        This method is used to publish a message
+        via the socket.
+
+        :param message: message to publish.
+        """
+        with self.socket.connect(self.sub_host) as sock:
+            await sock.send_string(message.json())
+
+    async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
+        """
+        Start accepting new messages.
+
+        :yield: received broker message.
+        """
+        while True:  # noqa: WPS457
+            with self.socket.connect(self.sub_host) as sock:
+                yield BrokerMessage.parse_raw(await sock.recv_string())
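
For context, here is a hedged end-to-end sketch of using this broker from the client side. The module path `my_project/broker.py` and the task are assumptions; the worker itself is started separately with the taskiq CLI (for example `taskiq my_project.broker:broker`) after the client is ready, as the docstring above requires.

```python
# my_project/broker.py (hypothetical module path)
import asyncio

from taskiq import ZeroMQBroker

# Defaults mirror the constructor: the client binds the PUB socket,
# workers subscribe via zmq_sub_host.
broker = ZeroMQBroker(
    zmq_pub_host="tcp://0.0.0.0:5555",
    zmq_sub_host="tcp://localhost:5555",
)


@broker.task
async def add(a: int, b: int) -> int:
    return a + b


async def main() -> None:
    await broker.startup()
    # Tasks kicked before any worker has subscribed are silently dropped
    # by ZeroMQ PUB/SUB, so make sure a worker is already listening.
    await add.kiq(1, 2)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```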

taskiq/cli/async_task_runner.py

Lines changed: 1 addition & 87 deletions
@@ -1,12 +1,10 @@
 import asyncio
 import inspect
 import io
-import signal
-import sys
 from concurrent.futures import Executor, ThreadPoolExecutor
 from logging import getLogger
 from time import time
-from typing import Any, Callable, Dict, List, NoReturn, Optional
+from typing import Any, Callable, Dict, List, Optional

 from pydantic import parse_obj_as

@@ -180,81 +178,6 @@ async def run_task( # noqa: C901, WPS210, WPS211
     return result


-def exit_process(task: "asyncio.Task[Any]") -> NoReturn:
-    """
-    This function exits from the current process.
-
-    It receives asyncio Task of broker.shutdown().
-    We check if there were an exception or returned value.
-
-    If the function raised an exception, we print it with stack trace.
-    If it returned a value, we log it.
-
-    After this, we cancel all current tasks in the loop
-    and exits.
-
-    :param task: broker.shutdown task.
-    """
-    exitcode = 0
-    try:
-        result = task.result()
-        if result is not None:
-            logger.info("Broker returned value on shutdown: '%s'", str(result))
-    except Exception as exc:
-        logger.warning("Exception was found while shutting down!")
-        logger.warning(exc, exc_info=True)
-        exitcode = 1
-
-    loop = asyncio.get_event_loop()
-    for running_task in asyncio.all_tasks(loop):
-        running_task.cancel()
-
-    logger.info("Worker process killed.")
-    sys.exit(exitcode)
-
-
-def signal_handler(broker: AsyncBroker) -> Callable[[int, Any], None]:
-    """
-    Signal handler.
-
-    This function is used to generate
-    real signal handler using closures.
-
-    It takes current broker as an argument
-    and returns function that shuts it down.
-
-    :param broker: current broker.
-    :returns: signal handler function.
-    """
-
-    def _handler(signum: int, _frame: Any) -> None:
-        """
-        Exit signal handler.
-
-        This signal handler
-        calls shutdown for broker and after
-        the task is done it exits process with 0 status code.
-
-        :param signum: received signal.
-        :param _frame: current execution frame.
-        """
-        if getattr(broker, "_is_shutting_down", False):
-            # We're already shutting down the broker.
-            return
-
-        # We set this flag to not call this method twice.
-        # Since we add an asynchronous task in loop
-        # It can wait for execution for some time.
-        # We want to execute shutdown only once. Otherwise
-        # it would give us Undefined Behaviour.
-        broker._is_shutting_down = True  # type: ignore  # noqa: WPS437
-        logger.info(f"Got {signum} signal. Shutting down worker process.")
-        task = asyncio.create_task(broker.shutdown())
-        task.add_done_callback(exit_process)
-
-    return _handler
-
-
 async def async_listen_messages( # noqa: C901, WPS210, WPS213
     broker: AsyncBroker,
     cli_args: TaskiqArgs,
@@ -268,15 +191,6 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
     :param broker: broker to listen to.
     :param cli_args: CLI arguments for worker.
     """
-    signal.signal(
-        signal.SIGTERM,
-        signal_handler(broker),
-    )
-    signal.signal(
-        signal.SIGINT,
-        signal_handler(broker),
-    )
-
     logger.info("Runing startup event.")
     await broker.startup()
     executor = ThreadPoolExecutor(
