Skip to content

Commit acd33b2

Browse files
committed
✨ Refactor: Update service setup functions to use async iterators for improved lifecycle management
1 parent 80b4063 commit acd33b2

File tree

6 files changed

+66
-97
lines changed

6 files changed

+66
-97
lines changed

services/catalog/src/simcore_service_catalog/core/application.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,7 @@
2222
from ..api.rest.routes import setup_rest_api_routes
2323
from ..api.rpc.routes import setup_rpc_api_routes
2424
from ..exceptions.handlers import setup_exception_handlers
25-
from ..services.function_services import setup_function_services
26-
from ..services.rabbitmq import setup_rabbitmq
2725
from . import events
28-
from .events import (
29-
_create_on_shutdown,
30-
)
3126
from .settings import ApplicationSettings
3227

3328
_logger = logging.getLogger(__name__)
@@ -73,14 +68,10 @@ def create_app() -> FastAPI:
7368
if settings.CATALOG_TRACING:
7469
initialize_tracing(app, settings.CATALOG_TRACING, APP_NAME)
7570

76-
# PLUGIN SETUP
77-
setup_function_services(app)
78-
setup_rabbitmq(app)
79-
71+
# MIDDLEWARES
8072
if settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED:
8173
setup_prometheus_instrumentation(app)
8274

83-
# MIDDLEWARES
8475
if settings.CATALOG_PROFILING:
8576
initialize_profiler(app)
8677

@@ -96,9 +87,6 @@ def create_app() -> FastAPI:
9687
setup_rest_api_routes(app, vtag=API_VTAG)
9788
setup_rpc_api_routes(app)
9889

99-
# SHUTDOWN-EVENT
100-
app.add_event_handler("shutdown", _create_on_shutdown(app))
101-
10290
# EXCEPTIONS
10391
setup_exception_handlers(app)
10492

services/catalog/src/simcore_service_catalog/core/background_tasks.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111

1212
import asyncio
1313
import logging
14+
from collections.abc import AsyncIterator
1415
from contextlib import suppress
1516
from pprint import pformat
1617
from typing import Final
1718

1819
from fastapi import FastAPI, HTTPException
20+
from fastapi_lifespan_manager import State
1921
from models_library.services import ServiceMetaDataPublished
2022
from models_library.services_types import ServiceKey, ServiceVersion
2123
from packaging.version import Version
@@ -115,9 +117,9 @@ async def _ensure_registry_and_database_are_synced(app: FastAPI) -> None:
115117
director_api = get_director_api(app)
116118
services_in_manifest_map = await manifest.get_services_map(director_api)
117119

118-
services_in_db: set[
119-
tuple[ServiceKey, ServiceVersion]
120-
] = await _list_services_in_database(app.state.engine)
120+
services_in_db: set[tuple[ServiceKey, ServiceVersion]] = (
121+
await _list_services_in_database(app.state.engine)
122+
)
121123

122124
# check that the db has all the services at least once
123125
missing_services_in_db = set(services_in_manifest_map.keys()) - services_in_db
@@ -232,3 +234,11 @@ async def stop_registry_sync_task(app: FastAPI) -> None:
232234
await task
233235
app.state.registry_sync_task = None
234236
_logger.info("registry syncing task stopped")
237+
238+
239+
async def setup_background_task(app: FastAPI) -> AsyncIterator[State]:
240+
await start_registry_sync_task(app)
241+
try:
242+
yield {}
243+
finally:
244+
await stop_registry_sync_task(app)
Lines changed: 18 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,24 @@
1-
import contextlib
21
import logging
3-
from collections.abc import AsyncIterator, Awaitable, Callable
4-
from typing import TypeAlias
2+
from collections.abc import AsyncIterator
53

64
from fastapi import FastAPI
75
from fastapi_lifespan_manager import LifespanManager, State
86
from servicelib.fastapi.postgres_lifespan import (
97
PostgresLifespanStateKeys,
108
postgres_lifespan,
119
)
12-
from servicelib.logging_utils import log_context
1310

1411
from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG
1512
from ..db.events import setup_database
16-
from ..services.director import close_director, setup_director
17-
from .background_tasks import start_registry_sync_task, stop_registry_sync_task
13+
from ..services.director import setup_director
14+
from ..services.function_services import setup_function_services
15+
from ..services.rabbitmq import setup_rabbitmq
16+
from .background_tasks import setup_background_task
1817
from .settings import ApplicationSettings
1918

