Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from . import server
from ._request import LONG_RUNNING_TASKS_CONTEXT_REQKEY

__all__ = ("server",)
__all__: tuple[str, ...] = (
"server",
"LONG_RUNNING_TASKS_CONTEXT_REQKEY",
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,3 @@
from pydantic import PositiveFloat

MINUTE: Final[PositiveFloat] = 60
APP_LONG_RUNNING_MANAGER_KEY: Final[str] = (
f"{__name__ }.long_running_tasks.tasks_manager"
)
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[str] = (
f"{__name__}.long_running_tasks.context"
)
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Final

from aiohttp import web

from ...long_running_tasks.manager import LongRunningManager
from ...long_running_tasks.models import TaskContext
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
from ._request import get_task_context


Expand All @@ -13,6 +14,10 @@ def get_task_context(request: web.Request) -> TaskContext:
return get_task_context(request)


LONG_RUNNING_MANAGER_APPKEY: Final = web.AppKey(
"LONG_RUNNING_MANAGER", AiohttpLongRunningManager
)


def get_long_running_manager(app: web.Application) -> AiohttpLongRunningManager:
output: AiohttpLongRunningManager = app[APP_LONG_RUNNING_MANAGER_KEY]
return output
return app[LONG_RUNNING_MANAGER_APPKEY]
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import Any
from typing import Any, Final

from aiohttp import web

from ._constants import RQT_LONG_RUNNING_TASKS_CONTEXT_KEY
LONG_RUNNING_TASKS_CONTEXT_REQKEY: Final = f"{__name__}.long_running_tasks.context"


def get_task_context(request: web.Request) -> dict[str, Any]:
output: dict[str, Any] = request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY]
return output
ctx: dict[str, Any] = request[LONG_RUNNING_TASKS_CONTEXT_REQKEY]
return ctx
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
)
from ..typing_extension import Handler
from . import _routes
from ._constants import (
APP_LONG_RUNNING_MANAGER_KEY,
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
)
from ._error_handlers import base_long_running_error_handler
from ._manager import AiohttpLongRunningManager, get_long_running_manager
from ._manager import (
LONG_RUNNING_MANAGER_APPKEY,
AiohttpLongRunningManager,
get_long_running_manager,
)
from ._request import (
LONG_RUNNING_TASKS_CONTEXT_REQKEY,
)


def _no_ops_decorator(handler: Handler):
Expand All @@ -44,7 +47,7 @@ def _no_ops_decorator(handler: Handler):
def _no_task_context_decorator(handler: Handler):
@wraps(handler)
async def _wrap(request: web.Request):
request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] = {}
request[LONG_RUNNING_TASKS_CONTEXT_REQKEY] = {}
return await handler(request)

return _wrap
Expand Down Expand Up @@ -178,7 +181,7 @@ async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
app.middlewares.append(base_long_running_error_handler)

