Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions packages/service-library/src/servicelib/fastapi/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,55 @@
from fastapi import FastAPI
from fastapi_lifespan_manager import State
from pydantic import NonNegativeInt
from servicelib.fastapi.lifespan_utils import LifespanGenerator
from settings_library.docker_api_proxy import DockerApiProxysettings

_logger = logging.getLogger(__name__)

_DEFAULT_DOCKER_API_PROXY_HEALTH_TIMEOUT: Final[NonNegativeInt] = 5


def get_lifespan_remote_docker_client(
settings: DockerApiProxysettings,
) -> LifespanGenerator:
async def _(app: FastAPI) -> AsyncIterator[State]:
_DOCKER_API_PROXY_SETTINGS: Final[str] = "docker_api_proxy_settings"

session: ClientSession | None = None
if settings.DOCKER_API_PROXY_USER and settings.DOCKER_API_PROXY_PASSWORD:
session = ClientSession(
auth=aiohttp.BasicAuth(
login=settings.DOCKER_API_PROXY_USER,
password=settings.DOCKER_API_PROXY_PASSWORD.get_secret_value(),
)

def get_remote_docker_client_main_lifespan(settings: DockerApiProxysettings) -> State:
return {_DOCKER_API_PROXY_SETTINGS: settings}


async def remote_docker_client_lifespan(
app: FastAPI, state: State
) -> AsyncIterator[State]:
settings: DockerApiProxysettings = state[_DOCKER_API_PROXY_SETTINGS]

session: ClientSession | None = None
if settings.DOCKER_API_PROXY_USER and settings.DOCKER_API_PROXY_PASSWORD:
session = ClientSession(
auth=aiohttp.BasicAuth(
login=settings.DOCKER_API_PROXY_USER,
password=settings.DOCKER_API_PROXY_PASSWORD.get_secret_value(),
)
)

async with AsyncExitStack() as exit_stack:
if settings.DOCKER_API_PROXY_USER and settings.DOCKER_API_PROXY_PASSWORD:
await exit_stack.enter_async_context(
ClientSession(
auth=aiohttp.BasicAuth(
login=settings.DOCKER_API_PROXY_USER,
password=settings.DOCKER_API_PROXY_PASSWORD.get_secret_value(),
)
async with AsyncExitStack() as exit_stack:
if settings.DOCKER_API_PROXY_USER and settings.DOCKER_API_PROXY_PASSWORD:
await exit_stack.enter_async_context(
ClientSession(
auth=aiohttp.BasicAuth(
login=settings.DOCKER_API_PROXY_USER,
password=settings.DOCKER_API_PROXY_PASSWORD.get_secret_value(),
)
)

client = await exit_stack.enter_async_context(
aiodocker.Docker(url=settings.base_url, session=session)
)

app.state.remote_docker_client = client
client = await exit_stack.enter_async_context(
aiodocker.Docker(url=settings.base_url, session=session)
)

await wait_till_docker_api_proxy_is_responsive(app)
app.state.remote_docker_client = client

# NOTE this has to be inside exit_stack scope
yield {}
await wait_till_docker_api_proxy_is_responsive(app)

return _
# NOTE this has to be inside exit_stack scope
yield {}


@tenacity.retry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from common_library.errors_classes import OsparcErrorMixin
from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager, State
from fastapi_lifespan_manager import State


class LifespanError(OsparcErrorMixin, RuntimeError): ...
Expand All @@ -19,13 +19,3 @@ class LifespanOnShutdownError(LifespanError):

class LifespanGenerator(Protocol):
def __call__(self, app: FastAPI) -> AsyncIterator["State"]: ...


def combine_lifespans(*generators: LifespanGenerator) -> LifespanManager:

manager = LifespanManager()

for generator in generators:
manager.add(generator)

return manager
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class PostgresConfigurationError(LifespanOnStartupError):
msg_template = "Invalid postgres settings [={settings}] on startup. Note that postgres cannot be disabled using settings"


def create_input_state(settings: PostgresSettings) -> State:
def get_postgres_database_main_lifespan(settings: PostgresSettings) -> State:
return {PostgresLifespanState.POSTGRES_SETTINGS: settings}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# pylint: disable=protected-access


from collections.abc import AsyncIterator
from typing import Final

from fastapi import FastAPI
from fastapi_lifespan_manager import State
Expand Down Expand Up @@ -54,9 +54,23 @@ def _on_shutdown() -> None:
return get_prometheus_instrumentator(app)


async def lifespan_prometheus_instrumentation(app: FastAPI) -> AsyncIterator[State]:
_PROMETHEUS_INSTRUMENTATION_ENABLED: Final[str] = "prometheus_instrumentation_enabled"


def get_prometheus_instrumentationmain_main_lifespan(*, enabled: bool) -> State:
return {_PROMETHEUS_INSTRUMENTATION_ENABLED: enabled}


async def prometheus_instrumentation_lifespan(
app: FastAPI, state: State
) -> AsyncIterator[State]:
# NOTE: requires ``initialize_prometheus_instrumentation`` to be called before the
# lifespan of the applicaiton runs, usually rigth after the ``FastAPI`` instance is created
_startup(app)

instrumentaiton_enabled = state.get(_PROMETHEUS_INSTRUMENTATION_ENABLED, False)
if instrumentaiton_enabled:

_startup(app)
yield {}
_shutdown(app)
if instrumentaiton_enabled:
_shutdown(app)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from servicelib.fastapi.lifespan_utils import (
LifespanOnShutdownError,
LifespanOnStartupError,
combine_lifespans,
)


Expand All @@ -35,7 +34,11 @@ async def cache_lifespan(app: FastAPI) -> AsyncIterator[State]:
yield {}
print("shutdown CACHE")

app = FastAPI(lifespan=combine_lifespans(database_lifespan, cache_lifespan))
lifespan_manager = LifespanManager()
lifespan_manager.add(database_lifespan)
lifespan_manager.add(cache_lifespan)

app = FastAPI(lifespan=lifespan_manager)

capsys.readouterr()

Expand Down
23 changes: 9 additions & 14 deletions services/catalog/src/simcore_service_catalog/core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager, State
from servicelib.fastapi.postgres_lifespan import (
PostgresLifespanState,
get_postgres_database_main_lifespan,
postgres_database_lifespan,
)
from servicelib.fastapi.prometheus_instrumentation import (
lifespan_prometheus_instrumentation,
get_prometheus_instrumentationmain_main_lifespan,
prometheus_instrumentation_lifespan,
)

from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG
Expand Down Expand Up @@ -43,20 +44,14 @@ async def _main_lifespan(app: FastAPI) -> AsyncIterator[State]:
settings: ApplicationSettings = app.state.settings

yield {
PostgresLifespanState.POSTGRES_SETTINGS: settings.CATALOG_POSTGRES,
"prometheus_instrumentation_enabled": settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED,
**get_postgres_database_main_lifespan(settings.CATALOG_POSTGRES),
**get_prometheus_instrumentationmain_main_lifespan(
enabled=settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED
),
}


async def _prometheus_instrumentation_lifespan(
app: FastAPI, state: State
) -> AsyncIterator[State]:
if state.get("prometheus_instrumentation_enabled", False):
async for prometheus_state in lifespan_prometheus_instrumentation(app):
yield prometheus_state


def create_app_lifespan():
def create_app_lifespan() -> LifespanManager:
# WARNING: order matters
app_lifespan = LifespanManager()
app_lifespan.add(_main_lifespan)
Expand All @@ -81,7 +76,7 @@ def create_app_lifespan():
app_lifespan.add(background_task_lifespan)

# - prometheus instrumentation
app_lifespan.add(_prometheus_instrumentation_lifespan)
app_lifespan.add(prometheus_instrumentation_lifespan)

app_lifespan.add(_banners_lifespan)

Expand Down
40 changes: 25 additions & 15 deletions services/docker-api-proxy/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@

import aiodocker
import pytest
from asgi_lifespan import LifespanManager
from asgi_lifespan import LifespanManager as ASGILifespanManager
from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager, State
from pydantic import Field
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
from servicelib.fastapi.docker import (
get_lifespan_remote_docker_client,
get_remote_docker_client,
get_remote_docker_client_main_lifespan,
remote_docker_client_lifespan,
)
from servicelib.fastapi.lifespan_utils import combine_lifespans
from settings_library.application import BaseApplicationSettings
from settings_library.docker_api_proxy import DockerApiProxysettings

Expand All @@ -32,20 +33,29 @@ def pytest_configure(config):
config.option.asyncio_mode = "auto"


def _get_test_app() -> FastAPI:
class ApplicationSetting(BaseApplicationSettings):
DOCKER_API_PROXY: Annotated[
DockerApiProxysettings,
Field(json_schema_extra={"auto_default_from_env": True}),
]
class ApplicationSetting(BaseApplicationSettings):
DOCKER_API_PROXY: Annotated[
DockerApiProxysettings,
Field(json_schema_extra={"auto_default_from_env": True}),
]


async def _main_lifespan(app: FastAPI) -> AsyncIterator[State]:
settings: ApplicationSetting = app.state.settings

yield {
**get_remote_docker_client_main_lifespan(settings.DOCKER_API_PROXY),
}


def _get_test_app() -> FastAPI:
settings = ApplicationSetting.create_from_envs()

app = FastAPI(
lifespan=combine_lifespans(
get_lifespan_remote_docker_client(settings.DOCKER_API_PROXY)
)
)
lifespan_manager = LifespanManager()
lifespan_manager.add(_main_lifespan)
lifespan_manager.add(remote_docker_client_lifespan)

app = FastAPI(lifespan=lifespan_manager)
app.state.settings = settings

return app
Expand All @@ -61,7 +71,7 @@ async def _(env_vars: EnvVarsDict) -> AsyncIterator[aiodocker.Docker]:

app = _get_test_app()

async with LifespanManager(app, startup_timeout=30, shutdown_timeout=30):
async with ASGILifespanManager(app, startup_timeout=30, shutdown_timeout=30):
yield get_remote_docker_client(app)

return _
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
]


async def lifespan_rpc_api_routes(app: FastAPI) -> AsyncIterator[State]:
async def rpc_api_routes_lifespan(app: FastAPI) -> AsyncIterator[State]:
rpc_server = get_rabbitmq_rpc_server(app)
for router in ROUTERS:
await rpc_server.register_router(router, DYNAMIC_SCHEDULER_RPC_NAMESPACE, app)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,70 +1,21 @@
from collections.abc import AsyncIterator

from fastapi import FastAPI
from fastapi_lifespan_manager import State
from servicelib.fastapi.docker import get_lifespan_remote_docker_client
from servicelib.fastapi.lifespan_utils import LifespanGenerator, combine_lifespans
from servicelib.fastapi.openapi import override_fastapi_openapi_method
from servicelib.fastapi.profiler import initialize_profiler
from servicelib.fastapi.prometheus_instrumentation import (
initialize_prometheus_instrumentation,
lifespan_prometheus_instrumentation,
)
from servicelib.fastapi.tracing import initialize_tracing

from .._meta import (
API_VERSION,
API_VTAG,
APP_FINISHED_BANNER_MSG,
APP_NAME,
APP_STARTED_BANNER_MSG,
PROJECT_NAME,
SUMMARY,
)
from .._meta import API_VERSION, API_VTAG, APP_NAME, PROJECT_NAME, SUMMARY
from ..api.frontend import initialize_frontend
from ..api.rest.routes import initialize_rest_api
from ..api.rpc.routes import lifespan_rpc_api_routes
from ..services.catalog import lifespan_catalog
from ..services.deferred_manager import lifespan_deferred_manager
from ..services.director_v0 import lifespan_director_v0
from ..services.director_v2 import lifespan_director_v2
from ..services.notifier import get_lifespans_notifier
from ..services.rabbitmq import lifespan_rabbitmq
from ..services.redis import lifespan_redis
from ..services.service_tracker import lifespan_service_tracker
from ..services.status_monitor import lifespan_status_monitor
from . import events
from .settings import ApplicationSettings


async def _lifespan_banner(app: FastAPI) -> AsyncIterator[State]:
_ = app
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
yield {}
print(APP_FINISHED_BANNER_MSG, flush=True) # noqa: T201


def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
app_settings = settings or ApplicationSettings.create_from_envs()

lifespans: list[LifespanGenerator] = [
lifespan_director_v2,
lifespan_director_v0,
lifespan_catalog,
lifespan_rabbitmq,
lifespan_rpc_api_routes,
lifespan_redis,
*get_lifespans_notifier(),
lifespan_service_tracker,
lifespan_deferred_manager,
lifespan_status_monitor,
get_lifespan_remote_docker_client(
app_settings.DYNAMIC_SCHEDULER_DOCKER_API_PROXY
),
]

if app_settings.DYNAMIC_SCHEDULER_PROMETHEUS_INSTRUMENTATION_ENABLED:
lifespans.append(lifespan_prometheus_instrumentation)

app = FastAPI(
title=f"{PROJECT_NAME} web API",
description=SUMMARY,
Expand All @@ -74,7 +25,7 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
"/doc" if app_settings.DYNAMIC_SCHEDULER_SWAGGER_API_DOC_ENABLED else None
),
redoc_url=None,
lifespan=combine_lifespans(*lifespans, _lifespan_banner),
lifespan=events.create_app_lifespan(),
)
override_fastapi_openapi_method(app)

Expand All @@ -84,7 +35,8 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:

initialize_rest_api(app)

initialize_prometheus_instrumentation(app)
if app_settings.DYNAMIC_SCHEDULER_PROMETHEUS_INSTRUMENTATION_ENABLED:
initialize_prometheus_instrumentation(app)

initialize_frontend(app)

Expand Down
Loading
Loading