2019
_logger = logging.getLogger(__name__)
2120

2221

23-
EventCallable: TypeAlias = Callable[[], Awaitable[None]]
24-
25-
2622
def flush_started_banner() -> None:
2723
# WARNING: this function is spied in the tests
2824
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
@@ -32,7 +28,7 @@ def flush_finished_banner() -> None:
3228
print(APP_FINISHED_BANNER_MSG, flush=True) # noqa: T201
3329

3430

35-
async def _main_setup(app: FastAPI) -> AsyncIterator[State]:
31+
async def _setup_app(app: FastAPI) -> AsyncIterator[State]:
3632
flush_started_banner()
3733

3834
settings: ApplicationSettings = app.state.settings
@@ -44,55 +40,25 @@ async def _main_setup(app: FastAPI) -> AsyncIterator[State]:
4440
flush_finished_banner()
4541

4642

47-
def _create_on_startup(app: FastAPI) -> EventCallable:
48-
async def _() -> None:
49-
50-
if app.state.settings.CATALOG_DIRECTOR:
51-
# setup connection to director
52-
await setup_director(app)
53-
54-
# FIXME: check director service is in place and ready. Hand-shake??
55-
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/1728
56-
await start_registry_sync_task(app)
57-
58-
_logger.info("Application started")
59-
60-
return _
61-
62-
63-
def _create_on_shutdown(app: FastAPI) -> EventCallable:
64-
async def _() -> None:
65-
66-
with log_context(_logger, logging.INFO, "Application shutdown"):
67-
if app.state.settings.CATALOG_DIRECTOR:
68-
try:
69-
await stop_registry_sync_task(app)
70-
await close_director(app)
71-
except Exception: # pylint: disable=broad-except
72-
_logger.exception("Unexpected error while closing application")
73-
74-
return _
75-
76-
77-
@contextlib.asynccontextmanager
78-
async def _other_setup(app: FastAPI) -> AsyncIterator[State]:
79-
80-
await _create_on_startup(app)()
81-
82-
yield {}
83-
84-
await _create_on_shutdown(app)()
85-
86-
8743
def create_app_lifespan():
8844
# app lifespan
8945
app_lifespan = LifespanManager()
90-
app_lifespan.add(_main_setup)
46+
app_lifespan.add(_setup_app)
9147

9248
# - postgres lifespan
9349
postgres_lifespan.add(setup_database)
9450
app_lifespan.include(postgres_lifespan)
9551

96-
app_lifespan.add(_other_setup)
52+
# - rabbitmq lifespan
53+
app_lifespan.add(setup_rabbitmq)
54+
55+
# - director lifespan
56+
app_lifespan.add(setup_director)
57+
58+
# - function services lifespan
59+
app_lifespan.add(setup_function_services)
60+
61+
# - background task lifespan
62+
app_lifespan.add(setup_background_task)
9763

9864
return app_lifespan

services/catalog/src/simcore_service_catalog/services/director.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
import json
44
import logging
55
import urllib.parse
6-
from collections.abc import Awaitable, Callable
6+
from collections.abc import AsyncIterator, Awaitable, Callable
77
from contextlib import suppress
88
from pprint import pformat
99
from typing import Any, Final
1010

1111
import httpx
1212
from common_library.json_serialization import json_dumps
1313
from fastapi import FastAPI, HTTPException
14+
from fastapi_lifespan_manager import State
1415
from models_library.api_schemas_directorv2.services import ServiceExtras
1516
from models_library.services_metadata_published import ServiceMetaDataPublished
1617
from models_library.services_types import ServiceKey, ServiceVersion
@@ -65,7 +66,7 @@ def _validate_kind(entry_to_validate: dict[str, Any], kind_name: str):
6566

6667

