Skip to content

Commit 8b8d89d

Browse files
committed
🔧 Refactor: Update setup functions to use AsyncIterator for improved lifecycle management
1 parent 9c677c0 commit 8b8d89d

File tree

8 files changed

+116
-105
lines changed

8 files changed

+116
-105
lines changed

packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@
1313
_logger = logging.getLogger(__name__)
1414

1515

16-
async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
16+
async def connect_to_postgres_until_ready(settings: PostgresSettings) -> AsyncEngine:
1717
with log_context(
1818
_logger,
1919
logging.DEBUG,
20-
f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}",
20+
f"Connecting to {settings.dsn_with_async_sqlalchemy}",
2121
):
22-
engine = await create_async_engine_and_pg_database_ready(settings)
22+
return await create_async_engine_and_pg_database_ready(settings)
23+
24+
25+
async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
26+
engine = await connect_to_postgres_until_ready(settings)
2327

2428
app.state.engine = engine
2529
_logger.debug(

services/catalog/src/simcore_service_catalog/core/application.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
2+
from collections.abc import AsyncIterator
23

34
from fastapi import FastAPI
45
from fastapi.middleware.gzip import GZipMiddleware
6+
from fastapi_lifespan_manager import LifespanManager, State
57
from models_library.basic_types import BootModeEnum
68
from servicelib.fastapi import timing_middleware
79
from servicelib.fastapi.openapi import override_fastapi_openapi_method
@@ -11,8 +13,6 @@
1113
)
1214
from servicelib.fastapi.tracing import initialize_tracing
1315
from simcore_service_catalog.core.background_tasks import setup_background_task
14-
from simcore_service_catalog.infrastructure.director import setup_director
15-
from simcore_service_catalog.infrastructure.postgres import setup_postgres_database
1616
from starlette.middleware.base import BaseHTTPMiddleware
1717

1818
from .._meta import (
@@ -27,7 +27,10 @@
2727
from ..api.rest.routes import setup_rest_api_routes
2828
from ..api.rpc.routes import setup_rpc_api_routes
2929
from ..exceptions.handlers import setup_exception_handlers
30-
from ..infrastructure.rabbitmq import setup_rabbitmq
30+
from ..infrastructure.director import director_lifespan
31+
from ..infrastructure.postgres import postgres_lifespan
32+
from ..infrastructure.rabbitmq import rabbitmq_lifespan
33+
from ..repository.setup import setup_repository
3134
from ..service.function_services import setup_function_services
3235
from .settings import ApplicationSettings
3336

@@ -44,15 +47,33 @@
4447
)
4548

4649

47-
def _flush_started_banner() -> None:
50+
async def _setup_banner(app: FastAPI) -> AsyncIterator[State]:
4851
# WARNING: this function is spied in the tests
52+
assert app
4953
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
5054

55+
yield {}
5156

52-
def _flush_finished_banner() -> None:
5357
print(APP_FINISHED_BANNER_MSG, flush=True) # noqa: T201
5458

5559

60+
def _create_app_lifespan(settings: ApplicationSettings):
61+
app_lifespan = LifespanManager()
62+
63+
app_lifespan.add(_setup_banner)
64+
65+
postgres_lifespan.add(setup_repository)
66+
app_lifespan.include(postgres_lifespan)
67+
68+
app_lifespan.include(director_lifespan)
69+
app_lifespan.add(rabbitmq_lifespan)
70+
71+
app_lifespan.add(setup_function_services)
72+
app_lifespan.add(setup_background_task)
73+
74+
return app_lifespan
75+
76+
5677
def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
5778
# keep mostly quiet noisy loggers
5879
quiet_level: int = max(
@@ -76,6 +97,7 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
7697
openapi_url=f"/api/{API_VTAG}/openapi.json",
7798
docs_url="/dev/doc",
7899
redoc_url=None, # default disabled
100+
lifespan=_create_app_lifespan(settings),
79101
)
80102
override_fastapi_openapi_method(app)
81103

