Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions packages/service-library/src/servicelib/fastapi/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,55 @@
from fastapi import FastAPI
from fastapi_lifespan_manager import State
from pydantic import NonNegativeInt
from servicelib.fastapi.lifespan_utils import LifespanGenerator
from settings_library.docker_api_proxy import DockerApiProxysettings

_logger = logging.getLogger(__name__)

_DEFAULT_DOCKER_API_PROXY_HEALTH_TIMEOUT: Final[NonNegativeInt] = 5


def get_lifespan_remote_docker_client(
settings: DockerApiProxysettings,
) -> LifespanGenerator:
async def _(app: FastAPI) -> AsyncIterator[State]:
_DOCKER_API_PROXY_SETTINGS: Final[str] = "docker_api_proxy_settings"

session: ClientSession | None = None
if settings.DOCKER_API_PROXY_USER and settings.DOCKER_API_PROXY_PASSWORD:
session = ClientSession(
auth=aiohttp.BasicAuth(
login=settings.DOCKER_API_PROXY_USER,
password=settings.DOCKER_API_PROXY_PASSWORD.get_secret_value(),
)

def create_remote_docker_client_input_state(settings: DockerApiProxysettings) -> State:
return {_DOCKER_API_PROXY_SETTINGS: settings}


async def remote_docker_client_lifespan(
app: FastAPI, state: State
) -> AsyncIterator[State]:
settings: DockerApiProxysettings = state[_DOCKER_API_PROXY_SETTINGS]

session: ClientSession | None = None
if settings.DOCKER_API_PROXY_USER and settings.DOCKER_API_PROXY_PASSWORD:
session = ClientSession(
auth=aiohttp.BasicAuth(
login=settings.DOCKER_API_PROXY_USER,
password=settings.DOCKER_API_PROXY_PASSWORD.get_secret_value(),
)
)

async with AsyncExitStack() as exit_stack:
if settings.DOCKER_API_PROXY_USER and settings.DOCKER_API_PROXY_PASSWORD:
await exit_stack.enter_async_context(
ClientSession(
auth=aiohttp.BasicAuth(
login=settings.DOCKER_API_PROXY_USER,
password=settings.DOCKER_API_PROXY_PASSWORD.get_secret_value(),
)
async with AsyncExitStack() as exit_stack:
if settings.DOCKER_API_PROXY_USER and settings.DOCKER_API_PROXY_PASSWORD:
await exit_stack.enter_async_context(
ClientSession(
auth=aiohttp.BasicAuth(
login=settings.DOCKER_API_PROXY_USER,
password=settings.DOCKER_API_PROXY_PASSWORD.get_secret_value(),
)
)

client = await exit_stack.enter_async_context(
aiodocker.Docker(url=settings.base_url, session=session)
)

app.state.remote_docker_client = client
client = await exit_stack.enter_async_context(
aiodocker.Docker(url=settings.base_url, session=session)
)

await wait_till_docker_api_proxy_is_responsive(app)
app.state.remote_docker_client = client

# NOTE this has to be inside exit_stack scope
yield {}
await wait_till_docker_api_proxy_is_responsive(app)

return _
# NOTE this has to be inside exit_stack scope
yield {}


@tenacity.retry(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
from collections.abc import AsyncIterator
from typing import Protocol

from common_library.errors_classes import OsparcErrorMixin
from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager, State


class LifespanError(OsparcErrorMixin, RuntimeError): ...
Expand All @@ -15,17 +10,3 @@ class LifespanOnStartupError(LifespanError):

class LifespanOnShutdownError(LifespanError):
msg_template = "Failed during shutdown of {module}"


class LifespanGenerator(Protocol):
def __call__(self, app: FastAPI) -> AsyncIterator["State"]: ...


def combine_lifespans(*generators: LifespanGenerator) -> LifespanManager:

manager = LifespanManager()

for generator in generators:
manager.add(generator)

return manager
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections.abc import AsyncIterator
from enum import Enum

from fastapi import FastAPI
from fastapi_lifespan_manager import State
from servicelib.logging_utils import log_catch, log_context
from settings_library.postgres import PostgresSettings
Expand All @@ -23,11 +24,11 @@ class PostgresConfigurationError(LifespanOnStartupError):
msg_template = "Invalid postgres settings [={settings}] on startup. Note that postgres cannot be disabled using settings"


def create_input_state(settings: PostgresSettings) -> State:
def create_postgres_database_input_state(settings: PostgresSettings) -> State:
return {PostgresLifespanState.POSTGRES_SETTINGS: settings}


async def postgres_database_lifespan(_, state: State) -> AsyncIterator[State]:
async def postgres_database_lifespan(_: FastAPI, state: State) -> AsyncIterator[State]:

with log_context(_logger, logging.INFO, f"{__name__}"):

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# pylint: disable=protected-access


from collections.abc import AsyncIterator
from typing import Final

from fastapi import FastAPI
from fastapi_lifespan_manager import State
Expand Down Expand Up @@ -54,9 +54,23 @@ def _on_shutdown() -> None:
return get_prometheus_instrumentator(app)


async def lifespan_prometheus_instrumentation(app: FastAPI) -> AsyncIterator[State]:
_PROMETHEUS_INSTRUMENTATION_ENABLED: Final[str] = "prometheus_instrumentation_enabled"


def create_prometheus_instrumentationmain_input_state(*, enabled: bool) -> State:
return {_PROMETHEUS_INSTRUMENTATION_ENABLED: enabled}


async def prometheus_instrumentation_lifespan(
app: FastAPI, state: State
) -> AsyncIterator[State]:
# NOTE: requires ``initialize_prometheus_instrumentation`` to be called before the
# lifespan of the applicaiton runs, usually rigth after the ``FastAPI`` instance is created
_startup(app)

instrumentaiton_enabled = state.get(_PROMETHEUS_INSTRUMENTATION_ENABLED, False)
if instrumentaiton_enabled:

_startup(app)
yield {}
_shutdown(app)
if instrumentaiton_enabled:
_shutdown(app)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from servicelib.fastapi.lifespan_utils import (
LifespanOnShutdownError,
LifespanOnStartupError,
combine_lifespans,
)


Expand All @@ -35,7 +34,11 @@ async def cache_lifespan(app: FastAPI) -> AsyncIterator[State]:
yield {}
print("shutdown CACHE")

app = FastAPI(lifespan=combine_lifespans(database_lifespan, cache_lifespan))
lifespan_manager = LifespanManager()
lifespan_manager.add(database_lifespan)
lifespan_manager.add(cache_lifespan)

app = FastAPI(lifespan=lifespan_manager)

capsys.readouterr()

Expand Down
27 changes: 11 additions & 16 deletions services/catalog/src/simcore_service_catalog/core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager, State
from servicelib.fastapi.postgres_lifespan import (
PostgresLifespanState,
create_postgres_database_input_state,
)
from servicelib.fastapi.prometheus_instrumentation import (
lifespan_prometheus_instrumentation,
create_prometheus_instrumentationmain_input_state,
prometheus_instrumentation_lifespan,
)

from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG
Expand Down Expand Up @@ -38,27 +39,21 @@ async def _banners_lifespan(_) -> AsyncIterator[State]:
_flush_finished_banner()


async def _main_lifespan(app: FastAPI) -> AsyncIterator[State]:
async def _settings_lifespan(app: FastAPI) -> AsyncIterator[State]:
settings: ApplicationSettings = app.state.settings

yield {
PostgresLifespanState.POSTGRES_SETTINGS: settings.CATALOG_POSTGRES,
"prometheus_instrumentation_enabled": settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED,
**create_postgres_database_input_state(settings.CATALOG_POSTGRES),
**create_prometheus_instrumentationmain_input_state(
enabled=settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED
),
}


async def _prometheus_instrumentation_lifespan(
app: FastAPI, state: State
) -> AsyncIterator[State]:
if state.get("prometheus_instrumentation_enabled", False):
async for prometheus_state in lifespan_prometheus_instrumentation(app):
yield prometheus_state


def create_app_lifespan():
def create_app_lifespan() -> LifespanManager:
# WARNING: order matters
app_lifespan = LifespanManager()
app_lifespan.add(_main_lifespan)
app_lifespan.add(_settings_lifespan)

# - postgres
app_lifespan.include(repository_lifespan_manager)
Expand All @@ -79,7 +74,7 @@ def create_app_lifespan():
app_lifespan.add(background_task_lifespan)

# - prometheus instrumentation
app_lifespan.add(_prometheus_instrumentation_lifespan)
app_lifespan.add(prometheus_instrumentation_lifespan)

app_lifespan.add(_banners_lifespan)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
_logger = logging.getLogger(__name__)


repository_lifespan_manager = LifespanManager()
repository_lifespan_manager.add(postgres_database_lifespan)


@repository_lifespan_manager.add
async def _database_lifespan(app: FastAPI, state: State) -> AsyncIterator[State]:
app.state.engine = state[PostgresLifespanState.POSTGRES_ASYNC_ENGINE]

Expand All @@ -26,3 +21,8 @@ async def _database_lifespan(app: FastAPI, state: State) -> AsyncIterator[State]
app.state.default_product_name = await repo.get_default_product_name()

yield {}


repository_lifespan_manager = LifespanManager()
repository_lifespan_manager.add(postgres_database_lifespan)
repository_lifespan_manager.add(_database_lifespan)
40 changes: 25 additions & 15 deletions services/docker-api-proxy/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@

import aiodocker
import pytest
from asgi_lifespan import LifespanManager
from asgi_lifespan import LifespanManager as ASGILifespanManager
from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager, State
from pydantic import Field
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
from servicelib.fastapi.docker import (
get_lifespan_remote_docker_client,
create_remote_docker_client_input_state,
get_remote_docker_client,
remote_docker_client_lifespan,
)
from servicelib.fastapi.lifespan_utils import combine_lifespans
from settings_library.application import BaseApplicationSettings
from settings_library.docker_api_proxy import DockerApiProxysettings

Expand All @@ -32,20 +33,29 @@ def pytest_configure(config):
config.option.asyncio_mode = "auto"


def _get_test_app() -> FastAPI:
class ApplicationSetting(BaseApplicationSettings):
DOCKER_API_PROXY: Annotated[
DockerApiProxysettings,
Field(json_schema_extra={"auto_default_from_env": True}),
]
class ApplicationSetting(BaseApplicationSettings):
DOCKER_API_PROXY: Annotated[
DockerApiProxysettings,
Field(json_schema_extra={"auto_default_from_env": True}),
]


async def _settings_lifespan(app: FastAPI) -> AsyncIterator[State]:
settings: ApplicationSetting = app.state.settings

yield {
**create_remote_docker_client_input_state(settings.DOCKER_API_PROXY),
}


def _get_test_app() -> FastAPI:
settings = ApplicationSetting.create_from_envs()

app = FastAPI(
lifespan=combine_lifespans(
get_lifespan_remote_docker_client(settings.DOCKER_API_PROXY)
)
)
lifespan_manager = LifespanManager()
lifespan_manager.add(_settings_lifespan)
lifespan_manager.add(remote_docker_client_lifespan)

app = FastAPI(lifespan=lifespan_manager)
app.state.settings = settings

return app
Expand All @@ -61,7 +71,7 @@ async def _(env_vars: EnvVarsDict) -> AsyncIterator[aiodocker.Docker]:

app = _get_test_app()

async with LifespanManager(app, startup_timeout=30, shutdown_timeout=30):
async with ASGILifespanManager(app, startup_timeout=30, shutdown_timeout=30):
yield get_remote_docker_client(app)

return _
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
]


async def lifespan_rpc_api_routes(app: FastAPI) -> AsyncIterator[State]:
async def rpc_api_routes_lifespan(app: FastAPI) -> AsyncIterator[State]:
rpc_server = get_rabbitmq_rpc_server(app)
for router in ROUTERS:
await rpc_server.register_router(router, DYNAMIC_SCHEDULER_RPC_NAMESPACE, app)
Expand Down
Loading
Loading