6768
def _return_data_or_raise_error(
68-
request_func: Callable[..., Awaitable[httpx.Response]]
69+
request_func: Callable[..., Awaitable[httpx.Response]],
6970
) -> Callable[..., Awaitable[list[Any] | dict[str, Any]]]:
7071
"""
7172
Creates a context for safe inter-process communication (IPC)
@@ -288,13 +289,14 @@ async def get_service_extras(
288289
return TypeAdapter(ServiceExtras).validate_python(result)
289290

290291

291-
async def setup_director(app: FastAPI) -> None:
292+
async def setup_director(app: FastAPI) -> AsyncIterator[State]:
293+
client: DirectorApi | None = None
294+
292295
if settings := app.state.settings.CATALOG_DIRECTOR:
293296
with log_context(
294297
_logger, logging.DEBUG, "Setup director at %s", f"{settings.base_url=}"
295298
):
296299
async for attempt in AsyncRetrying(**_director_startup_retry_policy):
297-
client = DirectorApi(base_url=settings.base_url, app=app)
298300
with attempt:
299301
client = DirectorApi(base_url=settings.base_url, app=app)
300302
if not await client.is_responsive():
@@ -303,17 +305,16 @@ async def setup_director(app: FastAPI) -> None:
303305
raise DirectorUnresponsiveError
304306

305307
_logger.info(
306-
"Connection to director-v0 succeded [%s]",
308+
"Connection to director-v0 succeeded [%s]",
307309
json_dumps(attempt.retry_state.retry_object.statistics),
308310
)
309311

310312
# set when connected
311313
app.state.director_api = client
312314

313-
314-
async def close_director(app: FastAPI) -> None:
315-
client: DirectorApi | None
316-
if client := app.state.director_api:
317-
await client.close()
318-
319-
_logger.debug("Director client closed successfully")
315+
try:
316+
yield {}
317+
finally:
318+
if client:
319+
await client.close()
320+
_logger.debug("Director client closed successfully")

services/catalog/src/simcore_service_catalog/services/function_services.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from collections.abc import AsyncIterator
2+
13
# mypy: disable-error-code=truthy-function
24
from typing import Any
35

46
from fastapi import status
57
from fastapi.applications import FastAPI
68
from fastapi.exceptions import HTTPException
9+
from fastapi_lifespan_manager import State
710
from models_library.function_services_catalog import (
811
is_function_service,
912
iter_service_docker_data,
@@ -31,12 +34,15 @@ def get_function_service(key, version) -> ServiceMetaDataPublished:
3134
) from err
3235

3336

34-
def setup_function_services(app: FastAPI):
35-
def _on_startup() -> None:
36-
catalog = [_as_dict(metadata) for metadata in iter_service_docker_data()]
37-
app.state.frontend_services_catalog = catalog
37+
async def setup_function_services(app: FastAPI) -> AsyncIterator[State]:
38+
app.state.frontend_services_catalog = [
39+
_as_dict(metadata) for metadata in iter_service_docker_data()
40+
]
3841

39-
app.add_event_handler("startup", _on_startup)
42+
try:
43+
yield {}
44+
finally:
45+
app.state.frontend_services_catalog = None
4046

4147

4248
__all__: tuple[str, ...] = (

services/catalog/src/simcore_service_catalog/services/rabbitmq.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
2+
from collections.abc import AsyncIterator
23
from typing import cast
34

45
from fastapi import FastAPI
6+
from fastapi_lifespan_manager import State
57
from servicelib.rabbitmq import RabbitMQRPCClient, wait_till_rabbitmq_responsive
68
from settings_library.rabbit import RabbitSettings
79

@@ -15,25 +17,21 @@ def get_rabbitmq_settings(app: FastAPI) -> RabbitSettings:
1517
return settings
1618

1719

18-
def setup_rabbitmq(app: FastAPI) -> None:
20+
async def setup_rabbitmq(app: FastAPI) -> AsyncIterator[State]:
1921
settings: RabbitSettings = get_rabbitmq_settings(app)
20-
app.state.rabbitmq_rpc_server = None
22+
await wait_till_rabbitmq_responsive(settings.dsn)
2123

22-
async def _on_startup() -> None:
23-
await wait_till_rabbitmq_responsive(settings.dsn)
24+
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
25+
client_name=f"{PROJECT_NAME}_rpc_server", settings=settings
26+
)
2427

25-
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
26-
client_name=f"{PROJECT_NAME}_rpc_server", settings=settings
27-
)
28-
29-
async def _on_shutdown() -> None:
28+
try:
29+
yield {}
30+
finally:
3031
if app.state.rabbitmq_rpc_server:
3132
await app.state.rabbitmq_rpc_server.close()
3233
app.state.rabbitmq_rpc_server = None
3334

34-
app.add_event_handler("startup", _on_startup)
35-
app.add_event_handler("shutdown", _on_shutdown)
36-
3735

3836
def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
3937
assert app.state.rabbitmq_rpc_server # nosec

0 commit comments

Comments
 (0)