Skip to content

Commit 3f081b4

Browse files
fix: use lifespan
1 parent 34e891c commit 3f081b4

File tree

3 files changed

+24
-34
lines changed

3 files changed

+24
-34
lines changed

packages/celery-library/src/celery_library/signals.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ def _init(startup_complete_event: threading.Event) -> None:
2626
loop = asyncio.new_event_loop()
2727
asyncio.set_event_loop(loop)
2828

29-
shutdown_event = asyncio.Event()
30-
3129
async def _setup_task_manager():
3230
assert sender.app # nosec
3331
assert isinstance(sender.app, Celery) # nosec
@@ -42,9 +40,7 @@ async def _setup_task_manager():
4240
app_server.event_loop = loop
4341

4442
loop.run_until_complete(_setup_task_manager())
45-
loop.run_until_complete(
46-
app_server.startup(startup_complete_event, shutdown_event)
47-
)
43+
loop.run_until_complete(app_server.lifespan(startup_complete_event))
4844

4945
thread = threading.Thread(
5046
group=None,
@@ -63,4 +59,4 @@ def on_worker_shutdown(sender, **_kwargs) -> None:
6359
assert isinstance(sender.app, Celery)
6460
app_server = get_app_server(sender.app)
6561

66-
app_server.event_loop.run_until_complete(app_server.shutdown())
62+
app_server.shutdown_event.set()

packages/service-library/src/servicelib/celery/app_server.py

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
class BaseAppServer(ABC, Generic[T]):
1313
def __init__(self, app: T) -> None:
1414
self._app: T = app
15-
self._shutdown_event: asyncio.Event | None = None
15+
self._shutdown_event: asyncio.Event = asyncio.Event()
1616

1717
@property
1818
def app(self) -> T:
@@ -26,6 +26,10 @@ def event_loop(self) -> AbstractEventLoop:
2626
def event_loop(self, loop: AbstractEventLoop) -> None:
2727
self._event_loop = loop
2828

29+
@property
30+
def shutdown_event(self) -> asyncio.Event:
31+
return self._shutdown_event
32+
2933
@property
3034
def task_manager(self) -> TaskManager:
3135
return self._task_manager
@@ -35,23 +39,8 @@ def task_manager(self, manager: TaskManager) -> None:
3539
self._task_manager = manager
3640

3741
@abstractmethod
38-
async def on_startup(self) -> None:
39-
raise NotImplementedError
40-
41-
async def startup(
42-
self, startup_completed_event: threading.Event, shutdown_event: asyncio.Event
42+
async def lifespan(
43+
self,
44+
startup_completed_event: threading.Event,
4345
) -> None:
44-
self._shutdown_event = shutdown_event
45-
await self.on_startup()
46-
startup_completed_event.set()
47-
await self._shutdown_event.wait()
48-
49-
@abstractmethod
50-
async def on_shutdown(self) -> None:
5146
raise NotImplementedError
52-
53-
async def shutdown(self) -> None:
54-
if self._shutdown_event is not None:
55-
self._shutdown_event.set()
56-
57-
await self.on_shutdown()
Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
import asyncio
12
import datetime
3+
import logging
4+
import threading
25
from typing import Final
36

47
from asgi_lifespan import LifespanManager
@@ -8,21 +11,23 @@
811

912
_SHUTDOWN_TIMEOUT: Final[float] = datetime.timedelta(seconds=10).total_seconds()
1013

14+
_logger = logging.getLogger(__name__)
15+
1116

1217
class FastAPIAppServer(BaseAppServer[FastAPI]):
1318
def __init__(self, app: FastAPI):
1419
super().__init__(app)
1520
self._lifespan_manager: LifespanManager | None = None
1621

17-
async def on_startup(self) -> None:
18-
self._lifespan_manager = LifespanManager(
22+
async def lifespan(self, startup_completed_event: threading.Event) -> None:
23+
async with LifespanManager(
1924
self.app,
2025
startup_timeout=None, # waits for full app initialization (DB migrations, etc.)
2126
shutdown_timeout=_SHUTDOWN_TIMEOUT,
22-
)
23-
await self._lifespan_manager.__aenter__()
24-
25-
async def on_shutdown(self) -> None:
26-
if self._lifespan_manager is None:
27-
return
28-
await self._lifespan_manager.__aexit__(None, None, None)
27+
):
28+
try:
29+
_logger.info("fastapi app initialized")
30+
startup_completed_event.set()
31+
await self.shutdown_event.wait()
32+
except asyncio.CancelledError:
33+
_logger.warning("lifespan task cancelled")

0 commit comments

Comments
 (0)