Skip to content

Commit 5422119

Browse files
committed
Fixed startup events for multi-broker setup.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 484647f commit 5422119

File tree

1 file changed

+32
-28
lines changed

1 file changed

+32
-28
lines changed

taskiq_fastapi/initializator.py

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,40 +6,58 @@
66
from taskiq.cli.utils import import_object
77

88

9-
def startup_event_generator(app: FastAPI) -> Callable[[TaskiqState], Awaitable[None]]:
9+
def startup_event_generator(
10+
broker: AsyncBroker,
11+
app_path: str,
12+
) -> Callable[[TaskiqState], Awaitable[None]]:
1013
"""
1114
Generate shutdown event.
1215
13-
This function takes FastAPI application
16+
This function takes FastAPI application path
1417
and runs startup event on broker's startup.
1518
16-
:param app: fastapi application.
19+
:param broker: current broker.
20+
:param app_path: fastapi application path.
1721
:returns: startup handler.
1822
"""
1923

2024
async def startup(state: TaskiqState) -> None:
25+
if not broker.is_worker_process:
26+
return
27+
app = import_object(app_path)
28+
if not isinstance(app, FastAPI):
29+
app = app()
30+
31+
if not isinstance(app, FastAPI):
32+
raise ValueError(f"'{app_path}' is not a FastAPI application.")
33+
2134
state.fastapi_app = app
2235
app.router.routes = []
2336
await app.router.startup()
37+
populate_dependency_context(broker, app)
2438

2539
return startup
2640

2741

28-
def shutdown_event_generator(app: FastAPI) -> Callable[[TaskiqState], Awaitable[None]]:
42+
def shutdown_event_generator(
43+
broker: AsyncBroker,
44+
) -> Callable[[TaskiqState], Awaitable[None]]:
2945
"""
3046
Generate shutdown event.
3147
3248
This function takes FastAPI application
3349
and runs shutdown event on broker's shutdown.
3450
35-
:param app: current application.
36-
:return: startup event handler.
51+
:param broker: current broker.
52+
:return: shutdown event handler.
3753
"""
3854

39-
async def startup(_: TaskiqState) -> None:
40-
await app.router.shutdown()
55+
async def shutdown(state: TaskiqState) -> None:
56+
if not broker.is_worker_process:
57+
return
58+
await state.fastapi_app.router.shutdown()
4159

42-
return startup
60+
return shutdown
4361

4462

4563
def init(broker: AsyncBroker, app_path: str) -> None:
@@ -49,35 +67,21 @@ def init(broker: AsyncBroker, app_path: str) -> None:
4967
This is the main function to integrate FastAPI
5068
with taskiq.
5169
52-
This function imports fastapi application by
53-
python's path string and adds startup events
54-
for broker.
70+
It creates startup events for broker. So
71+
in worker processes all fastapi
72+
startup events will run.
5573
5674
:param broker: current broker to use.
5775
:param app_path: path to fastapi application.
58-
:raises ValueError: if fastapi cannot be resolved.
5976
"""
60-
if not broker.is_worker_process:
61-
return
62-
63-
app = import_object(app_path)
64-
65-
if not isinstance(app, FastAPI):
66-
app = app()
67-
68-
if not isinstance(app, FastAPI):
69-
raise ValueError(f"'{app_path}' is not a FastAPI application.")
70-
71-
populate_dependency_context(broker, app)
72-
7377
broker.add_event_handler(
7478
TaskiqEvents.WORKER_STARTUP,
75-
startup_event_generator(app),
79+
startup_event_generator(broker, app_path),
7680
)
7781

7882
broker.add_event_handler(
7983
TaskiqEvents.WORKER_SHUTDOWN,
80-
shutdown_event_generator(app),
84+
shutdown_event_generator(broker),
8185
)
8286

8387

0 commit comments

Comments
 (0)