Skip to content

Commit 615c826

Browse files
🐛 Use context manager for app server lifecycle in Celery workers (#7962)
1 parent 8bc8f3c commit 615c826

File tree

4 files changed

+28
-48
lines changed

4 files changed

+28
-48
lines changed

packages/celery-library/src/celery_library/signals.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from celery import Celery # type: ignore[import-untyped]
66
from celery.worker.worker import WorkController # type: ignore[import-untyped]
7-
from servicelib.celery.app_server import STARTUP_TIMEOUT, BaseAppServer
7+
from servicelib.celery.app_server import BaseAppServer
88
from servicelib.logging_utils import log_context
99
from settings_library.celery import CelerySettings
1010

@@ -26,8 +26,6 @@ def _init(startup_complete_event: threading.Event) -> None:
2626
loop = asyncio.new_event_loop()
2727
asyncio.set_event_loop(loop)
2828

29-
shutdown_event = asyncio.Event()
30-
3129
async def _setup_task_manager():
3230
assert sender.app # nosec
3331
assert isinstance(sender.app, Celery) # nosec
@@ -42,9 +40,7 @@ async def _setup_task_manager():
4240
app_server.event_loop = loop
4341

4442
loop.run_until_complete(_setup_task_manager())
45-
loop.run_until_complete(
46-
app_server.startup(startup_complete_event, shutdown_event)
47-
)
43+
loop.run_until_complete(app_server.lifespan(startup_complete_event))
4844

4945
thread = threading.Thread(
5046
group=None,
@@ -55,12 +51,12 @@ async def _setup_task_manager():
5551
)
5652
thread.start()
5753

58-
startup_complete_event.wait(STARTUP_TIMEOUT * 1.1)
54+
startup_complete_event.wait()
5955

6056

6157
def on_worker_shutdown(sender, **_kwargs) -> None:
6258
with log_context(_logger, logging.INFO, "Worker shutdown"):
6359
assert isinstance(sender.app, Celery)
6460
app_server = get_app_server(sender.app)
6561

66-
app_server.event_loop.run_until_complete(app_server.shutdown())
62+
app_server.shutdown_event.set()

packages/celery-library/tests/conftest.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# pylint: disable=unused-argument
33

44
import datetime
5+
import threading
56
from collections.abc import AsyncIterator, Callable
67
from functools import partial
78
from typing import Any
@@ -30,10 +31,7 @@
3031

3132

3233
class FakeAppServer(BaseAppServer):
33-
async def on_startup(self) -> None:
34-
pass
35-
36-
async def on_shutdown(self) -> None:
34+
async def lifespan(self, startup_completed_event: threading.Event) -> None:
3735
pass
3836

3937

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
import asyncio
2-
import datetime
32
import threading
43
from abc import ABC, abstractmethod
54
from asyncio import AbstractEventLoop
6-
from typing import Final, Generic, TypeVar
5+
from typing import Generic, TypeVar
76

87
from servicelib.celery.task_manager import TaskManager
98

10-
STARTUP_TIMEOUT: Final[float] = datetime.timedelta(minutes=1).total_seconds()
11-
129
T = TypeVar("T")
1310

1411

1512
class BaseAppServer(ABC, Generic[T]):
1613
def __init__(self, app: T) -> None:
1714
self._app: T = app
18-
self._shutdown_event: asyncio.Event | None = None
15+
self._shutdown_event: asyncio.Event = asyncio.Event()
1916

2017
@property
2118
def app(self) -> T:
@@ -29,6 +26,10 @@ def event_loop(self) -> AbstractEventLoop:
2926
def event_loop(self, loop: AbstractEventLoop) -> None:
3027
self._event_loop = loop
3128

29+
@property
30+
def shutdown_event(self) -> asyncio.Event:
31+
return self._shutdown_event
32+
3233
@property
3334
def task_manager(self) -> TaskManager:
3435
return self._task_manager
@@ -38,23 +39,8 @@ def task_manager(self, manager: TaskManager) -> None:
3839
self._task_manager = manager
3940

4041
@abstractmethod
41-
async def on_startup(self) -> None:
42-
raise NotImplementedError
43-
44-
async def startup(
45-
self, completed_event: threading.Event, shutdown_event: asyncio.Event
42+
async def lifespan(
43+
self,
44+
startup_completed_event: threading.Event,
4645
) -> None:
47-
self._shutdown_event = shutdown_event
48-
completed_event.set()
49-
await self.on_startup()
50-
await self._shutdown_event.wait()
51-
52-
@abstractmethod
53-
async def on_shutdown(self) -> None:
5446
raise NotImplementedError
55-
56-
async def shutdown(self) -> None:
57-
if self._shutdown_event is not None:
58-
self._shutdown_event.set()
59-
60-
await self.on_shutdown()
Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
1-
from datetime import timedelta
1+
import datetime
2+
import logging
3+
import threading
24
from typing import Final
35

46
from asgi_lifespan import LifespanManager
57
from fastapi import FastAPI
68

79
from ...celery.app_server import BaseAppServer
810

9-
_SHUTDOWN_TIMEOUT: Final[float] = timedelta(seconds=10).total_seconds()
10-
_STARTUP_TIMEOUT: Final[float] = timedelta(minutes=1).total_seconds()
11+
_SHUTDOWN_TIMEOUT: Final[float] = datetime.timedelta(seconds=10).total_seconds()
12+
13+
_logger = logging.getLogger(__name__)
1114

1215

1316
class FastAPIAppServer(BaseAppServer[FastAPI]):
1417
def __init__(self, app: FastAPI):
1518
super().__init__(app)
1619
self._lifespan_manager: LifespanManager | None = None
1720

18-
async def on_startup(self) -> None:
19-
self._lifespan_manager = LifespanManager(
21+
async def lifespan(self, startup_completed_event: threading.Event) -> None:
22+
async with LifespanManager(
2023
self.app,
21-
startup_timeout=_STARTUP_TIMEOUT,
24+
startup_timeout=None, # waits for full app initialization (DB migrations, etc.)
2225
shutdown_timeout=_SHUTDOWN_TIMEOUT,
23-
)
24-
await self._lifespan_manager.__aenter__()
25-
26-
async def on_shutdown(self) -> None:
27-
if self._lifespan_manager is None:
28-
return
29-
await self._lifespan_manager.__aexit__(None, None, None)
26+
):
27+
_logger.info("fastapi app initialized")
28+
startup_completed_event.set()
29+
await self.shutdown_event.wait() # NOTE: wait here until shutdown is requested

0 commit comments

Comments
 (0)