@@ -85,15 +107,6 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
85107
if settings.CATALOG_TRACING:
86108
initialize_tracing(app, settings.CATALOG_TRACING, APP_NAME)
87109

88-
# STARTUP-EVENT
89-
app.add_event_handler("startup", _flush_started_banner)
90-
91-
setup_postgres_database(app)
92-
setup_director(app)
93-
setup_function_services(app)
94-
setup_rabbitmq(app)
95-
setup_background_task(app)
96-
97110
if app.state.settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED:
98111
setup_prometheus_instrumentation(app)
99112

@@ -113,9 +126,6 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
113126
setup_rest_api_routes(app, vtag=API_VTAG)
114127
setup_rpc_api_routes(app)
115128

116-
# SHUTDOWN-EVENT
117-
app.add_event_handler("shutdown", _flush_finished_banner)
118-
119129
# EXCEPTIONS
120130
setup_exception_handlers(app)
121131

services/catalog/src/simcore_service_catalog/core/background_tasks.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111

1212
import asyncio
1313
import logging
14+
from collections.abc import AsyncIterator
1415
from contextlib import suppress
1516
from pprint import pformat
1617
from typing import Final
1718

1819
from fastapi import FastAPI, HTTPException
20+
from fastapi_lifespan_manager import State
1921
from models_library.services import ServiceMetaDataPublished
2022
from models_library.services_types import ServiceKey, ServiceVersion
2123
from packaging.version import Version
@@ -213,29 +215,23 @@ async def _sync_services_task(app: FastAPI) -> None:
213215
)
214216

215217

216-
async def _start_registry_sync_task(app: FastAPI) -> None:
218+
async def setup_background_task(app: FastAPI) -> AsyncIterator[State]:
219+
# FIXME: check director service is in place and ready. Hand-shake??
220+
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/1728
221+
217222
# FIXME: added this variable to overcome the state in which the
218223
# task cancelation is ignored and the exceptions enter in a loop
219224
# that never stops the background task. This flag is an additional
220225
# mechanism to enforce stopping the background task
221-
app.state.registry_syncer_running = True
222226
task = asyncio.create_task(_sync_services_task(app))
223-
app.state.registry_sync_task = task
227+
224228
_logger.info("registry syncing task started")
225229

230+
yield {"registry_syncer_running": True, "registry_sync_task": task}
226231

227-
async def _stop_registry_sync_task(app: FastAPI) -> None:
228-
if task := app.state.registry_sync_task:
229-
with suppress(asyncio.CancelledError):
230-
app.state.registry_syncer_running = False
231-
task.cancel()
232-
await task
233-
app.state.registry_sync_task = None
234-
_logger.info("registry syncing task stopped")
232+
with suppress(asyncio.CancelledError):
233+
app.state.registry_syncer_running = False
234+
task.cancel()
235+
await task
235236

236-
237-
def setup_background_task(app: FastAPI):
238-
# FIXME: check director service is in place and ready. Hand-shake??
239-
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/1728
240-
app.add_event_handler("startup", _start_registry_sync_task)
241-
app.add_event_handler("shutdown", _stop_registry_sync_task)
237+
_logger.info("registry syncing task stopped")

services/catalog/src/simcore_service_catalog/infrastructure/director.py

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
import json
44
import logging
55
import urllib.parse
6-
from collections.abc import Awaitable, Callable
6+
from collections.abc import AsyncIterator, Awaitable, Callable
77
from contextlib import suppress
88
from pprint import pformat
99
from typing import Any, Final
1010

