-
Notifications
You must be signed in to change notification settings - Fork 32
♻️ web-server: Upgrade GC periodic tasks to new servicelib.background_task
#7970
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
pcrespov
merged 27 commits into
ITISFoundation:master
from
pcrespov:is7961/upgrade-gc-periodic-task
Jun 26, 2025
Merged
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
4a55849
prune trash
pcrespov aee674a
🎨 Refactor user expiration update task: Implement periodic updates wi…
pcrespov f59b831
🎨 Refactor API key pruning task: Simplify periodic execution and enha…
pcrespov 655cdd5
🎨 Refactor garbage collection task: Update to use new background task…
pcrespov 2f16b47
🎨 Refactor background task management: Introduce setup_periodic_task …
pcrespov 29a6028
cleanup
pcrespov 700f0b8
add setup
pcrespov 6071dcd
inti
pcrespov a512323
equal periodic and retry
pcrespov 45b4709
fixes pylint
pcrespov 2fcd200
fixes test
pcrespov 7d0a307
minor
pcrespov d20f8f7
tuning
pcrespov 204d3df
tuning retry
pcrespov e034603
update
pcrespov de52738
rm unused
pcrespov 97eba62
rm todo
pcrespov a9d6b6a
Merge branch 'master' into is7961/upgrade-gc-periodic-task
pcrespov 9d02f48
♻️ Refactor periodic task setup to use `periodic_task_lifespan` utility
pcrespov 631e7f3
Merge branch 'master' into is7961/upgrade-gc-periodic-task
pcrespov 1f430f7
@sanderegg review: fun is not coro
pcrespov ad11d6a
@sanderegg review: ops services
pcrespov 04f77e7
@sanderegg review:doc
pcrespov 01f08e3
@bisgaard-itis review: check if decorated
pcrespov 8584f23
Merge branch 'master' into is7961/upgrade-gc-periodic-task
pcrespov 99edd6b
Merge branch 'master' into is7961/upgrade-gc-periodic-task
mergify[bot] b4a644c
Merge branch 'master' into is7961/upgrade-gc-periodic-task
mergify[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
107 changes: 25 additions & 82 deletions
107
services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,103 +1,46 @@ | ||
| """ Setup and running of periodic background task | ||
| """Setup and running of periodic background task | ||
| Specifics of the gc implementation should go into garbage_collector_core.py | ||
| """ | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from collections.abc import AsyncGenerator | ||
| from collections.abc import AsyncIterator | ||
| from datetime import timedelta | ||
|
|
||
| from aiohttp import web | ||
| from servicelib.background_task_utils import exclusive_periodic | ||
| from servicelib.logging_utils import log_context | ||
| from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk | ||
|
|
||
| from ._core import collect_garbage | ||
| from ._tasks_utils import CleanupContextFunc, setup_periodic_task | ||
| from .settings import GarbageCollectorSettings, get_plugin_settings | ||
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
| _GC_TASK_NAME = f"{__name__}._collect_garbage_periodically" | ||
|
|
||
| _GC_TASK_NAME = f"background-task.{__name__}.collect_garbage_periodically" | ||
| _GC_TASK_CONFIG = f"{_GC_TASK_NAME}.config" | ||
| _GC_TASK = f"{_GC_TASK_NAME}.task" | ||
|
|
||
| def create_background_task_for_garbage_collection() -> CleanupContextFunc: | ||
|
|
||
| async def run_background_task(app: web.Application) -> AsyncGenerator: | ||
| # SETUP ------ | ||
| # create a background task to collect garbage periodically | ||
| assert not any( # nosec | ||
| t.get_name() == _GC_TASK_NAME for t in asyncio.all_tasks() | ||
| ), "Garbage collector task already running. ONLY ONE expected" # nosec | ||
| async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: | ||
| settings: GarbageCollectorSettings = get_plugin_settings(app) | ||
| interval = timedelta(seconds=settings.GARBAGE_COLLECTOR_INTERVAL_S) | ||
|
|
||
| gc_bg_task = asyncio.create_task( | ||
| _collect_garbage_periodically(app), name=_GC_TASK_NAME | ||
| ) | ||
| # attaches variable to the app's lifetime | ||
| app[_GC_TASK] = gc_bg_task | ||
| @exclusive_periodic( | ||
| # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently | ||
pcrespov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| get_redis_lock_manager_client_sdk(app), | ||
| task_interval=interval, | ||
| retry_after=min(timedelta(seconds=10), interval / 10), | ||
| ) | ||
| async def _collect_garbage_periodically() -> None: | ||
| with log_context(_logger, logging.INFO, "Garbage collect cycle"): | ||
| await collect_garbage(app) | ||
|
|
||
| # FIXME: added this config to overcome the state in which the | ||
| # task cancelation is ignored and the exceptions enter in a loop | ||
| # that never stops the background task. This flag is an additional | ||
| # mechanism to enforce stopping the background task | ||
| # | ||
| # Implemented with a mutable dict to avoid | ||
| # DeprecationWarning: Changing state of started or joined application is deprecated | ||
| # | ||
| app[_GC_TASK_CONFIG] = {"force_stop": False, "name": _GC_TASK_NAME} | ||
| async for _ in setup_periodic_task( | ||
| app, _collect_garbage_periodically, task_name=_GC_TASK_NAME | ||
| ): | ||
| yield | ||
|
|
||
| yield | ||
|
|
||
| # TEAR-DOWN ----- | ||
| # controlled cancelation of the gc task | ||
| try: | ||
| _logger.info("Stopping garbage collector...") | ||
|
|
||
| ack = gc_bg_task.cancel() | ||
| assert ack # nosec | ||
|
|
||
| app[_GC_TASK_CONFIG]["force_stop"] = True | ||
|
|
||
| await gc_bg_task | ||
|
|
||
| except asyncio.CancelledError: | ||
| assert gc_bg_task.cancelled() # nosec | ||
|
|
||
|
|
||
| async def _collect_garbage_periodically(app: web.Application): | ||
| settings: GarbageCollectorSettings = get_plugin_settings(app) | ||
| interval = settings.GARBAGE_COLLECTOR_INTERVAL_S | ||
|
|
||
| while True: | ||
| try: | ||
| while True: | ||
| with log_context(_logger, logging.INFO, "Garbage collect cycle"): | ||
| await collect_garbage(app) | ||
|
|
||
| if app[_GC_TASK_CONFIG].get("force_stop", False): | ||
| msg = "Forced to stop garbage collection" | ||
| raise RuntimeError(msg) | ||
|
|
||
| _logger.info("Garbage collect cycle pauses %ss", interval) | ||
| await asyncio.sleep(interval) | ||
|
|
||
| except asyncio.CancelledError: # EXIT # noqa: PERF203 | ||
| _logger.info( | ||
| "Stopped: Garbage collection task was cancelled, it will not restart!" | ||
| ) | ||
| # do not catch Cancellation errors | ||
| raise | ||
|
|
||
| except Exception: # RESILIENT restart # pylint: disable=broad-except | ||
| _logger.warning( | ||
| "Stopped: There was an error during garbage collection, restarting...", | ||
| exc_info=True, | ||
| ) | ||
|
|
||
| if app[_GC_TASK_CONFIG].get("force_stop", False): | ||
| _logger.warning("Forced to stop garbage collection") | ||
| break | ||
|
|
||
| # will wait 5 seconds to recover before restarting to avoid restart loops | ||
| # - it might be that db/redis is down, etc | ||
| # | ||
| await asyncio.sleep(5) | ||
| return _cleanup_ctx_fun | ||
63 changes: 19 additions & 44 deletions
63
services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,64 +1,39 @@ | ||
| """ | ||
| Scheduled tasks addressing users | ||
| Scheduled tasks addressing users | ||
|
|
||
| """ | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from collections.abc import AsyncIterator, Callable | ||
| from collections.abc import AsyncIterator | ||
| from datetime import timedelta | ||
|
|
||
| from aiohttp import web | ||
| from servicelib.background_task_utils import exclusive_periodic | ||
| from servicelib.logging_utils import log_context | ||
| from tenacity import retry | ||
| from tenacity.before_sleep import before_sleep_log | ||
| from tenacity.wait import wait_exponential | ||
| from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk | ||
|
|
||
| from ..trash import trash_service | ||
| from ._tasks_utils import CleanupContextFunc, setup_periodic_task | ||
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
| CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] | ||
|
|
||
| def create_background_task_to_prune_trash(wait_s: float) -> CleanupContextFunc: | ||
|
|
||
| _PERIODIC_TASK_NAME = f"{__name__}" | ||
| _APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task" | ||
| async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: | ||
| interval = timedelta(seconds=wait_s) | ||
|
|
||
|
|
||
| @retry( | ||
| wait=wait_exponential(min=5, max=20), | ||
| before_sleep=before_sleep_log(_logger, logging.WARNING), | ||
| ) | ||
| async def _run_task(app: web.Application): | ||
| with log_context(_logger, logging.INFO, "Deleting expired trashed items"): | ||
| await trash_service.safe_delete_expired_trash_as_admin(app) | ||
|
|
||
|
|
||
| async def _run_periodically(app: web.Application, wait_interval_s: float): | ||
| while True: | ||
| await _run_task(app) | ||
| await asyncio.sleep(wait_interval_s) | ||
|
|
||
|
|
||
| def create_background_task_to_prune_trash( | ||
| wait_s: float, task_name: str = _PERIODIC_TASK_NAME | ||
| ) -> CleanupContextFunc: | ||
| async def _cleanup_ctx_fun( | ||
| app: web.Application, | ||
| ) -> AsyncIterator[None]: | ||
| # setup | ||
| task = asyncio.create_task( | ||
| _run_periodically(app, wait_s), | ||
| name=task_name, | ||
| @exclusive_periodic( | ||
| # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently | ||
| get_redis_lock_manager_client_sdk(app), | ||
| task_interval=interval, | ||
| retry_after=min(timedelta(seconds=10), interval / 10), | ||
| ) | ||
| app[_APP_TASK_KEY] = task | ||
|
|
||
| yield | ||
| async def _prune_trash_periodically() -> None: | ||
| with log_context(_logger, logging.INFO, "Deleting expired trashed items"): | ||
| await trash_service.safe_delete_expired_trash_as_admin(app) | ||
|
|
||
| # tear-down | ||
| task.cancel() | ||
| try: | ||
| await task | ||
| except asyncio.CancelledError: | ||
| assert task.cancelled() # nosec | ||
| async for _ in setup_periodic_task(app, _prune_trash_periodically): | ||
| yield | ||
|
|
||
| return _cleanup_ctx_fun |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.