Merged
Commits
27 commits
4a55849
prune trash
pcrespov Jun 24, 2025
aee674a
🎨 Refactor user expiration update task: Implement periodic updates wi…
pcrespov Jun 24, 2025
f59b831
🎨 Refactor API key pruning task: Simplify periodic execution and enha…
pcrespov Jun 24, 2025
655cdd5
🎨 Refactor garbage collection task: Update to use new background task…
pcrespov Jun 24, 2025
2f16b47
🎨 Refactor background task management: Introduce setup_periodic_task …
pcrespov Jun 24, 2025
29a6028
cleanup
pcrespov Jun 24, 2025
700f0b8
add setup
pcrespov Jun 24, 2025
6071dcd
inti
pcrespov Jun 24, 2025
a512323
equal periodic and retry
pcrespov Jun 24, 2025
45b4709
fixes pylint
pcrespov Jun 24, 2025
2fcd200
fixes test
pcrespov Jun 24, 2025
7d0a307
minor
pcrespov Jun 24, 2025
d20f8f7
tuning
pcrespov Jun 24, 2025
204d3df
tuning retry
pcrespov Jun 24, 2025
e034603
update
pcrespov Jun 24, 2025
de52738
rm unused
pcrespov Jun 24, 2025
97eba62
rm todo
pcrespov Jun 24, 2025
a9d6b6a
Merge branch 'master' into is7961/upgrade-gc-periodic-task
pcrespov Jun 25, 2025
9d02f48
♻️ Refactor periodic task setup to use `periodic_task_lifespan` utility
pcrespov Jun 25, 2025
631e7f3
Merge branch 'master' into is7961/upgrade-gc-periodic-task
pcrespov Jun 26, 2025
1f430f7
@sanderegg review: fun is not coro
pcrespov Jun 26, 2025
ad11d6a
@sanderegg review: ops services
pcrespov Jun 26, 2025
04f77e7
@sanderegg review:doc
pcrespov Jun 26, 2025
01f08e3
@bisgaard-itis review: check if decorated
pcrespov Jun 26, 2025
8584f23
Merge branch 'master' into is7961/upgrade-gc-periodic-task
pcrespov Jun 26, 2025
99edd6b
Merge branch 'master' into is7961/upgrade-gc-periodic-task
mergify[bot] Jun 26, 2025
b4a644c
Merge branch 'master' into is7961/upgrade-gc-periodic-task
mergify[bot] Jun 26, 2025
8 changes: 4 additions & 4 deletions packages/service-library/src/servicelib/background_task.py
@@ -42,7 +42,7 @@ def periodic(
) -> Callable[
[Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
]:
"""Calls the function periodically with a given interval.
"""Calls the function periodically with a given interval or triggered by an early wake-up event.

Arguments:
interval -- the interval between calls
@@ -58,7 +58,7 @@ def periodic(
"""

def _decorator(
func: Callable[P, Coroutine[Any, Any, None]],
coro: Callable[P, Coroutine[Any, Any, None]],
) -> Callable[P, Coroutine[Any, Any, None]]:
class _InternalTryAgain(TryAgain):
# Local exception to prevent reacting to similar TryAgain exceptions raised by the wrapped func
@@ -82,10 +82,10 @@ class _InternalTryAgain(TryAgain):
),
before_sleep=before_sleep_log(_logger, logging.DEBUG),
)
@functools.wraps(func)
@functools.wraps(coro)
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
with log_catch(_logger, reraise=True):
await func(*args, **kwargs)
await coro(*args, **kwargs)
raise _InternalTryAgain

return _wrapper
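A minimal usage sketch of the decorator changed above, assuming `periodic` is imported from `servicelib.background_task` and accepts an `interval: timedelta` keyword as its docstring suggests (other parameters, the heartbeat coroutine, and the 30-second interval are made-up examples). Because the decorator re-raises an internal TryAgain after every call, the wrapped coroutine loops forever and only stops when the task running it is cancelled:

import asyncio
import contextlib
from datetime import timedelta

from servicelib.background_task import periodic


@periodic(interval=timedelta(seconds=30))
async def _heartbeat() -> None:
    # called once per interval; the decorator re-raises an internal
    # TryAgain after each call, so the loop never exits on its own
    print("still alive")


async def main() -> None:
    task = asyncio.create_task(_heartbeat(), name="heartbeat")
    try:
        await asyncio.sleep(90)  # stands in for the application's lifetime
    finally:
        task.cancel()  # cancellation is the only way to stop the loop
        with contextlib.suppress(asyncio.CancelledError):
            await task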
@@ -43,7 +43,7 @@ def _decorator(
# Replicas will raise CouldNotAcquireLockError
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/7574
(CouldNotAcquireLockError,),
reason="Multiple instances of the periodic task `{coro.__module__}.{coro.__name__}` are running.",
reason=f"Multiple instances of the periodic task `{coro.__module__}.{coro.__name__}` are running.",
)
@exclusive(
redis_client,
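The change above adds the missing f-prefix to the reason string; without it the literal placeholder text `{coro.__module__}.{coro.__name__}` would appear in the message instead of the decorated coroutine's module and name. A tiny illustration with stand-in values:

# stand-in values; the real ones come from the decorated coroutine
module, name = "my_package.tasks", "prune_things"
without_prefix = "Multiple instances of the periodic task `{coro.__module__}.{coro.__name__}` are running."
with_prefix = f"Multiple instances of the periodic task `{module}.{name}` are running."
print(without_prefix)  # the braces are printed verbatim
print(with_prefix)     # -> ... `my_package.tasks.prune_things` ...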
@@ -3,71 +3,51 @@

"""

import asyncio
import logging
from collections.abc import AsyncIterator, Callable
from collections.abc import AsyncIterator
from datetime import timedelta

from aiohttp import web
from common_library.async_tools import cancel_wait_task
from tenacity import retry
from tenacity.before_sleep import before_sleep_log
from tenacity.wait import wait_exponential
from servicelib.background_task_utils import exclusive_periodic
from servicelib.logging_utils import log_context
from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk

from ..api_keys import api_keys_service
from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)

CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]]


_PERIODIC_TASK_NAME = f"{__name__}.prune_expired_api_keys_periodically"
_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task"


@retry(
wait=wait_exponential(min=5, max=30),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
async def _run_task(app: web.Application):
"""Periodically check expiration dates and updates user status

It is resilient, i.e. if update goes wrong, it waits a bit and retries
"""
async def _prune_expired_api_keys(app: web.Application):
if deleted := await api_keys_service.prune_expired_api_keys(app):
# broadcast force logout of user_id
for api_key in deleted:
logger.info("API-key %s expired and was removed", f"{api_key=}")
_logger.info("API-key %s expired and was removed", f"{api_key=}")

else:
logger.info("No API keys expired")


async def _run_periodically(app: web.Application, wait_period_s: float):
"""Periodically check expiration dates and updates user status

It is resilient, i.e. if update goes wrong, it waits a bit and retries
"""
while True:
await _run_task(app)
await asyncio.sleep(wait_period_s)
_logger.info("No API keys expired")


def create_background_task_to_prune_api_keys(
wait_period_s: float, task_name: str = _PERIODIC_TASK_NAME
wait_period_s: float,
) -> CleanupContextFunc:
async def _cleanup_ctx_fun(
app: web.Application,
) -> AsyncIterator[None]:
# setup
task = asyncio.create_task(
_run_periodically(app, wait_period_s),
name=task_name,
)
app[_APP_TASK_KEY] = task

yield
async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
interval = timedelta(seconds=wait_period_s)

# tear-down
await cancel_wait_task(task)
@exclusive_periodic(
# Function-exclusiveness is required to avoid multiple tasks like this one running concurrently
get_redis_lock_manager_client_sdk(app),
task_interval=interval,
retry_after=min(timedelta(seconds=10), interval / 10),
)
async def _prune_expired_api_keys_periodically() -> None:
with log_context(_logger, logging.INFO, "Pruning expired API keys"):
await _prune_expired_api_keys(app)

async for _ in periodic_task_lifespan(
app, _prune_expired_api_keys_periodically
):
yield

return _cleanup_ctx_fun
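The registration site is not part of this diff; as a sketch, a factory like the one above is typically appended to aiohttp's cleanup contexts during plugin setup (the one-hour interval below is an arbitrary example):

from aiohttp import web

app = web.Application()
app.cleanup_ctx.append(
    create_background_task_to_prune_api_keys(wait_period_s=3600)
)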
@@ -4,97 +4,43 @@
Specifics of the gc implementation should go into garbage_collector_core.py
"""

import asyncio
import logging
from collections.abc import AsyncGenerator
from collections.abc import AsyncIterator
from datetime import timedelta

from aiohttp import web
from common_library.async_tools import cancel_wait_task
from servicelib.background_task_utils import exclusive_periodic
from servicelib.logging_utils import log_context
from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk

from ._core import collect_garbage
from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan
from .settings import GarbageCollectorSettings, get_plugin_settings

_logger = logging.getLogger(__name__)

_GC_TASK_NAME = f"{__name__}._collect_garbage_periodically"

_GC_TASK_NAME = f"background-task.{__name__}.collect_garbage_periodically"
_GC_TASK_CONFIG = f"{_GC_TASK_NAME}.config"
_GC_TASK = f"{_GC_TASK_NAME}.task"

def create_background_task_for_garbage_collection() -> CleanupContextFunc:

async def run_background_task(app: web.Application) -> AsyncGenerator:
# SETUP ------
# create a background task to collect garbage periodically
assert not any( # nosec
t.get_name() == _GC_TASK_NAME for t in asyncio.all_tasks()
), "Garbage collector task already running. ONLY ONE expected" # nosec
async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
settings: GarbageCollectorSettings = get_plugin_settings(app)
interval = timedelta(seconds=settings.GARBAGE_COLLECTOR_INTERVAL_S)

gc_bg_task = asyncio.create_task(
_collect_garbage_periodically(app), name=_GC_TASK_NAME
)
# attaches variable to the app's lifetime
app[_GC_TASK] = gc_bg_task
@exclusive_periodic(
# Function-exclusiveness is required to avoid multiple tasks like this one running concurrently
get_redis_lock_manager_client_sdk(app),
task_interval=interval,
retry_after=min(timedelta(seconds=10), interval / 10),
)
async def _collect_garbage_periodically() -> None:
with log_context(_logger, logging.INFO, "Garbage collect cycle"):
await collect_garbage(app)

# FIXME: added this config to overcome the state in which the
# task cancelation is ignored and the exceptions enter in a loop
# that never stops the background task. This flag is an additional
# mechanism to enforce stopping the background task
#
# Implemented with a mutable dict to avoid
# DeprecationWarning: Changing state of started or joined application is deprecated
#
app[_GC_TASK_CONFIG] = {"force_stop": False, "name": _GC_TASK_NAME}
async for _ in periodic_task_lifespan(
app, _collect_garbage_periodically, task_name=_GC_TASK_NAME
):
yield

yield

# TEAR-DOWN -----
# controlled cancelation of the gc task
_logger.info("Stopping garbage collector...")

ack = gc_bg_task.cancel()
assert ack # nosec

app[_GC_TASK_CONFIG]["force_stop"] = True

await cancel_wait_task(gc_bg_task)


async def _collect_garbage_periodically(app: web.Application):
settings: GarbageCollectorSettings = get_plugin_settings(app)
interval = settings.GARBAGE_COLLECTOR_INTERVAL_S

while True:
try:
while True:
with log_context(_logger, logging.INFO, "Garbage collect cycle"):
await collect_garbage(app)

if app[_GC_TASK_CONFIG].get("force_stop", False):
msg = "Forced to stop garbage collection"
raise RuntimeError(msg)

_logger.info("Garbage collect cycle pauses %ss", interval)
await asyncio.sleep(interval)

except asyncio.CancelledError: # EXIT # noqa: PERF203
_logger.info(
"Stopped: Garbage collection task was cancelled, it will not restart!"
)
# do not catch Cancellation errors
raise

except Exception: # RESILIENT restart # pylint: disable=broad-except
_logger.warning(
"Stopped: There was an error during garbage collection, restarting...",
exc_info=True,
)

if app[_GC_TASK_CONFIG].get("force_stop", False):
_logger.warning("Forced to stop garbage collection")
break

# will wait 5 seconds to recover before restarting to avoid restart loops
# - it might be that db/redis is down, etc
#
await asyncio.sleep(5)
return _cleanup_ctx_fun
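The retry_after=min(timedelta(seconds=10), interval / 10) expression used in these cleanup contexts caps how quickly a replica re-attempts the distributed lock; a quick worked example (interval values are arbitrary):

from datetime import timedelta

interval = timedelta(seconds=30)
print(min(timedelta(seconds=10), interval / 10))   # 0:00:03 -> retry every 3 s
interval = timedelta(seconds=3600)
print(min(timedelta(seconds=10), interval / 10))   # 0:00:10 -> capped at 10 s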
@@ -3,59 +3,37 @@

"""

import asyncio
import logging
from collections.abc import AsyncIterator, Callable
from collections.abc import AsyncIterator
from datetime import timedelta

from aiohttp import web
from common_library.async_tools import cancel_wait_task
from servicelib.background_task_utils import exclusive_periodic
from servicelib.logging_utils import log_context
from tenacity import retry
from tenacity.before_sleep import before_sleep_log
from tenacity.wait import wait_exponential
from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk

from ..trash import trash_service
from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan

_logger = logging.getLogger(__name__)

CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]]

def create_background_task_to_prune_trash(wait_s: float) -> CleanupContextFunc:

_PERIODIC_TASK_NAME = f"{__name__}"
_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task"
async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
interval = timedelta(seconds=wait_s)


@retry(
wait=wait_exponential(min=5, max=20),
before_sleep=before_sleep_log(_logger, logging.WARNING),
)
async def _run_task(app: web.Application):
with log_context(_logger, logging.INFO, "Deleting expired trashed items"):
await trash_service.safe_delete_expired_trash_as_admin(app)


async def _run_periodically(app: web.Application, wait_interval_s: float):
while True:
await _run_task(app)
await asyncio.sleep(wait_interval_s)


def create_background_task_to_prune_trash(
wait_s: float, task_name: str = _PERIODIC_TASK_NAME
) -> CleanupContextFunc:
async def _cleanup_ctx_fun(
app: web.Application,
) -> AsyncIterator[None]:
# setup
task = asyncio.create_task(
_run_periodically(app, wait_s),
name=task_name,
@exclusive_periodic(
# Function-exclusiveness is required to avoid multiple tasks like this one running concurrently
get_redis_lock_manager_client_sdk(app),
task_interval=interval,
retry_after=min(timedelta(seconds=10), interval / 10),
)
app[_APP_TASK_KEY] = task

yield
async def _prune_trash_periodically() -> None:
with log_context(_logger, logging.INFO, "Deleting expired trashed items"):
await trash_service.safe_delete_expired_trash_as_admin(app)

# tear-down
await cancel_wait_task(task)
async for _ in periodic_task_lifespan(app, _prune_trash_periodically):
yield

return _cleanup_ctx_fun
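All three refactored cleanup contexts funnel into the same `_tasks_utils.periodic_task_lifespan` helper, whose implementation is not shown in this diff. A plausible shape, inferred only from the call sites above (an app plus a zero-argument coroutine function, an optional task_name, async-iterated once per application lifetime), might look like:

import asyncio
from collections.abc import AsyncIterator, Callable, Coroutine
from typing import Any

from aiohttp import web
from common_library.async_tools import cancel_wait_task

CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]]


async def periodic_task_lifespan(
    app: web.Application,
    coro: Callable[[], Coroutine[Any, Any, None]],
    *,
    task_name: str | None = None,
) -> AsyncIterator[None]:
    # start the periodic coroutine, hand control back for the app's lifetime,
    # then cancel and await the task on teardown; storing the task on `app`
    # mirrors the replaced code but is an assumption about the real helper
    task = asyncio.create_task(coro(), name=task_name or coro.__name__)
    app[f"{task.get_name()}.task"] = task
    try:
        yield
    finally:
        await cancel_wait_task(task)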