1111
import httpx
1212
from common_library.json_serialization import json_dumps
1313
from fastapi import FastAPI, HTTPException
14+
from fastapi_lifespan_manager import LifespanManager, State
1415
from models_library.api_schemas_directorv2.services import ServiceExtras
1516
from models_library.services_metadata_published import ServiceMetaDataPublished
1617
from models_library.services_types import ServiceKey, ServiceVersion
@@ -288,38 +289,30 @@ async def get_service_extras(
288289
return TypeAdapter(ServiceExtras).validate_python(result)
289290

290291

291-
async def _initialize_director_client(app: FastAPI) -> None:
292-
if settings := app.state.settings.CATALOG_DIRECTOR:
293-
with log_context(
294-
_logger, logging.DEBUG, "Setup director at %s", f"{settings.base_url=}"
295-
):
296-
async for attempt in AsyncRetrying(**_director_startup_retry_policy):
297-
client = DirectorApi(base_url=settings.base_url, app=app)
298-
with attempt:
299-
client = DirectorApi(base_url=settings.base_url, app=app)
300-
if not await client.is_responsive():
301-
with suppress(Exception):
302-
await client.close()
303-
raise DirectorUnresponsiveError
304-
305-
_logger.info(
306-
"Connection to director-v0 succeded [%s]",
307-
json_dumps(attempt.retry_state.retry_object.statistics),
308-
)
309-
310-
# set when connected
311-
app.state.director_api = client
292+
director_lifespan = LifespanManager()
312293

313294

314-
async def _shutdown_director_client(app: FastAPI) -> None:
315-
client: DirectorApi | None
316-
if client := app.state.director_api:
317-
await client.close()
318-
319-
_logger.debug("Director client closed successfully")
295+
@director_lifespan.add
296+
async def _setup_director(app: FastAPI) -> AsyncIterator[State]:
297+
settings = app.state.settings.CATALOG_DIRECTOR
320298

299+
with log_context(
300+
_logger, logging.DEBUG, "Setup director at %s", f"{settings.base_url=}"
301+
):
302+
async for attempt in AsyncRetrying(**_director_startup_retry_policy):
303+
client = DirectorApi(base_url=settings.base_url, app=app)
304+
with attempt:
305+
client = DirectorApi(base_url=settings.base_url, app=app)
306+
if not await client.is_responsive():
307+
with suppress(Exception):
308+
await client.close()
309+
raise DirectorUnresponsiveError
310+
311+
_logger.info(
312+
"Connection to director-v0 succeded [%s]",
313+
json_dumps(attempt.retry_state.retry_object.statistics),
314+
)
321315

322-
def setup_director(app: FastAPI):
316+
yield {"director_api": client}
323317

324-
app.add_event_handler("startup", _initialize_director_client)
325-
app.add_event_handler("shutdown", _shutdown_director_client)
318+
await client.close()
Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,29 @@
1-
import contextlib
21
import logging
2+
from collections.abc import AsyncIterator
33

44
from fastapi import FastAPI
5-
from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db
5+
from fastapi_lifespan_manager import LifespanManager, State
6+
from servicelib.fastapi.db_asyncpg_engine import connect_to_postgres_until_ready
67
from servicelib.logging_utils import log_catch, log_context
7-
8-
from ..repository.products import setup_default_product
8+
from sqlalchemy.ext.asyncio import AsyncEngine
99

1010
_logger = logging.getLogger(__name__)
1111

1212

13-
def setup_postgres_database(app: FastAPI):
13+
postgres_lifespan = LifespanManager()
1414

15-
async def _():
16-
with log_context(_logger, logging.INFO, f"{__name__} startup ..."):
17-
# connection
18-
await connect_to_db(app, app.state.settings.CATALOG_POSTGRES)
1915

20-
# configuring default product
21-
await setup_default_product(app)
16+
@postgres_lifespan.add
17+
async def setup_postgres_database(app: FastAPI) -> AsyncIterator[State]:
2218

23-
yield
19+
with log_context(_logger, logging.INFO, f"{__name__} startup ..."):
20+
engine: AsyncEngine = await connect_to_postgres_until_ready(
21+
app.state.settings.CATALOG_POSTGRES
22+
)
2423

25-
with log_context(
26-
_logger, logging.INFO, f"{__name__} shutdown ..."
27-
), contextlib.suppress(Exception), log_catch(_logger):
28-
await close_db_connection(app)
24+
yield {"engine": engine}
2925

30-
return _
26+
with log_context(_logger, logging.INFO, f"{__name__} shutdown ..."), log_catch(
27+
_logger, reraise=False
28+
):
29+
await engine.dispose()

services/catalog/src/simcore_service_catalog/infrastructure/rabbitmq.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
2+
from collections.abc import AsyncIterator
23
from typing import cast
34

45
from fastapi import FastAPI
6+
from fastapi_lifespan_manager import LifespanManager, State
57
from servicelib.rabbitmq import RabbitMQRPCClient, wait_till_rabbitmq_responsive
68
from settings_library.rabbit import RabbitSettings
79

@@ -15,24 +17,23 @@ def get_rabbitmq_settings(app: FastAPI) -> RabbitSettings:
1517
return settings
1618

1719

18-
def setup_rabbitmq(app: FastAPI) -> None:
20+
rabbitmq_lifespan = LifespanManager()
21+
22+
23+
@rabbitmq_lifespan.add
24+
async def setup_rabbitmq(app: FastAPI) -> AsyncIterator[State]:
1925
settings: RabbitSettings = get_rabbitmq_settings(app)
20-
app.state.rabbitmq_rpc_server = None
2126

22-
async def _on_startup() -> None:
23-
await wait_till_rabbitmq_responsive(settings.dsn)
27+
await wait_till_rabbitmq_responsive(settings.dsn)
2428

25-
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
26-
client_name=f"{PROJECT_NAME}_rpc_server", settings=settings
27-
)
29+
rabbitmq_rpc_server = await RabbitMQRPCClient.create(
30+
client_name=f"{PROJECT_NAME}_rpc_server",
31+
settings=settings,
32+
)
2833

