Skip to content

Commit 0d58ad8

Browse files
authored
🐛Storage-Worker: ensure startup waits for fastapi application readiness (#7442)
1 parent af8e683 commit 0d58ad8

File tree

2 files changed

+53
-38
lines changed
  • packages/aws-library/src/aws_library/s3
  • services/storage/src/simcore_service_storage/modules/celery

2 files changed

+53
-38
lines changed

packages/aws-library/src/aws_library/s3/_client.py

Lines changed: 22 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -85,21 +85,30 @@ async def create(
8585
cls, settings: S3Settings, s3_max_concurrency: int = _S3_MAX_CONCURRENCY_DEFAULT
8686
) -> "SimcoreS3API":
8787
session = aioboto3.Session()
88-
session_client = session.client( # type: ignore[call-overload]
89-
"s3",
90-
endpoint_url=f"{settings.S3_ENDPOINT}",
91-
aws_access_key_id=settings.S3_ACCESS_KEY,
92-
aws_secret_access_key=settings.S3_SECRET_KEY,
93-
region_name=settings.S3_REGION,
94-
config=Config(signature_version="s3v4"),
95-
)
96-
assert isinstance(session_client, ClientCreatorContext) # nosec
88+
session_client = None
9789
exit_stack = contextlib.AsyncExitStack()
98-
s3_client = cast(S3Client, await exit_stack.enter_async_context(session_client))
99-
# NOTE: this triggers a botocore.exception.ClientError in case the connection is not made to the S3 backend
100-
await s3_client.list_buckets()
90+
try:
91+
session_client = session.client( # type: ignore[call-overload]
92+
"s3",
93+
endpoint_url=f"{settings.S3_ENDPOINT}",
94+
aws_access_key_id=settings.S3_ACCESS_KEY,
95+
aws_secret_access_key=settings.S3_SECRET_KEY,
96+
region_name=settings.S3_REGION,
97+
config=Config(signature_version="s3v4"),
98+
)
99+
assert isinstance(session_client, ClientCreatorContext) # nosec
101100

102-
return cls(s3_client, session, exit_stack, s3_max_concurrency)
101+
s3_client = cast(
102+
S3Client, await exit_stack.enter_async_context(session_client)
103+
)
104+
# NOTE: this triggers a botocore.exceptions.ClientError in case the connection to the S3 backend cannot be made
105+
await s3_client.list_buckets()
106+
107+
return cls(s3_client, session, exit_stack, s3_max_concurrency)
108+
except Exception:
109+
await exit_stack.aclose()
110+
111+
raise
103112

104113
async def close(self) -> None:
105114
await self._exit_stack.aclose()

services/storage/src/simcore_service_storage/modules/celery/signals.py

Lines changed: 31 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,17 @@
11
import asyncio
2+
import datetime
23
import logging
34
import threading
45
from typing import Final
56

67
from asgi_lifespan import LifespanManager
78
from celery import Celery # type: ignore[import-untyped]
89
from fastapi import FastAPI
9-
from servicelib.async_utils import cancel_wait_task
10+
from servicelib.logging_utils import log_context
1011

1112
from ...core.application import create_app
1213
from ...core.settings import ApplicationSettings
13-
from ...modules.celery import get_event_loop, set_event_loop
14+
from ...modules.celery import set_event_loop
1415
from ...modules.celery.utils import (
1516
get_fastapi_app,
1617
set_celery_worker,
@@ -20,52 +21,57 @@
2021

2122
_logger = logging.getLogger(__name__)
2223

23-
_LIFESPAN_TIMEOUT: Final[int] = 10
24+
_SHUTDOWN_TIMEOUT: Final[float] = datetime.timedelta(seconds=10).total_seconds()
25+
_STARTUP_TIMEOUT: Final[float] = datetime.timedelta(minutes=1).total_seconds()
2426

2527

2628
def on_worker_init(sender, **_kwargs) -> None:
27-
def _init_fastapi() -> None:
29+
startup_complete_event = threading.Event()
30+
31+
def _init_fastapi(startup_complete_event: threading.Event) -> None:
2832
loop = asyncio.new_event_loop()
2933
asyncio.set_event_loop(loop)
3034
shutdown_event = asyncio.Event()
3135

3236
fastapi_app = create_app(ApplicationSettings.create_from_envs())
3337

34-
async def lifespan():
38+
async def lifespan(
39+
startup_complete_event: threading.Event, shutdown_event: asyncio.Event
40+
) -> None:
3541
async with LifespanManager(
3642
fastapi_app,
37-
startup_timeout=_LIFESPAN_TIMEOUT,
38-
shutdown_timeout=_LIFESPAN_TIMEOUT,
43+
startup_timeout=_STARTUP_TIMEOUT,
44+
shutdown_timeout=_SHUTDOWN_TIMEOUT,
3945
):
4046
try:
47+
_logger.info("fastapi APP started!")
48+
startup_complete_event.set()
4149
await shutdown_event.wait()
4250
except asyncio.CancelledError:
4351
_logger.warning("Lifespan task cancelled")
4452

45-
lifespan_task = loop.create_task(lifespan())
46-
fastapi_app.state.lifespan_task = lifespan_task
4753
fastapi_app.state.shutdown_event = shutdown_event
4854
set_event_loop(fastapi_app, loop)
4955

5056
set_fastapi_app(sender.app, fastapi_app)
5157
set_celery_worker(sender.app, CeleryTaskQueueWorker(sender.app))
52-
53-
loop.run_forever()
54-
55-
thread = threading.Thread(target=_init_fastapi, daemon=True)
58+
loop.run_until_complete(lifespan(startup_complete_event, shutdown_event))
59+
60+
thread = threading.Thread(
61+
group=None,
62+
target=_init_fastapi,
63+
name="fastapi_app",
64+
args=(startup_complete_event,),
65+
daemon=True,
66+
)
5667
thread.start()
68+
# block until the fastapi app signals that startup completed (or the timeout elapses) before continuing
69+
startup_complete_event.wait(_STARTUP_TIMEOUT * 1.1)
5770

5871

59-
def on_worker_shutdown(sender, **_kwargs):
60-
assert isinstance(sender.app, Celery)
61-
62-
fastapi_app = get_fastapi_app(sender.app)
63-
assert isinstance(fastapi_app, FastAPI)
64-
event_loop = get_event_loop(fastapi_app)
65-
66-
async def shutdown():
72+
def on_worker_shutdown(sender, **_kwargs) -> None:
73+
with log_context(_logger, logging.INFO, "Worker Shuts-down"):
74+
assert isinstance(sender.app, Celery)
75+
fastapi_app = get_fastapi_app(sender.app)
76+
assert isinstance(fastapi_app, FastAPI)
6777
fastapi_app.state.shutdown_event.set()
68-
69-
await cancel_wait_task(fastapi_app.state.lifespan_task, max_delay=5)
70-
71-
asyncio.run_coroutine_threadsafe(shutdown(), event_loop)

0 commit comments

Comments
 (0)