|
6 | 6 | import asyncio |
7 | 7 | import logging |
8 | 8 | from collections.abc import AsyncIterator, Callable |
| 9 | +from datetime import timedelta |
9 | 10 |
|
10 | 11 | from aiohttp import web |
11 | | -from tenacity import retry |
12 | | -from tenacity.before_sleep import before_sleep_log |
13 | | -from tenacity.wait import wait_exponential |
| 12 | +from servicelib.async_utils import cancel_wait_task |
| 13 | +from servicelib.background_task_utils import exclusive_periodic |
| 14 | +from servicelib.logging_utils import log_context |
| 15 | +from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk |
14 | 16 |
|
15 | 17 | from ..api_keys import api_keys_service |
16 | 18 |
|
17 | | -logger = logging.getLogger(__name__) |
| 19 | +_logger = logging.getLogger(__name__) |
18 | 20 |
|
19 | 21 | CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]] |
20 | 22 |
|
21 | 23 |
|
22 | | -_PERIODIC_TASK_NAME = f"{__name__}.prune_expired_api_keys_periodically" |
23 | | -_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task" |
24 | | - |
25 | | - |
26 | | -@retry( |
27 | | - wait=wait_exponential(min=5, max=30), |
28 | | - before_sleep=before_sleep_log(logger, logging.WARNING), |
29 | | -) |
30 | 24 | async def _run_task(app: web.Application): |
31 | | - """Periodically check expiration dates and updates user status |
32 | | -
|
33 | | - It is resilient, i.e. if update goes wrong, it waits a bit and retries |
34 | | - """ |
| 25 | + """Checks expiration dates and updates user status""" |
35 | 26 | if deleted := await api_keys_service.prune_expired_api_keys(app): |
36 | 27 | # broadcast force logout of user_id |
37 | 28 | for api_key in deleted: |
38 | | - logger.info("API-key %s expired and was removed", f"{api_key=}") |
| 29 | + _logger.info("API-key %s expired and was removed", f"{api_key=}") |
39 | 30 |
|
40 | 31 | else: |
41 | | - logger.info("No API keys expired") |
| 32 | + _logger.info("No API keys expired") |
42 | 33 |
|
43 | 34 |
|
44 | | -async def _run_periodically(app: web.Application, wait_period_s: float): |
45 | | - """Periodically check expiration dates and updates user status |
| 35 | +def create_background_task_to_prune_api_keys( |
| 36 | + wait_period_s: float, |
| 37 | +) -> CleanupContextFunc: |
46 | 38 |
|
47 | | - It is resilient, i.e. if update goes wrong, it waits a bit and retries |
48 | | - """ |
49 | | - while True: |
50 | | - await _run_task(app) |
51 | | - await asyncio.sleep(wait_period_s) |
| 39 | + async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]: |
52 | 40 |
|
| 41 | + @exclusive_periodic( |
| 42 | + # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently |
| 43 | + get_redis_lock_manager_client_sdk(app), |
| 44 | + task_interval=timedelta(seconds=wait_period_s), |
| 45 | + retry_after=timedelta(minutes=5), |
| 46 | + ) |
| 47 | + async def _prune_expired_api_keys_periodically() -> None: |
| 48 | + with log_context(_logger, logging.INFO, "Pruning expired API keys"): |
| 49 | + await _run_task(app) |
53 | 50 |
|
54 | | -def create_background_task_to_prune_api_keys( |
55 | | - wait_period_s: float, task_name: str = _PERIODIC_TASK_NAME |
56 | | -) -> CleanupContextFunc: |
57 | | - async def _cleanup_ctx_fun( |
58 | | - app: web.Application, |
59 | | - ) -> AsyncIterator[None]: |
60 | 51 | # setup |
| 52 | + task_name = _prune_expired_api_keys_periodically.__name__ |
| 53 | + |
61 | 54 | task = asyncio.create_task( |
62 | | - _run_periodically(app, wait_period_s), |
| 55 | + _prune_expired_api_keys_periodically(), |
63 | 56 | name=task_name, |
64 | 57 | ) |
65 | | - app[_APP_TASK_KEY] = task |
| 58 | + |
| 59 | + # prevents premature garbage collection of the task |
| 60 | + app_task_key = f"tasks.{task_name}" |
| 61 | + app[app_task_key] = task |
66 | 62 |
|
67 | 63 | yield |
68 | 64 |
|
69 | 65 | # tear-down |
70 | | - task.cancel() |
71 | | - try: |
72 | | - await task |
73 | | - except asyncio.CancelledError: |
74 | | - assert task.cancelled() # nosec |
| 66 | + await cancel_wait_task(task) |
| 67 | + app.pop(app_task_key, None) |
75 | 68 |
|
76 | 69 | return _cleanup_ctx_fun |
0 commit comments