From 4a55849d7aabcc1f6b57afdfbecd697e413ed6d4 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 15:14:39 +0200 Subject: [PATCH 01/22] prune trash --- .../src/servicelib/background_task.py | 13 +++-- .../src/servicelib/background_task_utils.py | 2 +- .../garbage_collector/_tasks_trash.py | 58 ++++++++----------- 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/packages/service-library/src/servicelib/background_task.py b/packages/service-library/src/servicelib/background_task.py index 508f34b99eec..b2b8488cfaac 100644 --- a/packages/service-library/src/servicelib/background_task.py +++ b/packages/service-library/src/servicelib/background_task.py @@ -38,11 +38,14 @@ def periodic( *, interval: datetime.timedelta, raise_on_error: bool = False, - early_wake_up_event: asyncio.Event | None = None, + early_wake_up_event: ( + asyncio.Event | None + ) = None, # TODO: i would argue that this should be a different decorator instead of an optional arguments since with this event, + # the funciton is not periodic anymore but rahter repeatedly triggered by the event ) -> Callable[ [Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]] ]: - """Calls the function periodically with a given interval. + """Calls the function periodically with a given interval or triggered by an early wake-up event. Arguments: interval -- the interval between calls @@ -58,7 +61,7 @@ def periodic( """ def _decorator( - func: Callable[P, Coroutine[Any, Any, None]], + coro: Callable[P, Coroutine[Any, Any, None]], ) -> Callable[P, Coroutine[Any, Any, None]]: class _InternalTryAgain(TryAgain): # Local exception to prevent reacting to similarTryAgain exceptions raised by the wrapped func @@ -82,10 +85,10 @@ class _InternalTryAgain(TryAgain): ), before_sleep=before_sleep_log(_logger, logging.DEBUG), ) - @functools.wraps(func) + @functools.wraps(coro) async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None: with log_catch(_logger, reraise=True): - await func(*args, **kwargs) + await coro(*args, **kwargs) raise _InternalTryAgain return _wrapper diff --git a/packages/service-library/src/servicelib/background_task_utils.py b/packages/service-library/src/servicelib/background_task_utils.py index 1564107420dc..a166e04d0b6e 100644 --- a/packages/service-library/src/servicelib/background_task_utils.py +++ b/packages/service-library/src/servicelib/background_task_utils.py @@ -43,7 +43,7 @@ def _decorator( # Replicas will raise CouldNotAcquireLockError # SEE https://github.com/ITISFoundation/osparc-simcore/issues/7574 (CouldNotAcquireLockError,), - reason="Multiple instances of the periodic task `{coro.__module__}.{coro.__name__}` are running.", + reason=f"Multiple instances of the periodic task `{coro.__module__}.{coro.__name__}` are running.", ) @exclusive( redis_client, diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py index 46df72c0a708..f3f976f32139 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py @@ -1,17 +1,18 @@ """ - Scheduled tasks addressing users +Scheduled tasks addressing users """ import asyncio import logging from collections.abc import AsyncIterator, Callable +from datetime import timedelta from aiohttp import web +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_context -from tenacity import retry -from tenacity.before_sleep import before_sleep_log -from tenacity.wait import wait_exponential +from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ..trash import trash_service @@ -20,45 +21,36 @@ CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] -_PERIODIC_TASK_NAME = f"{__name__}" -_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task" +def create_background_task_to_prune_trash(wait_s: float) -> CleanupContextFunc: + async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: -@retry( - wait=wait_exponential(min=5, max=20), - before_sleep=before_sleep_log(_logger, logging.WARNING), -) -async def _run_task(app: web.Application): - with log_context(_logger, logging.INFO, "Deleting expired trashed items"): - await trash_service.safe_delete_expired_trash_as_admin(app) - - -async def _run_periodically(app: web.Application, wait_interval_s: float): - while True: - await _run_task(app) - await asyncio.sleep(wait_interval_s) - + @exclusive_periodic( + # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently + get_redis_lock_manager_client_sdk(app), + task_interval=timedelta(seconds=wait_s), + retry_after=timedelta(minutes=5), + ) + async def _prune_trash_periodically() -> None: + with log_context(_logger, logging.INFO, "Deleting expired trashed items"): + await trash_service.safe_delete_expired_trash_as_admin(app) -def create_background_task_to_prune_trash( - wait_s: float, task_name: str = _PERIODIC_TASK_NAME -) -> CleanupContextFunc: - async def _cleanup_ctx_fun( - app: web.Application, - ) -> AsyncIterator[None]: # setup + task_name = _prune_trash_periodically.__name__ + task = asyncio.create_task( - _run_periodically(app, wait_s), + _prune_trash_periodically(), name=task_name, ) - app[_APP_TASK_KEY] = task + + # prevents premature garbage collection of the task + app_task_key = f"tasks.{task_name}" + app[app_task_key] = task yield # tear-down - task.cancel() - try: - await task - except asyncio.CancelledError: - assert task.cancelled() # nosec + await cancel_wait_task(task) + app.pop(app_task_key, None) return _cleanup_ctx_fun From aee674a3c70980b252afd8ed87538695f393c830 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 15:32:54 +0200 Subject: [PATCH 02/22] =?UTF-8?q?=F0=9F=8E=A8=20Refactor=20user=20expirati?= =?UTF-8?q?on=20update=20task:=20Implement=20periodic=20updates=20with=20e?= =?UTF-8?q?xclusive=20task=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../garbage_collector/_tasks_users.py | 59 ++++++++----------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py index 3b834c71ab71..119aa503423f 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py @@ -6,13 +6,14 @@ import asyncio import logging from collections.abc import AsyncIterator, Callable +from datetime import timedelta from aiohttp import web from models_library.users import UserID +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import get_log_record_extra, log_context -from tenacity import retry -from tenacity.before_sleep import before_sleep_log -from tenacity.wait import wait_exponential +from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ..login import login_service from ..security import security_service @@ -23,10 +24,6 @@ CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] -_PERIODIC_TASK_NAME = f"{__name__}.update_expired_users_periodically" -_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task" - - async def notify_user_logout_all_sessions( app: web.Application, user_id: UserID ) -> None: @@ -49,15 +46,7 @@ async def notify_user_logout_all_sessions( ) -@retry( - wait=wait_exponential(min=5, max=20), - before_sleep=before_sleep_log(_logger, logging.WARNING), - # NOTE: this function does suppresses all exceptions and retry indefinitly -) async def _update_expired_users(app: web.Application): - """ - It is resilient, i.e. if update goes wrong, it waits a bit and retries - """ if updated := await update_expired_users(app): # expired users might be cached in the auth. If so, any request @@ -81,36 +70,36 @@ async def _update_expired_users(app: web.Application): _logger.info("No users expired") -async def _update_expired_users_periodically( - app: web.Application, wait_interval_s: float -): - """Periodically checks expiration dates and updates user status""" +def create_background_task_for_trial_accounts(wait_s: float) -> CleanupContextFunc: - while True: - await _update_expired_users(app) - await asyncio.sleep(wait_interval_s) + async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: + @exclusive_periodic( + # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently + get_redis_lock_manager_client_sdk(app), + task_interval=timedelta(seconds=wait_s), + retry_after=timedelta(minutes=5), + ) + async def _update_expired_users_periodically() -> None: + with log_context(_logger, logging.INFO, "Updating expired users"): + await _update_expired_users(app) -def create_background_task_for_trial_accounts( - wait_s: float, task_name: str = _PERIODIC_TASK_NAME -) -> CleanupContextFunc: - async def _cleanup_ctx_fun( - app: web.Application, - ) -> AsyncIterator[None]: # setup + task_name = _update_expired_users_periodically.__name__ + task = asyncio.create_task( - _update_expired_users_periodically(app, wait_s), + _update_expired_users_periodically(), name=task_name, ) - app[_APP_TASK_KEY] = task + + # prevents premature garbage collection of the task + app_task_key = f"tasks.{task_name}" + app[app_task_key] = task yield # tear-down - task.cancel() - try: - await task - except asyncio.CancelledError: - assert task.cancelled() # nosec + await cancel_wait_task(task) + app.pop(app_task_key, None) return _cleanup_ctx_fun From f59b8319d4c78417a3cbc068634e50499776cd3a Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 15:35:33 +0200 Subject: [PATCH 03/22] =?UTF-8?q?=F0=9F=8E=A8=20Refactor=20API=20key=20pru?= =?UTF-8?q?ning=20task:=20Simplify=20periodic=20execution=20and=20enhance?= =?UTF-8?q?=20logging?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../garbage_collector/_tasks_api_keys.py | 69 +++++++++---------- 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py index b992d25b3876..f13d61d26751 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py @@ -6,71 +6,64 @@ import asyncio import logging from collections.abc import AsyncIterator, Callable +from datetime import timedelta from aiohttp import web -from tenacity import retry -from tenacity.before_sleep import before_sleep_log -from tenacity.wait import wait_exponential +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic +from servicelib.logging_utils import log_context +from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ..api_keys import api_keys_service -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] -_PERIODIC_TASK_NAME = f"{__name__}.prune_expired_api_keys_periodically" -_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task" - - -@retry( - wait=wait_exponential(min=5, max=30), - before_sleep=before_sleep_log(logger, logging.WARNING), -) async def _run_task(app: web.Application): - """Periodically check expiration dates and updates user status - - It is resilient, i.e. if update goes wrong, it waits a bit and retries - """ + """Checks expiration dates and updates user status""" if deleted := await api_keys_service.prune_expired_api_keys(app): # broadcast force logout of user_id for api_key in deleted: - logger.info("API-key %s expired and was removed", f"{api_key=}") + _logger.info("API-key %s expired and was removed", f"{api_key=}") else: - logger.info("No API keys expired") + _logger.info("No API keys expired") -async def _run_periodically(app: web.Application, wait_period_s: float): - """Periodically check expiration dates and updates user status +def create_background_task_to_prune_api_keys( + wait_period_s: float, +) -> CleanupContextFunc: - It is resilient, i.e. if update goes wrong, it waits a bit and retries - """ - while True: - await _run_task(app) - await asyncio.sleep(wait_period_s) + async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: + @exclusive_periodic( + # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently + get_redis_lock_manager_client_sdk(app), + task_interval=timedelta(seconds=wait_period_s), + retry_after=timedelta(minutes=5), + ) + async def _prune_expired_api_keys_periodically() -> None: + with log_context(_logger, logging.INFO, "Pruning expired API keys"): + await _run_task(app) -def create_background_task_to_prune_api_keys( - wait_period_s: float, task_name: str = _PERIODIC_TASK_NAME -) -> CleanupContextFunc: - async def _cleanup_ctx_fun( - app: web.Application, - ) -> AsyncIterator[None]: # setup + task_name = _prune_expired_api_keys_periodically.__name__ + task = asyncio.create_task( - _run_periodically(app, wait_period_s), + _prune_expired_api_keys_periodically(), name=task_name, ) - app[_APP_TASK_KEY] = task + + # prevents premature garbage collection of the task + app_task_key = f"tasks.{task_name}" + app[app_task_key] = task yield # tear-down - task.cancel() - try: - await task - except asyncio.CancelledError: - assert task.cancelled() # nosec + await cancel_wait_task(task) + app.pop(app_task_key, None) return _cleanup_ctx_fun From 655cdd5c363c945fe8e8c591bb67ca614e46014c Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 15:41:57 +0200 Subject: [PATCH 04/22] =?UTF-8?q?=F0=9F=8E=A8=20Refactor=20garbage=20colle?= =?UTF-8?q?ction=20task:=20Update=20to=20use=20new=20background=20task=20c?= =?UTF-8?q?reation=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../garbage_collector/_tasks_core.py | 110 ++++++------------ .../garbage_collector/plugin.py | 2 +- 2 files changed, 34 insertions(+), 78 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py index dfb7237d97f1..abfc8266ce6c 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py @@ -1,4 +1,4 @@ -""" Setup and running of periodic background task +"""Setup and running of periodic background task Specifics of the gc implementation should go into garbage_collector_core.py @@ -6,98 +6,54 @@ import asyncio import logging -from collections.abc import AsyncGenerator +from collections.abc import AsyncIterator, Callable +from datetime import timedelta from aiohttp import web +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_context +from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ._core import collect_garbage from .settings import GarbageCollectorSettings, get_plugin_settings _logger = logging.getLogger(__name__) +CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] -_GC_TASK_NAME = f"background-task.{__name__}.collect_garbage_periodically" -_GC_TASK_CONFIG = f"{_GC_TASK_NAME}.config" -_GC_TASK = f"{_GC_TASK_NAME}.task" +def create_background_task_for_garbage_collection() -> CleanupContextFunc: -async def run_background_task(app: web.Application) -> AsyncGenerator: - # SETUP ------ - # create a background task to collect garbage periodically - assert not any( # nosec - t.get_name() == _GC_TASK_NAME for t in asyncio.all_tasks() - ), "Garbage collector task already running. ONLY ONE expected" # nosec + async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: + settings: GarbageCollectorSettings = get_plugin_settings(app) - gc_bg_task = asyncio.create_task( - _collect_garbage_periodically(app), name=_GC_TASK_NAME - ) - # attaches variable to the app's lifetime - app[_GC_TASK] = gc_bg_task + @exclusive_periodic( + # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently + get_redis_lock_manager_client_sdk(app), + task_interval=timedelta(seconds=settings.GARBAGE_COLLECTOR_INTERVAL_S), + retry_after=timedelta(minutes=5), + ) + async def _collect_garbage_periodically() -> None: + with log_context(_logger, logging.INFO, "Garbage collect cycle"): + await collect_garbage(app) - # FIXME: added this config to overcome the state in which the - # task cancelation is ignored and the exceptions enter in a loop - # that never stops the background task. This flag is an additional - # mechanism to enforce stopping the background task - # - # Implemented with a mutable dict to avoid - # DeprecationWarning: Changing state of started or joined application is deprecated - # - app[_GC_TASK_CONFIG] = {"force_stop": False, "name": _GC_TASK_NAME} + # setup + task_name = _collect_garbage_periodically.__name__ - yield + task = asyncio.create_task( + _collect_garbage_periodically(), + name=task_name, + ) - # TEAR-DOWN ----- - # controlled cancelation of the gc task - try: - _logger.info("Stopping garbage collector...") + # prevents premature garbage collection of the task + app_task_key = f"tasks.{task_name}" + app[app_task_key] = task - ack = gc_bg_task.cancel() - assert ack # nosec + yield - app[_GC_TASK_CONFIG]["force_stop"] = True + # tear-down + await cancel_wait_task(task) + app.pop(app_task_key, None) - await gc_bg_task - - except asyncio.CancelledError: - assert gc_bg_task.cancelled() # nosec - - -async def _collect_garbage_periodically(app: web.Application): - settings: GarbageCollectorSettings = get_plugin_settings(app) - interval = settings.GARBAGE_COLLECTOR_INTERVAL_S - - while True: - try: - while True: - with log_context(_logger, logging.INFO, "Garbage collect cycle"): - await collect_garbage(app) - - if app[_GC_TASK_CONFIG].get("force_stop", False): - msg = "Forced to stop garbage collection" - raise RuntimeError(msg) - - _logger.info("Garbage collect cycle pauses %ss", interval) - await asyncio.sleep(interval) - - except asyncio.CancelledError: # EXIT # noqa: PERF203 - _logger.info( - "Stopped: Garbage collection task was cancelled, it will not restart!" - ) - # do not catch Cancellation errors - raise - - except Exception: # RESILIENT restart # pylint: disable=broad-except - _logger.warning( - "Stopped: There was an error during garbage collection, restarting...", - exc_info=True, - ) - - if app[_GC_TASK_CONFIG].get("force_stop", False): - _logger.warning("Forced to stop garbage collection") - break - - # will wait 5 seconds to recover before restarting to avoid restart loops - # - it might be that db/redis is down, etc - # - await asyncio.sleep(5) + return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py index 3c42457ece55..e948e9ea229b 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py @@ -34,7 +34,7 @@ def setup_garbage_collector(app: web.Application) -> None: settings = get_plugin_settings(app) - app.cleanup_ctx.append(_tasks_core.run_background_task) + app.cleanup_ctx.append(_tasks_core.create_background_task_for_garbage_collection()) set_parent_module_log_level( _logger.name, min(logging.INFO, get_application_settings(app).log_level) From 2f16b478a9f0e091bceb6c321332e51174b85394 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 15:52:04 +0200 Subject: [PATCH 05/22] =?UTF-8?q?=F0=9F=8E=A8=20Refactor=20background=20ta?= =?UTF-8?q?sk=20management:=20Introduce=20setup=5Fperiodic=5Ftask=20utilit?= =?UTF-8?q?y=20for=20cleaner=20task=20setup=20and=20teardown?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../garbage_collector/_tasks_api_keys.py | 31 +++----------- .../garbage_collector/_tasks_core.py | 26 ++---------- .../garbage_collector/_tasks_trash.py | 26 ++---------- .../garbage_collector/_tasks_users.py | 22 ++-------- .../garbage_collector/_tasks_utils.py | 41 +++++++++++++++++++ 5 files changed, 58 insertions(+), 88 deletions(-) create mode 100644 services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py index f13d61d26751..993dd037a89a 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py @@ -3,26 +3,22 @@ """ -import asyncio import logging -from collections.abc import AsyncIterator, Callable +from collections.abc import AsyncIterator from datetime import timedelta from aiohttp import web -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_context from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ..api_keys import api_keys_service +from ._tasks_utils import CleanupContextFunc, setup_periodic_task _logger = logging.getLogger(__name__) -CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] - -async def _run_task(app: web.Application): - """Checks expiration dates and updates user status""" +async def _prune_expired_api_keys(app: web.Application): if deleted := await api_keys_service.prune_expired_api_keys(app): # broadcast force logout of user_id for api_key in deleted: @@ -46,24 +42,9 @@ async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: ) async def _prune_expired_api_keys_periodically() -> None: with log_context(_logger, logging.INFO, "Pruning expired API keys"): - await _run_task(app) - - # setup - task_name = _prune_expired_api_keys_periodically.__name__ - - task = asyncio.create_task( - _prune_expired_api_keys_periodically(), - name=task_name, - ) - - # prevents premature garbage collection of the task - app_task_key = f"tasks.{task_name}" - app[app_task_key] = task - - yield + await _prune_expired_api_keys(app) - # tear-down - await cancel_wait_task(task) - app.pop(app_task_key, None) + async for _ in setup_periodic_task(app, _prune_expired_api_keys_periodically): + yield return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py index abfc8266ce6c..8216ae195bfa 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py @@ -4,24 +4,21 @@ Specifics of the gc implementation should go into garbage_collector_core.py """ -import asyncio import logging -from collections.abc import AsyncIterator, Callable +from collections.abc import AsyncIterator from datetime import timedelta from aiohttp import web -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_context from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ._core import collect_garbage +from ._tasks_utils import CleanupContextFunc, setup_periodic_task from .settings import GarbageCollectorSettings, get_plugin_settings _logger = logging.getLogger(__name__) -CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] - def create_background_task_for_garbage_collection() -> CleanupContextFunc: @@ -38,22 +35,7 @@ async def _collect_garbage_periodically() -> None: with log_context(_logger, logging.INFO, "Garbage collect cycle"): await collect_garbage(app) - # setup - task_name = _collect_garbage_periodically.__name__ - - task = asyncio.create_task( - _collect_garbage_periodically(), - name=task_name, - ) - - # prevents premature garbage collection of the task - app_task_key = f"tasks.{task_name}" - app[app_task_key] = task - - yield - - # tear-down - await cancel_wait_task(task) - app.pop(app_task_key, None) + async for _ in setup_periodic_task(app, _collect_garbage_periodically): + yield return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py index f3f976f32139..5cada15dd7b3 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py @@ -3,23 +3,20 @@ """ -import asyncio import logging -from collections.abc import AsyncIterator, Callable +from collections.abc import AsyncIterator from datetime import timedelta from aiohttp import web -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_context from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ..trash import trash_service +from ._tasks_utils import CleanupContextFunc, setup_periodic_task _logger = logging.getLogger(__name__) -CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] - def create_background_task_to_prune_trash(wait_s: float) -> CleanupContextFunc: @@ -35,22 +32,7 @@ async def _prune_trash_periodically() -> None: with log_context(_logger, logging.INFO, "Deleting expired trashed items"): await trash_service.safe_delete_expired_trash_as_admin(app) - # setup - task_name = _prune_trash_periodically.__name__ - - task = asyncio.create_task( - _prune_trash_periodically(), - name=task_name, - ) - - # prevents premature garbage collection of the task - app_task_key = f"tasks.{task_name}" - app[app_task_key] = task - - yield - - # tear-down - await cancel_wait_task(task) - app.pop(app_task_key, None) + async for _ in setup_periodic_task(app, _prune_trash_periodically): + yield return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py index 119aa503423f..97ad1dc54f0b 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py @@ -3,14 +3,12 @@ """ -import asyncio import logging from collections.abc import AsyncIterator, Callable from datetime import timedelta from aiohttp import web from models_library.users import UserID -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import get_log_record_extra, log_context from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk @@ -18,6 +16,7 @@ from ..login import login_service from ..security import security_service from ..users.api import update_expired_users +from ._tasks_utils import CleanupContextFunc, setup_periodic_task _logger = logging.getLogger(__name__) @@ -84,22 +83,7 @@ async def _update_expired_users_periodically() -> None: with log_context(_logger, logging.INFO, "Updating expired users"): await _update_expired_users(app) - # setup - task_name = _update_expired_users_periodically.__name__ - - task = asyncio.create_task( - _update_expired_users_periodically(), - name=task_name, - ) - - # prevents premature garbage collection of the task - app_task_key = f"tasks.{task_name}" - app[app_task_key] = task - - yield - - # tear-down - await cancel_wait_task(task) - app.pop(app_task_key, None) + async for _ in setup_periodic_task(app, _update_expired_users_periodically): + yield return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py new file mode 100644 index 000000000000..cd4af40cf3b7 --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py @@ -0,0 +1,41 @@ +""" +Common utilities for background task management in garbage collector +""" + +import asyncio +from collections.abc import AsyncIterator, Callable, Coroutine + +from aiohttp import web +from servicelib.async_utils import cancel_wait_task + +CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] + + +async def setup_periodic_task( + app: web.Application, + periodic_task_coro: Callable[[], Coroutine[None, None, None]], +) -> AsyncIterator[None]: + """ + Generic setup and teardown for periodic background tasks. + + Args: + app: The aiohttp web application + periodic_task_coro: The periodic task coroutine function (already decorated with @exclusive_periodic) + """ + # setup + task_name = periodic_task_coro.__name__ + + task = asyncio.create_task( + periodic_task_coro(), + name=task_name, + ) + + # prevents premature garbage collection of the task + app_task_key = f"tasks.{task_name}" + app[app_task_key] = task + + yield + + # tear-down + await cancel_wait_task(task) + app.pop(app_task_key, None) From 29a6028bc85ecba9ffd076976f6102f8e2ba9dba Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 16:08:27 +0200 Subject: [PATCH 06/22] cleanup --- .../garbage_collector/_tasks_users.py | 4 +--- .../garbage_collector/_tasks_utils.py | 6 +++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py index 97ad1dc54f0b..f59e42bf934f 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py @@ -4,7 +4,7 @@ """ import logging -from collections.abc import AsyncIterator, Callable +from collections.abc import AsyncIterator from datetime import timedelta from aiohttp import web @@ -20,8 +20,6 @@ _logger = logging.getLogger(__name__) -CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] - async def notify_user_logout_all_sessions( app: web.Application, user_id: UserID diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py index cd4af40cf3b7..6d24b2b337a3 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py @@ -23,7 +23,7 @@ async def setup_periodic_task( periodic_task_coro: The periodic task coroutine function (already decorated with @exclusive_periodic) """ # setup - task_name = periodic_task_coro.__name__ + task_name = f"{periodic_task_coro.__module__}.{periodic_task_coro.__name__}" task = asyncio.create_task( periodic_task_coro(), @@ -32,6 +32,10 @@ async def setup_periodic_task( # prevents premature garbage collection of the task app_task_key = f"tasks.{task_name}" + if app_task_key in app: + msg = f"Task {task_name} is already registered in the app state" + raise ValueError(msg) + app[app_task_key] = task yield From 700f0b8c34caee20689a35f06363f0082f1a85f2 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 16:11:11 +0200 Subject: [PATCH 07/22] add setup --- .../src/simcore_service_webserver/garbage_collector/plugin.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py index e948e9ea229b..651fb0bc2648 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py @@ -3,6 +3,7 @@ from aiohttp import web from servicelib.aiohttp.application_setup import ModuleCategory, app_module_setup from servicelib.logging_utils import set_parent_module_log_level +from simcore_service_webserver.redis import setup_redis from ..application_settings import get_application_settings from ..login.plugin import setup_login_storage @@ -32,6 +33,8 @@ def setup_garbage_collector(app: web.Application) -> None: # - project needs access to user-api that is connected to login plugin setup_login_storage(app) + setup_redis(app) + settings = get_plugin_settings(app) app.cleanup_ctx.append(_tasks_core.create_background_task_for_garbage_collection()) From 6071dcd41fb82cf80f586d9ac58b417fabfd8251 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 16:14:58 +0200 Subject: [PATCH 08/22] inti --- .../simcore_service_webserver/garbage_collector/plugin.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py index 651fb0bc2648..325f15d9964c 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py @@ -3,12 +3,12 @@ from aiohttp import web from servicelib.aiohttp.application_setup import ModuleCategory, app_module_setup from servicelib.logging_utils import set_parent_module_log_level -from simcore_service_webserver.redis import setup_redis from ..application_settings import get_application_settings from ..login.plugin import setup_login_storage from ..products.plugin import setup_products from ..projects._projects_repository_legacy import setup_projects_db +from ..redis import setup_redis from ..socketio.plugin import setup_socketio from . import _tasks_api_keys, _tasks_core, _tasks_trash, _tasks_users from .settings import get_plugin_settings @@ -23,18 +23,20 @@ logger=_logger, ) def setup_garbage_collector(app: web.Application) -> None: + # distributed exclusive periodic tasks + setup_redis(app) + # for trashing setup_products(app) # - project-api needs access to db setup_projects_db(app) + # - project needs access to socketio via notify_project_state_update setup_socketio(app) # - project needs access to user-api that is connected to login plugin setup_login_storage(app) - setup_redis(app) - settings = get_plugin_settings(app) app.cleanup_ctx.append(_tasks_core.create_background_task_for_garbage_collection()) From a51232375b03a03d74231fe9113e3a83e16b28e3 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 16:16:56 +0200 Subject: [PATCH 09/22] equal periodic and retry --- .../garbage_collector/_tasks_api_keys.py | 5 +++-- .../garbage_collector/_tasks_core.py | 5 +++-- .../garbage_collector/_tasks_trash.py | 5 +++-- .../garbage_collector/_tasks_users.py | 5 +++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py index 993dd037a89a..6351eda4fcb2 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py @@ -33,12 +33,13 @@ def create_background_task_to_prune_api_keys( ) -> CleanupContextFunc: async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: + interval = timedelta(seconds=wait_period_s) @exclusive_periodic( # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently get_redis_lock_manager_client_sdk(app), - task_interval=timedelta(seconds=wait_period_s), - retry_after=timedelta(minutes=5), + task_interval=interval, + retry_after=interval, ) async def _prune_expired_api_keys_periodically() -> None: with log_context(_logger, logging.INFO, "Pruning expired API keys"): diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py index 8216ae195bfa..e1c0c9af3efc 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py @@ -24,12 +24,13 @@ def create_background_task_for_garbage_collection() -> CleanupContextFunc: async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: settings: GarbageCollectorSettings = get_plugin_settings(app) + interval = timedelta(seconds=settings.GARBAGE_COLLECTOR_INTERVAL_S) @exclusive_periodic( # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently get_redis_lock_manager_client_sdk(app), - task_interval=timedelta(seconds=settings.GARBAGE_COLLECTOR_INTERVAL_S), - retry_after=timedelta(minutes=5), + task_interval=interval, + retry_after=interval, ) async def _collect_garbage_periodically() -> None: with log_context(_logger, logging.INFO, "Garbage collect cycle"): diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py index 5cada15dd7b3..dce47241ca8b 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py @@ -21,12 +21,13 @@ def create_background_task_to_prune_trash(wait_s: float) -> CleanupContextFunc: async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: + interval = timedelta(seconds=wait_s) @exclusive_periodic( # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently get_redis_lock_manager_client_sdk(app), - task_interval=timedelta(seconds=wait_s), - retry_after=timedelta(minutes=5), + task_interval=interval, + retry_after=interval, ) async def _prune_trash_periodically() -> None: with log_context(_logger, logging.INFO, "Deleting expired trashed items"): diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py index f59e42bf934f..269b072ba5fd 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py @@ -70,12 +70,13 @@ async def _update_expired_users(app: web.Application): def create_background_task_for_trial_accounts(wait_s: float) -> CleanupContextFunc: async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: + interval = timedelta(seconds=wait_s) @exclusive_periodic( # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently get_redis_lock_manager_client_sdk(app), - task_interval=timedelta(seconds=wait_s), - retry_after=timedelta(minutes=5), + task_interval=interval, + retry_after=interval, ) async def _update_expired_users_periodically() -> None: with log_context(_logger, logging.INFO, "Updating expired users"): From 45b470962a85b547d1eaf5aca5d3ffca3c37b5f5 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 16:46:43 +0200 Subject: [PATCH 10/22] fixes pylint --- .../garbage_collector/_tasks_core.py | 6 +++++- .../garbage_collector/_tasks_utils.py | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py index e1c0c9af3efc..d9c17db2a531 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py @@ -19,6 +19,8 @@ _logger = logging.getLogger(__name__) +_GC_TASK_NAME = f"{__name__}._collect_garbage_periodically" + def create_background_task_for_garbage_collection() -> CleanupContextFunc: @@ -36,7 +38,9 @@ async def _collect_garbage_periodically() -> None: with log_context(_logger, logging.INFO, "Garbage collect cycle"): await collect_garbage(app) - async for _ in setup_periodic_task(app, _collect_garbage_periodically): + async for _ in setup_periodic_task( + app, _collect_garbage_periodically, task_name=_GC_TASK_NAME + ): yield return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py index 6d24b2b337a3..857b8e644f8c 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py @@ -11,9 +11,19 @@ CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] +def create_task_name(coro: Callable) -> str: + """ + Returns a unique name for the task based on its module and function name. + This is useful for logging and debugging purposes. + """ + return f"{coro.__module__}.{coro.__name__}" + + async def setup_periodic_task( app: web.Application, periodic_task_coro: Callable[[], Coroutine[None, None, None]], + *, + task_name: str | None = None, ) -> AsyncIterator[None]: """ Generic setup and teardown for periodic background tasks. @@ -23,7 +33,7 @@ async def setup_periodic_task( periodic_task_coro: The periodic task coroutine function (already decorated with @exclusive_periodic) """ # setup - task_name = f"{periodic_task_coro.__module__}.{periodic_task_coro.__name__}" + task_name = task_name or create_task_name(periodic_task_coro) task = asyncio.create_task( periodic_task_coro(), @@ -31,7 +41,7 @@ async def setup_periodic_task( ) # prevents premature garbage collection of the task - app_task_key = f"tasks.{task_name}" + app_task_key = f"gc-tasks/{task_name}" if app_task_key in app: msg = f"Task {task_name} is already registered in the app state" raise ValueError(msg) From 2fcd20087bebdfdb5f795af3ae3ea1dedfa4eef4 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 17:05:40 +0200 Subject: [PATCH 11/22] fixes test --- .../web/server/tests/integration/01/test_garbage_collection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index 0618647f01c4..d7e6c6059373 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -198,7 +198,7 @@ async def _fake_background_task(app: web.Application): await asyncio.sleep(0.1) return mocker.patch( - "simcore_service_webserver.garbage_collector.plugin._tasks_core.run_background_task", + "simcore_service_webserver.garbage_collector.plugin._tasks_core.create_background_task_for_garbage_collection", side_effect=_fake_background_task, ) From 7d0a30790a7002a26daa3b9ddd70b344d3f1cb63 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 17:41:46 +0200 Subject: [PATCH 12/22] minor --- .../simcore_service_webserver/garbage_collector/plugin.py | 6 +++--- .../server/tests/integration/01/test_garbage_collection.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py index 325f15d9964c..aa5ef38fdc1a 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py @@ -23,12 +23,12 @@ logger=_logger, ) def setup_garbage_collector(app: web.Application) -> None: - # distributed exclusive periodic tasks - setup_redis(app) - # for trashing setup_products(app) + # distributed exclusive periodic tasks + setup_redis(app) + # - project-api needs access to db setup_projects_db(app) diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index d7e6c6059373..b840385d85ab 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -177,7 +177,9 @@ async def client( setup_socketio(app) setup_projects(app) setup_director_v2(app) + assert setup_resource_manager(app) + setup_garbage_collector(app) return await aiohttp_client( From d20f8f79df1adb6e88877a7720ebff3aacb690ca Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 18:12:41 +0200 Subject: [PATCH 13/22] tuning --- .../integration/01/test_garbage_collection.py | 30 +++++++++---------- .../isolated/test_garbage_collector_core.py | 2 +- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index b840385d85ab..3b9344aec6bc 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -27,6 +27,7 @@ from pytest_mock import MockerFixture from pytest_simcore.helpers.webserver_login import UserInfoDict, log_client_in from pytest_simcore.helpers.webserver_projects import create_project, empty_project_data +from servicelib.aiohttp import status from servicelib.aiohttp.application import create_safe_application from settings_library.rabbit import RabbitSettings from settings_library.redis import RedisDatabase, RedisSettings @@ -63,15 +64,16 @@ log = logging.getLogger(__name__) pytest_simcore_core_services_selection = [ - "migration", + "migration", # NOTE: rebuild! "postgres", "rabbit", "redis", - "storage", + "storage", # NOTE: rebuild! ] pytest_simcore_ops_services_selection = [ "minio", "adminer", + "redis-commander", ] @@ -98,11 +100,6 @@ async def _delete_all_redis_keys(redis_settings: RedisSettings): await client.aclose(close_connection_pool=True) -@pytest.fixture(scope="session") -def osparc_product_name() -> str: - return "osparc" - - @pytest.fixture async def director_v2_service_mock( mocker: MockerFixture, @@ -130,7 +127,7 @@ async def director_v2_service_mock( with aioresponses(passthrough=PASSTHROUGH_REQUESTS_PREFIXES) as mock: mock.get( get_computation_pattern, - status=202, + status=status.HTTP_202_ACCEPTED, payload={"state": str(RunningState.NOT_STARTED.value)}, repeat=True, ) @@ -192,16 +189,19 @@ async def client( def disable_garbage_collector_task(mocker: MockerFixture) -> mock.MagicMock: """patch the setup of the garbage collector so we can call it manually""" - async def _fake_background_task(app: web.Application): - # startup - await asyncio.sleep(0.1) - yield - # teardown - await asyncio.sleep(0.1) + def _fake_factory(): + async def _cleanup_ctx_fun(app: web.Application): + # startup + await asyncio.sleep(0.1) + yield + # teardown + await asyncio.sleep(0.1) + + return _cleanup_ctx_fun return mocker.patch( "simcore_service_webserver.garbage_collector.plugin._tasks_core.create_background_task_for_garbage_collection", - side_effect=_fake_background_task, + side_effect=_fake_factory, ) diff --git a/services/web/server/tests/unit/isolated/test_garbage_collector_core.py b/services/web/server/tests/unit/isolated/test_garbage_collector_core.py index b944b0d93c12..a6722b3dafc8 100644 --- a/services/web/server/tests/unit/isolated/test_garbage_collector_core.py +++ b/services/web/server/tests/unit/isolated/test_garbage_collector_core.py @@ -124,7 +124,7 @@ async def test_remove_orphaned_services_with_no_running_services_does_nothing( def faker_dynamic_service_get() -> Callable[[], DynamicServiceGet]: def _() -> DynamicServiceGet: return DynamicServiceGet.model_validate( - DynamicServiceGet.model_config["json_schema_extra"]["examples"][1] + DynamicServiceGet.model_json_schema()["examples"][1] ) return _ From 204d3df2addff998a5515e510e4234e06ad0dae5 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 19:03:30 +0200 Subject: [PATCH 14/22] tuning retry --- .../garbage_collector/_tasks_api_keys.py | 2 +- .../simcore_service_webserver/garbage_collector/_tasks_core.py | 2 +- .../simcore_service_webserver/garbage_collector/_tasks_trash.py | 2 +- .../simcore_service_webserver/garbage_collector/_tasks_users.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py index 6351eda4fcb2..b69f2ec407c2 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py @@ -39,7 +39,7 @@ async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently get_redis_lock_manager_client_sdk(app), task_interval=interval, - retry_after=interval, + retry_after=min(timedelta(seconds=10), interval / 10), ) async def _prune_expired_api_keys_periodically() -> None: with log_context(_logger, logging.INFO, "Pruning expired API keys"): diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py index d9c17db2a531..9790a239369b 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py @@ -32,7 +32,7 @@ async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently get_redis_lock_manager_client_sdk(app), task_interval=interval, - retry_after=interval, + retry_after=min(timedelta(seconds=10), interval / 10), ) async def _collect_garbage_periodically() -> None: with log_context(_logger, logging.INFO, "Garbage collect cycle"): diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py index dce47241ca8b..bf70a598c2f0 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py @@ -27,7 +27,7 @@ async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently get_redis_lock_manager_client_sdk(app), task_interval=interval, - retry_after=interval, + retry_after=min(timedelta(seconds=10), interval / 10), ) async def _prune_trash_periodically() -> None: with log_context(_logger, logging.INFO, "Deleting expired trashed items"): diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py index 269b072ba5fd..8f9f70ee163b 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py @@ -76,7 +76,7 @@ async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently get_redis_lock_manager_client_sdk(app), task_interval=interval, - retry_after=interval, + retry_after=min(timedelta(seconds=10), interval / 10), ) async def _update_expired_users_periodically() -> None: with log_context(_logger, logging.INFO, "Updating expired users"): From e034603e9d6e02eda366b56e931616c1818134fa Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 20:35:10 +0200 Subject: [PATCH 15/22] update --- .../garbage_collector/settings.py | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/settings.py b/services/web/server/src/simcore_service_webserver/garbage_collector/settings.py index 46863d458640..0f72f1dd2cbd 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/settings.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/settings.py @@ -1,3 +1,5 @@ +from typing import Annotated + from aiohttp import web from pydantic import Field, PositiveInt from servicelib.aiohttp.application_keys import APP_SETTINGS_KEY @@ -13,20 +15,28 @@ class GarbageCollectorSettings(BaseCustomSettings): - GARBAGE_COLLECTOR_INTERVAL_S: PositiveInt = Field( - 30 * _SEC, - description="Waiting time between consecutive runs of the garbage-colector", + GARBAGE_COLLECTOR_INTERVAL_S: Annotated[ + PositiveInt, + Field( + description="Waiting time between consecutive runs of the garbage-colector" + ), + ] = ( + 30 * _SEC ) - GARBAGE_COLLECTOR_EXPIRED_USERS_CHECK_INTERVAL_S: PositiveInt = Field( - 1 * _HOUR, - description="Time period between checks of expiration dates for trial users", + GARBAGE_COLLECTOR_EXPIRED_USERS_CHECK_INTERVAL_S: Annotated[ + PositiveInt, + Field( + description="Time period between checks of expiration dates for trial users" + ), + ] = ( + 1 * _HOUR ) - GARBAGE_COLLECTOR_PRUNE_APIKEYS_INTERVAL_S: PositiveInt = Field( - _HOUR, - description="Wait time between periodic pruning of expired API keys", - ) + GARBAGE_COLLECTOR_PRUNE_APIKEYS_INTERVAL_S: Annotated[ + PositiveInt, + Field(description="Wait time between periodic pruning of expired API keys"), + ] = _HOUR def get_plugin_settings(app: web.Application) -> GarbageCollectorSettings: From de5273884c8dab962d5727ef566b00f49c40986b Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 20:42:10 +0200 Subject: [PATCH 16/22] rm unused --- .../server/tests/integration/01/test_garbage_collection.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index 3b9344aec6bc..5ce943e3b554 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -72,8 +72,9 @@ ] pytest_simcore_ops_services_selection = [ "minio", - "adminer", - "redis-commander", + # Only for development purposes + # "adminer", + # "redis-commander", ] From 97eba627f7b6648a274928a605f014e9fbc62537 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 24 Jun 2025 21:02:06 +0200 Subject: [PATCH 17/22] rm todo --- packages/service-library/src/servicelib/background_task.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/service-library/src/servicelib/background_task.py b/packages/service-library/src/servicelib/background_task.py index b2b8488cfaac..4670ad7ddd89 100644 --- a/packages/service-library/src/servicelib/background_task.py +++ b/packages/service-library/src/servicelib/background_task.py @@ -38,10 +38,7 @@ def periodic( *, interval: datetime.timedelta, raise_on_error: bool = False, - early_wake_up_event: ( - asyncio.Event | None - ) = None, # TODO: i would argue that this should be a different decorator instead of an optional arguments since with this event, - # the funciton is not periodic anymore but rahter repeatedly triggered by the event + early_wake_up_event: asyncio.Event | None = None, ) -> Callable[ [Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]] ]: From 9d02f48b488218012bb86db34a4ab94857f28ba5 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Wed, 25 Jun 2025 08:55:43 +0200 Subject: [PATCH 18/22] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Refactor=20periodic?= =?UTF-8?q?=20task=20setup=20to=20use=20`periodic=5Ftask=5Flifespan`=20uti?= =?UTF-8?q?lity?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../garbage_collector/_tasks_api_keys.py | 6 ++++-- .../garbage_collector/_tasks_core.py | 4 ++-- .../garbage_collector/_tasks_trash.py | 4 ++-- .../garbage_collector/_tasks_users.py | 4 ++-- .../garbage_collector/_tasks_utils.py | 4 ++-- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py index b69f2ec407c2..6cb27316b0ff 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py @@ -13,7 +13,7 @@ from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ..api_keys import api_keys_service -from ._tasks_utils import CleanupContextFunc, setup_periodic_task +from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan _logger = logging.getLogger(__name__) @@ -45,7 +45,9 @@ async def _prune_expired_api_keys_periodically() -> None: with log_context(_logger, logging.INFO, "Pruning expired API keys"): await _prune_expired_api_keys(app) - async for _ in setup_periodic_task(app, _prune_expired_api_keys_periodically): + async for _ in periodic_task_lifespan( + app, _prune_expired_api_keys_periodically + ): yield return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py index 9790a239369b..7097af49d9f0 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py @@ -14,7 +14,7 @@ from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ._core import collect_garbage -from ._tasks_utils import CleanupContextFunc, setup_periodic_task +from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan from .settings import GarbageCollectorSettings, get_plugin_settings _logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ async def _collect_garbage_periodically() -> None: with log_context(_logger, logging.INFO, "Garbage collect cycle"): await collect_garbage(app) - async for _ in setup_periodic_task( + async for _ in periodic_task_lifespan( app, _collect_garbage_periodically, task_name=_GC_TASK_NAME ): yield diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py index bf70a598c2f0..48b3a16e3281 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py @@ -13,7 +13,7 @@ from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk from ..trash import trash_service -from ._tasks_utils import CleanupContextFunc, setup_periodic_task +from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan _logger = logging.getLogger(__name__) @@ -33,7 +33,7 @@ async def _prune_trash_periodically() -> None: with log_context(_logger, logging.INFO, "Deleting expired trashed items"): await trash_service.safe_delete_expired_trash_as_admin(app) - async for _ in setup_periodic_task(app, _prune_trash_periodically): + async for _ in periodic_task_lifespan(app, _prune_trash_periodically): yield return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py index 8f9f70ee163b..95d5e5e3a477 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py @@ -16,7 +16,7 @@ from ..login import login_service from ..security import security_service from ..users.api import update_expired_users -from ._tasks_utils import CleanupContextFunc, setup_periodic_task +from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan _logger = logging.getLogger(__name__) @@ -82,7 +82,7 @@ async def _update_expired_users_periodically() -> None: with log_context(_logger, logging.INFO, "Updating expired users"): await _update_expired_users(app) - async for _ in setup_periodic_task(app, _update_expired_users_periodically): + async for _ in periodic_task_lifespan(app, _update_expired_users_periodically): yield return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py index 857b8e644f8c..7417b748904c 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py @@ -6,7 +6,7 @@ from collections.abc import AsyncIterator, Callable, Coroutine from aiohttp import web -from servicelib.async_utils import cancel_wait_task +from common_library.async_tools import cancel_wait_task CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] @@ -19,7 +19,7 @@ def create_task_name(coro: Callable) -> str: return f"{coro.__module__}.{coro.__name__}" -async def setup_periodic_task( +async def periodic_task_lifespan( app: web.Application, periodic_task_coro: Callable[[], Coroutine[None, None, None]], *, From 1f430f7305d4ddbf24775c8826bf1a2c66509b51 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Thu, 26 Jun 2025 09:21:51 +0200 Subject: [PATCH 19/22] @sanderegg review: fun is not coro --- packages/service-library/src/servicelib/background_task.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/service-library/src/servicelib/background_task.py b/packages/service-library/src/servicelib/background_task.py index 99c7c7e91cd0..a283c78f6060 100644 --- a/packages/service-library/src/servicelib/background_task.py +++ b/packages/service-library/src/servicelib/background_task.py @@ -58,7 +58,7 @@ def periodic( """ def _decorator( - coro: Callable[P, Coroutine[Any, Any, None]], + async_fun: Callable[P, Coroutine[Any, Any, None]], ) -> Callable[P, Coroutine[Any, Any, None]]: class _InternalTryAgain(TryAgain): # Local exception to prevent reacting to similarTryAgain exceptions raised by the wrapped func @@ -82,10 +82,10 @@ class _InternalTryAgain(TryAgain): ), before_sleep=before_sleep_log(_logger, logging.DEBUG), ) - @functools.wraps(coro) + @functools.wraps(async_fun) async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None: with log_catch(_logger, reraise=True): - await coro(*args, **kwargs) + await async_fun(*args, **kwargs) raise _InternalTryAgain return _wrapper From ad11d6a76316360def17b225655e9b9242d7db70 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Thu, 26 Jun 2025 09:22:40 +0200 Subject: [PATCH 20/22] @sanderegg review: ops services --- .../server/tests/integration/01/test_garbage_collection.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index 5ce943e3b554..3b9344aec6bc 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -72,9 +72,8 @@ ] pytest_simcore_ops_services_selection = [ "minio", - # Only for development purposes - # "adminer", - # "redis-commander", + "adminer", + "redis-commander", ] From 04f77e7998bfe353d61b587e1e767d7594e49bc3 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Thu, 26 Jun 2025 09:24:25 +0200 Subject: [PATCH 21/22] @sanderegg review:doc --- .../simcore_service_webserver/garbage_collector/_tasks_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py index 7417b748904c..52c6382c1250 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py @@ -40,7 +40,7 @@ async def periodic_task_lifespan( name=task_name, ) - # prevents premature garbage collection of the task + # Keeping a reference in app's state to prevent premature garbage collection of the task app_task_key = f"gc-tasks/{task_name}" if app_task_key in app: msg = f"Task {task_name} is already registered in the app state" From 01f08e36f743301e15084ca62b2813b602915ffb Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Thu, 26 Jun 2025 10:12:27 +0200 Subject: [PATCH 22/22] @bisgaard-itis review: check if decorated --- .../src/servicelib/background_task_utils.py | 2 ++ .../garbage_collector/_tasks_utils.py | 10 ++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/service-library/src/servicelib/background_task_utils.py b/packages/service-library/src/servicelib/background_task_utils.py index a166e04d0b6e..45119649c269 100644 --- a/packages/service-library/src/servicelib/background_task_utils.py +++ b/packages/service-library/src/servicelib/background_task_utils.py @@ -54,6 +54,8 @@ def _decorator( async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None: return await coro(*args, **kwargs) + # Marks with an identifier (mostly to assert a function has been decorated with this decorator) + setattr(_wrapper, "__exclusive_periodic__", True) # noqa: B010 return _wrapper return _decorator diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py index 52c6382c1250..4971389a73f5 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_utils.py @@ -21,7 +21,7 @@ def create_task_name(coro: Callable) -> str: async def periodic_task_lifespan( app: web.Application, - periodic_task_coro: Callable[[], Coroutine[None, None, None]], + periodic_async_func: Callable[[], Coroutine[None, None, None]], *, task_name: str | None = None, ) -> AsyncIterator[None]: @@ -30,13 +30,15 @@ async def periodic_task_lifespan( Args: app: The aiohttp web application - periodic_task_coro: The periodic task coroutine function (already decorated with @exclusive_periodic) + periodic_async_func: The periodic coroutine function (already decorated with @exclusive_periodic) """ + assert getattr(periodic_async_func, "__exclusive_periodic__", False) # nosec + # setup - task_name = task_name or create_task_name(periodic_task_coro) + task_name = task_name or create_task_name(periodic_async_func) task = asyncio.create_task( - periodic_task_coro(), + periodic_async_func(), name=task_name, )