29-
async def _on_shutdown() -> None:
30-
if app.state.rabbitmq_rpc_server:
31-
await app.state.rabbitmq_rpc_server.close()
32-
app.state.rabbitmq_rpc_server = None
34+
yield {"rabbitmq_rpc_server": rabbitmq_rpc_server}
3335

34-
app.add_event_handler("startup", _on_startup)
35-
app.add_event_handler("shutdown", _on_shutdown)
36+
await rabbitmq_rpc_server.close()
3637

3738

3839
def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
from collections.abc import AsyncIterator
2+
13
from fastapi import FastAPI
4+
from fastapi_lifespan_manager import State
25

36
from .products import ProductsRepository
47

58

6-
async def setup_repository(app: FastAPI):
9+
async def setup_repository(app: FastAPI) -> AsyncIterator[State]:
710
repo = ProductsRepository(db_engine=app.state.engine)
8-
app.state.default_product_name = await repo.get_default_product_name()
11+
12+
yield {"default_product_name": await repo.get_default_product_name()}

services/catalog/src/simcore_service_catalog/service/function_services.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from collections.abc import AsyncIterator
2+
13
# mypy: disable-error-code=truthy-function
24
from typing import Any
35

46
from fastapi import status
57
from fastapi.applications import FastAPI
68
from fastapi.exceptions import HTTPException
9+
from fastapi_lifespan_manager import State
710
from models_library.function_services_catalog import (
811
is_function_service,
912
iter_service_docker_data,
@@ -31,12 +34,13 @@ def get_function_service(key, version) -> ServiceMetaDataPublished:
3134
) from err
3235

3336

34-
def setup_function_services(app: FastAPI):
35-
def _on_startup() -> None:
36-
catalog = [_as_dict(metadata) for metadata in iter_service_docker_data()]
37-
app.state.frontend_services_catalog = catalog
37+
async def setup_function_services(app: FastAPI) -> AsyncIterator[State]:
38+
assert app # nosec
39+
assert not hasattr(app.state, "frontend_services_catalog") # nosec
40+
41+
catalog = [_as_dict(metadata) for metadata in iter_service_docker_data()]
3842

39-
app.add_event_handler("startup", _on_startup)
43+
yield {"frontend_services_catalog": catalog}
4044

4145

4246
__all__: tuple[str, ...] = (

0 commit comments

Comments
 (0)