|
1 | 1 | """ |
2 | | - Scheduled tasks addressing users |
| 2 | +Scheduled tasks addressing users |
3 | 3 |
|
4 | 4 | """ |
5 | 5 |
|
6 | 6 | import asyncio |
7 | 7 | import logging |
8 | 8 | from collections.abc import AsyncIterator, Callable |
| 9 | +from datetime import timedelta |
9 | 10 |
|
10 | 11 | from aiohttp import web |
| 12 | +from servicelib.async_utils import cancel_wait_task |
| 13 | +from servicelib.background_task_utils import exclusive_periodic |
11 | 14 | from servicelib.logging_utils import log_context |
12 | | -from tenacity import retry |
13 | | -from tenacity.before_sleep import before_sleep_log |
14 | | -from tenacity.wait import wait_exponential |
| 15 | +from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk |
15 | 16 |
|
16 | 17 | from ..trash import trash_service |
17 | 18 |
|
|
20 | 21 | CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] |
21 | 22 |
|
22 | 23 |
|
23 | | -_PERIODIC_TASK_NAME = f"{__name__}" |
24 | | -_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task" |
| 24 | +def create_background_task_to_prune_trash(wait_s: float) -> CleanupContextFunc: |
25 | 25 |
|
| 26 | + async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: |
26 | 27 |
|
27 | | -@retry( |
28 | | - wait=wait_exponential(min=5, max=20), |
29 | | - before_sleep=before_sleep_log(_logger, logging.WARNING), |
30 | | -) |
31 | | -async def _run_task(app: web.Application): |
32 | | - with log_context(_logger, logging.INFO, "Deleting expired trashed items"): |
33 | | - await trash_service.safe_delete_expired_trash_as_admin(app) |
34 | | - |
35 | | - |
36 | | -async def _run_periodically(app: web.Application, wait_interval_s: float): |
37 | | - while True: |
38 | | - await _run_task(app) |
39 | | - await asyncio.sleep(wait_interval_s) |
40 | | - |
| 28 | + @exclusive_periodic( |
| 29 | + # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently |
| 30 | + get_redis_lock_manager_client_sdk(app), |
| 31 | + task_interval=timedelta(seconds=wait_s), |
| 32 | + retry_after=timedelta(minutes=5), |
| 33 | + ) |
| 34 | + async def _prune_trash_periodically() -> None: |
| 35 | + with log_context(_logger, logging.INFO, "Deleting expired trashed items"): |
| 36 | + await trash_service.safe_delete_expired_trash_as_admin(app) |
41 | 37 |
|
42 | | -def create_background_task_to_prune_trash( |
43 | | - wait_s: float, task_name: str = _PERIODIC_TASK_NAME |
44 | | -) -> CleanupContextFunc: |
45 | | - async def _cleanup_ctx_fun( |
46 | | - app: web.Application, |
47 | | - ) -> AsyncIterator[None]: |
48 | 38 | # setup |
| 39 | + task_name = _prune_trash_periodically.__name__ |
| 40 | + |
49 | 41 | task = asyncio.create_task( |
50 | | - _run_periodically(app, wait_s), |
| 42 | + _prune_trash_periodically(), |
51 | 43 | name=task_name, |
52 | 44 | ) |
53 | | - app[_APP_TASK_KEY] = task |
| 45 | + |
| 46 | + # prevents premature garbage collection of the task |
| 47 | + app_task_key = f"tasks.{task_name}" |
| 48 | + app[app_task_key] = task |
54 | 49 |
|
55 | 50 | yield |
56 | 51 |
|
57 | 52 | # tear-down |
58 | | - task.cancel() |
59 | | - try: |
60 | | - await task |
61 | | - except asyncio.CancelledError: |
62 | | - assert task.cancelled() # nosec |
| 53 | + await cancel_wait_task(task) |
| 54 | + app.pop(app_task_key, None) |
63 | 55 |
|
64 | 56 | return _cleanup_ctx_fun |
0 commit comments