Skip to content

Commit 2f99231

Browse files
authored
Added better support for multi-broker setup. (#169)
1 parent 6fb6c33 commit 2f99231

File tree

9 files changed

+86
-26
lines changed

9 files changed

+86
-26
lines changed

docs/examples/extending/middleware_async.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@
55

66

77
class MyMiddleware(TaskiqMiddleware):
8+
async def startup(self) -> None:
9+
print("RUN STARTUP")
10+
await sleep(1)
11+
12+
async def shutdown(self) -> None:
13+
print("RUN SHUTDOWN")
14+
await sleep(1)
15+
816
async def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
917
await sleep(1)
1018
return message

docs/examples/extending/middleware_sync.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@
55

66

77
class MyMiddleware(TaskiqMiddleware):
8+
def startup(self) -> None:
9+
print("RUN STARTUP")
10+
sleep(1)
11+
12+
def shutdown(self) -> None:
13+
print("RUN SHUTDOWN")
14+
sleep(1)
15+
816
def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
917
sleep(1)
1018
return message

taskiq/abc/broker.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,6 @@ class AsyncBroker(ABC):
6969
"""
7070

7171
available_tasks: Dict[str, AsyncTaskiqDecoratedTask[Any, Any]] = {}
72-
# True only if broker runs in worker process.
73-
is_worker_process: bool = False
74-
# True only if broker runs in scheduler process.
75-
is_scheduler_process: bool = False
7672

7773
def __init__(
7874
self,
@@ -111,6 +107,10 @@ def __init__(
111107
] = defaultdict(list)
112108
self.state = TaskiqState()
113109
self.custom_dependency_context: Dict[Any, Any] = {}
110+
# True only if broker runs in worker process.
111+
self.is_worker_process: bool = False
112+
# True only if broker runs in scheduler process.
113+
self.is_scheduler_process: bool = False
114114

115115
def add_dependency_context(self, new_ctx: Dict[Any, Any]) -> None:
116116
"""
@@ -151,6 +151,10 @@ async def startup(self) -> None:
151151
for handler in self.event_handlers[event]:
152152
await maybe_awaitable(handler(self.state))
153153

154+
for middleware in self.middlewares:
155+
if middleware.__class__.startup != TaskiqMiddleware.startup:
156+
await maybe_awaitable(middleware.startup)
157+
154158
await self.result_backend.startup()
155159

156160
async def shutdown(self) -> None:
@@ -168,6 +172,10 @@ async def shutdown(self) -> None:
168172
for handler in self.event_handlers[event]:
169173
await maybe_awaitable(handler(self.state))
170174

175+
for middleware in self.middlewares:
176+
if middleware.__class__.shutdown != TaskiqMiddleware.shutdown:
177+
await maybe_awaitable(middleware.shutdown)
178+
171179
await self.result_backend.shutdown()
172180

173181
@abstractmethod

taskiq/abc/middleware.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,26 @@ def set_broker(self, broker: "AsyncBroker") -> None:
2020
"""
2121
self.broker = broker
2222

23+
def startup(self) -> "Union[None, Coroutine[Any, Any, None]]":
24+
"""
25+
Startup method to perform various action during startup.
26+
27+
This function can be either sync or async.
28+
Executed during broker's startup.
29+
30+
:returns nothing.
31+
"""
32+
33+
def shutdown(self) -> "Union[None, Coroutine[Any, Any, None]]":
34+
"""
35+
Shutdown method to perform various action during shutdown.
36+
37+
This function can be either sync or async.
38+
Executed during broker's shutdown.
39+
40+
:returns nothing.
41+
"""
42+
2343
def pre_send(
2444
self,
2545
message: "TaskiqMessage",

taskiq/brokers/zmq_broker.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
try:
1010
import zmq # noqa: WPS433
11-
from zmq.asyncio import Context # noqa: WPS433
11+
from zmq.asyncio import Context, Socket # noqa: WPS433
1212
except ImportError:
1313
zmq = None # type: ignore
1414

@@ -43,12 +43,23 @@ def __init__(
4343
self.context = Context()
4444
self.pub_host = zmq_pub_host
4545
self.sub_host = zmq_sub_host
46+
self.socket: Socket
47+
48+
async def startup(self) -> None:
49+
"""
50+
Startup for zmq broker.
51+
52+
This function creates actual connections to
53+
sockets. if current process is worker,
54+
it subscribes, otherwise it becomes publisher.
55+
"""
4656
if self.is_worker_process:
4757
self.socket = self.context.socket(zmq.SUB)
4858
self.socket.setsockopt(zmq.SUBSCRIBE, b"")
4959
else:
5060
self.socket = self.context.socket(zmq.PUB)
5161
self.socket.bind(self.pub_host)
62+
await super().startup()
5263

5364
async def kick(self, message: BrokerMessage) -> None:
5465
"""

taskiq/cli/scheduler/run.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
from pycron import is_now
77

8-
from taskiq.abc.broker import AsyncBroker
98
from taskiq.cli.scheduler.args import SchedulerArgs
109
from taskiq.cli.utils import import_object, import_tasks
1110
from taskiq.kicker import AsyncKicker
@@ -71,13 +70,16 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
7170
7271
:param args: parsed CLI args.
7372
"""
74-
AsyncBroker.is_scheduler_process = True
75-
scheduler = import_object(args.scheduler)
76-
if not isinstance(scheduler, TaskiqScheduler):
73+
if isinstance(args.scheduler, str):
74+
scheduler = import_object(args.scheduler)
75+
else:
76+
scheduler = args.scheduler
77+
if not isinstance(args.scheduler, TaskiqScheduler):
7778
print( # noqa: WPS421
7879
"Imported scheduler is not a subclass of TaskiqScheduler.",
7980
)
8081
exit(1) # noqa: WPS421
82+
scheduler.broker.is_scheduler_process = True
8183
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
8284
basicConfig(
8385
level=getLevelName(args.log_level),

taskiq/cli/worker/run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,11 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
118118
# broker is running as a worker.
119119
# We must set this field before importing tasks,
120120
# so broker will remember all tasks it's related to.
121-
AsyncBroker.is_worker_process = True
122121
broker = import_object(args.broker)
123-
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
124122
if not isinstance(broker, AsyncBroker):
125123
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")
124+
broker.is_worker_process = True
125+
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
126126

127127
receiver_type = get_receiver_type(args)
128128
receiver_args = dict(args.receiver_arg)

taskiq/middlewares/prometheus_middleware.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from tempfile import gettempdir
55
from typing import Any, Optional
66

7-
from taskiq.abc.broker import AsyncBroker
87
from taskiq.abc.middleware import TaskiqMiddleware
98
from taskiq.message import TaskiqMessage
109
from taskiq.result import TaskiqResult
@@ -36,9 +35,6 @@ def __init__(
3635
self.saved_results = None
3736
self.execution_time = None
3837

39-
if not AsyncBroker.is_worker_process:
40-
return
41-
4238
metrics_path = metrics_path or Path(gettempdir()) / "taskiq_worker"
4339

4440
if not metrics_path.exists():
@@ -52,11 +48,7 @@ def __init__(
5248
logger.debug("Initializing metrics")
5349

5450
try:
55-
from prometheus_client import ( # noqa: WPS433
56-
Counter,
57-
Histogram,
58-
start_http_server,
59-
)
51+
from prometheus_client import Counter, Histogram # noqa: WPS433
6052
except ImportError as exc:
6153
raise ImportError(
6254
"Cannot initialize metrics. Please install 'taskiq[metrics]'.",
@@ -87,10 +79,23 @@ def __init__(
8779
"Tome of function execution",
8880
["task_name"],
8981
)
90-
try:
91-
start_http_server(port=server_port, addr=server_addr)
92-
except OSError as exc:
93-
logger.debug("Cannot start prometheus server: %s", exc)
82+
self.server_port = server_port
83+
self.server_addr = server_addr
84+
85+
def startup(self) -> None:
86+
"""
87+
Prometheus startup.
88+
89+
This function starts prometheus server.
90+
It starts it only in case if it's a worker process.
91+
"""
92+
from prometheus_client import start_http_server # noqa: WPS433
93+
94+
if self.broker.is_worker_process:
95+
try:
96+
start_http_server(port=self.server_port, addr=self.server_addr)
97+
except OSError as exc:
98+
logger.debug("Cannot start prometheus server: %s", exc)
9499

95100
def pre_execute(
96101
self,

tests/conftest.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,3 @@ def reset_broker() -> Generator[None, None, None]:
2626
"""
2727
yield
2828
AsyncBroker.available_tasks = {}
29-
AsyncBroker.is_worker_process = False
30-
AsyncBroker.is_scheduler_process = False

0 commit comments

Comments
 (0)