Skip to content

Commit 1c4f7ec

Browse files
fix: startup
1 parent 72a0ce9 commit 1c4f7ec

File tree

3 files changed

+14
-7
lines changed

3 files changed

+14
-7
lines changed

packages/celery-library/src/celery_library/signals.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,19 @@ def on_worker_init(
2424
sender: WorkController,
2525
**_kwargs,
2626
) -> None:
27-
def _init() -> None:
27+
startup_complete_event = threading.Event()
28+
29+
def _init(startup_complete_event: threading.Event) -> None:
2830
loop = asyncio.new_event_loop()
2931
asyncio.set_event_loop(loop)
3032

3133
app_server.event_loop = loop
3234

33-
async def setup_task_manager():
35+
async def _setup():
3436
assert sender.app # nosec
3537
assert isinstance(sender.app, Celery) # nosec
3638

39+
set_app_server(sender.app, app_server)
3740
set_task_manager(
3841
sender.app,
3942
create_task_manager(
@@ -42,18 +45,20 @@ async def setup_task_manager():
4245
),
4346
)
4447

45-
set_app_server(sender.app, app_server)
46-
loop.run_until_complete(setup_task_manager())
47-
loop.run_until_complete(app_server.startup())
48+
loop.run_until_complete(_setup())
49+
loop.run_until_complete(app_server.startup(startup_complete_event))
4850

4951
thread = threading.Thread(
5052
group=None,
5153
target=_init,
5254
name="app_server_init",
55+
args=(startup_complete_event,),
5356
daemon=True,
5457
)
5558
thread.start()
5659

60+
startup_complete_event.wait()
61+
5762

5863
def on_worker_shutdown(sender, **_kwargs) -> None:
5964
with log_context(_logger, logging.INFO, "Worker shutdown"):

packages/service-library/src/servicelib/base_app_server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import threading
12
from abc import ABC, abstractmethod
23
from asyncio import AbstractEventLoop
34
from contextlib import suppress
@@ -20,7 +21,7 @@ def aiohttp_app(self) -> "Application":
2021
raise NotImplementedError
2122

2223
@abstractmethod
23-
async def startup(self):
24+
async def startup(self, completed: threading.Event):
2425
pass
2526

2627
@property

packages/service-library/src/servicelib/fastapi/app_server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ def fastapi_app(self) -> FastAPI:
2222
assert isinstance(self._app, FastAPI) # nosec
2323
return self._app
2424

25-
async def startup(self):
25+
async def startup(self, completed: asyncio.Event):
2626
self._lifespan_manager = LifespanManager(
2727
self.fastapi_app,
2828
startup_timeout=_STARTUP_TIMEOUT,
2929
shutdown_timeout=_SHUTDOWN_TIMEOUT,
3030
)
3131
await self._lifespan_manager.__aenter__()
32+
completed.set()
3233
await self._shutdown_event.wait()
3334

3435
async def shutdown(self):

0 commit comments

Comments
 (0)