# add components to state
app[APP_LONG_RUNNING_MANAGER_KEY] = long_running_manager = (
app[LONG_RUNNING_MANAGER_APPKEY] = long_running_manager = (
AiohttpLongRunningManager(
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from pydantic import TypeAdapter, create_model
from pytest_simcore.helpers.assert_checks import assert_status
from servicelib.aiohttp import long_running_tasks, status
from servicelib.aiohttp.long_running_tasks._server import (
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
from servicelib.aiohttp.long_running_tasks._request import (
LONG_RUNNING_TASKS_CONTEXT_REQKEY,
)
from servicelib.aiohttp.requests_validation import parse_request_query_parameters_as
from servicelib.aiohttp.rest_middlewares import append_rest_middlewares
Expand Down Expand Up @@ -56,7 +56,7 @@ async def _test_task_context_decorator(
) -> web.StreamResponse:
"""this task context callback tries to get the user_id from the query if available"""
query_param = parse_request_query_parameters_as(query_model, request)
request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] = query_param.model_dump()
request[LONG_RUNNING_TASKS_CONTEXT_REQKEY] = query_param.model_dump()
return await handler(request)

return _test_task_context_decorator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
from functools import wraps

from aiohttp import web
from models_library.utils.fastapi_encoders import jsonable_encoder
from servicelib.aiohttp.long_running_tasks._constants import (
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
from servicelib.aiohttp.long_running_tasks import (
LONG_RUNNING_TASKS_CONTEXT_REQKEY,
)
from servicelib.aiohttp.long_running_tasks.server import setup
from servicelib.aiohttp.typing_extension import Handler
Expand All @@ -31,7 +30,9 @@ async def _test_task_context_decorator(
) -> web.StreamResponse:
"""this task context callback tries to get the user_id from the query if available"""
req_ctx = AuthenticatedRequestContext.model_validate(request)
request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] = jsonable_encoder(req_ctx)
request[LONG_RUNNING_TASKS_CONTEXT_REQKEY] = req_ctx.model_dump(
mode="json", by_alias=True
)
return await handler(request)

return _test_task_context_decorator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
from typing import Protocol, cast
from typing import Final, Protocol

from aiohttp import web
from models_library.api_schemas_webserver.permalinks import ProjectPermalink
Expand All @@ -10,7 +10,6 @@
from .exceptions import PermalinkFactoryError, PermalinkNotAllowedError
from .models import ProjectDict

_PROJECT_PERMALINK = f"{__name__}"
_logger = logging.getLogger(__name__)


Expand All @@ -24,16 +23,21 @@ async def __call__(
) -> ProjectPermalink: ...


_PROJECT_PERMALINK_FACTORY_APPKEY: Final = web.AppKey(
"PROJECT_PERMALINK_FACTORY", CreateLinkCoroutine
)


def register_factory(app: web.Application, factory_coro: CreateLinkCoroutine):
if _create := app.get(_PROJECT_PERMALINK):
if _create := app.get(_PROJECT_PERMALINK_FACTORY_APPKEY):
msg = f"Permalink factory can only be set once: registered {_create}"
raise PermalinkFactoryError(msg)
app[_PROJECT_PERMALINK] = factory_coro
app[_PROJECT_PERMALINK_FACTORY_APPKEY] = factory_coro


def _get_factory(app: web.Application) -> CreateLinkCoroutine:
if _create := app.get(_PROJECT_PERMALINK):
return cast(CreateLinkCoroutine, _create)
if _create := app.get(_PROJECT_PERMALINK_FACTORY_APPKEY):
return _create

msg = "Undefined permalink factory. Check plugin initialization."
raise PermalinkFactoryError(msg)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,15 @@
"""

import logging
from typing import Final

from aiohttp import web

from ..application_setup import ModuleCategory, app_setup_func
from ..redis import setup_redis
from ._constants import APP_CLIENT_SOCKET_REGISTRY_KEY, APP_RESOURCE_MANAGER_TASKS_KEY
from .registry import RedisResourceRegistry
from .registry import CLIENT_SOCKET_REGISTRY_APPKEY, RedisResourceRegistry

_logger = logging.getLogger(__name__)

APP_RESOURCE_MANAGER_CLIENT_KEY: Final = web.AppKey(
"APP_RESOURCE_MANAGER_CLIENT_KEY", object
)


@app_setup_func(
"simcore_service_webserver.resource_manager",
Expand All @@ -33,9 +27,7 @@
def setup_resource_manager(app: web.Application) -> bool:
"""Sets up resource manager subsystem in the application"""

app[APP_RESOURCE_MANAGER_TASKS_KEY] = []

setup_redis(app)
app[APP_CLIENT_SOCKET_REGISTRY_KEY] = RedisResourceRegistry(app)
app[CLIENT_SOCKET_REGISTRY_APPKEY] = RedisResourceRegistry(app)

return True
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
"""

import logging
from typing import Final

import redis.asyncio as aioredis
from aiohttp import web
from servicelib.redis import handle_redis_returns_union_types

from ..redis import get_redis_resources_client
from ._constants import APP_CLIENT_SOCKET_REGISTRY_KEY
from .models import (
ALIVE_SUFFIX,
RESOURCE_SUFFIX,
Expand Down Expand Up @@ -144,7 +144,12 @@ async def get_all_resource_keys(self) -> tuple[AliveSessions, DeadSessions]:
return (alive_keys, dead_keys)


CLIENT_SOCKET_REGISTRY_APPKEY: Final = web.AppKey(
"CLIENT_SOCKET_REGISTRY", RedisResourceRegistry
)


def get_registry(app: web.Application) -> RedisResourceRegistry:
client: RedisResourceRegistry = app[APP_CLIENT_SOCKET_REGISTRY_KEY]
client = app[CLIENT_SOCKET_REGISTRY_APPKEY]
assert isinstance(client, RedisResourceRegistry) # nosec
return client
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,46 @@
from collections.abc import Awaitable, Callable
from functools import wraps
from types import ModuleType
from typing import Any
from typing import Any, Final, TypeAlias

from aiohttp import web
from socketio import AsyncServer # type: ignore[import-untyped]

APP_CLIENT_SOCKET_DECORATED_HANDLERS_KEY = f"{__name__}.socketio_handlers"
APP_CLIENT_SOCKET_SERVER_KEY = f"{__name__}.socketio_socketio"
_CLIENT_SOCKET_DECORATED_HANDLERS_APPKEY: Final = web.AppKey(
"CLIENT_SOCKET_DECORATED_HANDLERS", list[Callable]
)
CLIENT_SOCKET_SERVER_APPKEY: Final = web.AppKey(
# NOTE: AsyncServer stub library is missing
"CLIENT_SOCKET_SERVER",
AsyncServer,
)


def get_socket_server(app: web.Application) -> AsyncServer:
return app[APP_CLIENT_SOCKET_SERVER_KEY]
return app[CLIENT_SOCKET_SERVER_APPKEY]


# The socket ID that was assigned to the client
SocketID = str
SocketID: TypeAlias = str

# The environ argument is a dictionary in standard WSGI format containing the request information, including HTTP headers
EnvironDict = dict[str, Any]
EnvironDict: TypeAlias = dict[str, Any]

# Connect event
SocketioConnectEventHandler = Callable[
SocketioConnectEventHandler: TypeAlias = Callable[
[SocketID, EnvironDict, web.Application], Awaitable[None]
]

# Disconnect event
SocketioDisconnectEventHandler = Callable[[SocketID, web.Application], Awaitable[None]]
SocketioDisconnectEventHandler: TypeAlias = Callable[
[SocketID, web.Application], Awaitable[None]
]

# Event
AnyData = Any
SocketioEventHandler = Callable[[SocketID, AnyData, web.Application], Awaitable[None]]
AnyData: TypeAlias = Any
SocketioEventHandler: TypeAlias = Callable[
[SocketID, AnyData, web.Application], Awaitable[None]
]

_socketio_handlers_registry: list[
(
Expand Down Expand Up @@ -76,7 +86,7 @@ def register_socketio_handlers(app: web.Application, module: ModuleType):
partial_fcts = [
_socket_io_handler(app)(func_handler) for func_handler in member_fcts
]
app[APP_CLIENT_SOCKET_DECORATED_HANDLERS_KEY] = partial_fcts
app[_CLIENT_SOCKET_DECORATED_HANDLERS_APPKEY] = partial_fcts

# register the fcts
for func in partial_fcts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ..rabbitmq_settings import get_plugin_settings as get_rabbitmq_settings
from . import _handlers
from ._utils import (
APP_CLIENT_SOCKET_SERVER_KEY,
CLIENT_SOCKET_SERVER_APPKEY,
get_socket_server,
register_socketio_handlers,
)
Expand All @@ -30,7 +30,7 @@ async def _socketio_server_cleanup_ctx(app: web.Application) -> AsyncIterator[No
)
sio_server.attach(app)

app[APP_CLIENT_SOCKET_SERVER_KEY] = sio_server
app[CLIENT_SOCKET_SERVER_APPKEY] = sio_server

register_socketio_handlers(app, _handlers)

Expand Down
Loading