Skip to content

Commit 6041cec

Browse files
authored
♻️ web-server: Upgrade GC periodic tasks to new servicelib.background_task (#7970)
1 parent 7f10771 commit 6041cec

File tree

11 files changed

+196
-241
lines changed

11 files changed

+196
-241
lines changed

packages/service-library/src/servicelib/background_task.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def periodic(
4242
) -> Callable[
4343
[Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
4444
]:
45-
"""Calls the function periodically with a given interval.
45+
"""Calls the function periodically with a given interval or triggered by an early wake-up event.
4646
4747
Arguments:
4848
interval -- the interval between calls
@@ -58,7 +58,7 @@ def periodic(
5858
"""
5959

6060
def _decorator(
61-
func: Callable[P, Coroutine[Any, Any, None]],
61+
async_fun: Callable[P, Coroutine[Any, Any, None]],
6262
) -> Callable[P, Coroutine[Any, Any, None]]:
6363
class _InternalTryAgain(TryAgain):
6464
# Local exception to prevent reacting to similar TryAgain exceptions raised by the wrapped func
@@ -82,10 +82,10 @@ class _InternalTryAgain(TryAgain):
8282
),
8383
before_sleep=before_sleep_log(_logger, logging.DEBUG),
8484
)
85-
@functools.wraps(func)
85+
@functools.wraps(async_fun)
8686
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
8787
with log_catch(_logger, reraise=True):
88-
await func(*args, **kwargs)
88+
await async_fun(*args, **kwargs)
8989
raise _InternalTryAgain
9090

9191
return _wrapper

packages/service-library/src/servicelib/background_task_utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def _decorator(
4343
# Replicas will raise CouldNotAcquireLockError
4444
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/7574
4545
(CouldNotAcquireLockError,),
46-
reason="Multiple instances of the periodic task `{coro.__module__}.{coro.__name__}` are running.",
46+
reason=f"Multiple instances of the periodic task `{coro.__module__}.{coro.__name__}` are running.",
4747
)
4848
@exclusive(
4949
redis_client,
@@ -54,6 +54,8 @@ def _decorator(
5454
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
5555
return await coro(*args, **kwargs)
5656

57+
# Marks with an identifier (mostly to assert a function has been decorated with this decorator)
58+
setattr(_wrapper, "__exclusive_periodic__", True) # noqa: B010
5759
return _wrapper
5860

5961
return _decorator

services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py

Lines changed: 27 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,71 +3,51 @@
33
44
"""
55

6-
import asyncio
76
import logging
8-
from collections.abc import AsyncIterator, Callable
7+
from collections.abc import AsyncIterator
8+
from datetime import timedelta
99

1010
from aiohttp import web
11-
from common_library.async_tools import cancel_wait_task
12-
from tenacity import retry
13-
from tenacity.before_sleep import before_sleep_log
14-
from tenacity.wait import wait_exponential
11+
from servicelib.background_task_utils import exclusive_periodic
12+
from servicelib.logging_utils import log_context
13+
from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk
1514

1615
from ..api_keys import api_keys_service
16+
from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan
1717

18-
logger = logging.getLogger(__name__)
18+
_logger = logging.getLogger(__name__)
1919

20-
CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]]
2120

22-
23-
_PERIODIC_TASK_NAME = f"{__name__}.prune_expired_api_keys_periodically"
24-
_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task"
25-
26-
27-
@retry(
28-
wait=wait_exponential(min=5, max=30),
29-
before_sleep=before_sleep_log(logger, logging.WARNING),
30-
)
31-
async def _run_task(app: web.Application):
32-
"""Periodically check expiration dates and updates user status
33-
34-
It is resilient, i.e. if update goes wrong, it waits a bit and retries
35-
"""
21+
async def _prune_expired_api_keys(app: web.Application):
3622
if deleted := await api_keys_service.prune_expired_api_keys(app):
3723
# broadcast force logout of user_id
3824
for api_key in deleted:
39-
logger.info("API-key %s expired and was removed", f"{api_key=}")
25+
_logger.info("API-key %s expired and was removed", f"{api_key=}")
4026

4127
else:
42-
logger.info("No API keys expired")
43-
44-
45-
async def _run_periodically(app: web.Application, wait_period_s: float):
46-
"""Periodically check expiration dates and updates user status
47-
48-
It is resilient, i.e. if update goes wrong, it waits a bit and retries
49-
"""
50-
while True:
51-
await _run_task(app)
52-
await asyncio.sleep(wait_period_s)
28+
_logger.info("No API keys expired")
5329

5430

5531
def create_background_task_to_prune_api_keys(
56-
wait_period_s: float, task_name: str = _PERIODIC_TASK_NAME
32+
wait_period_s: float,
5733
) -> CleanupContextFunc:
58-
async def _cleanup_ctx_fun(
59-
app: web.Application,
60-
) -> AsyncIterator[None]:
61-
# setup
62-
task = asyncio.create_task(
63-
_run_periodically(app, wait_period_s),
64-
name=task_name,
65-
)
66-
app[_APP_TASK_KEY] = task
6734

68-
yield
35+
async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
36+
interval = timedelta(seconds=wait_period_s)
6937

70-
# tear-down
71-
await cancel_wait_task(task)
38+
@exclusive_periodic(
39+
# Function-exclusiveness is required to avoid multiple tasks like this one running concurrently
40+
get_redis_lock_manager_client_sdk(app),
41+
task_interval=interval,
42+
retry_after=min(timedelta(seconds=10), interval / 10),
43+
)
44+
async def _prune_expired_api_keys_periodically() -> None:
45+
with log_context(_logger, logging.INFO, "Pruning expired API keys"):
46+
await _prune_expired_api_keys(app)
47+
48+
async for _ in periodic_task_lifespan(
49+
app, _prune_expired_api_keys_periodically
50+
):
51+
yield
7252

7353
return _cleanup_ctx_fun

services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py

Lines changed: 24 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -4,97 +4,43 @@
44
Specifics of the gc implementation should go into garbage_collector_core.py
55
"""
66

7-
import asyncio
87
import logging
9-
from collections.abc import AsyncGenerator
8+
from collections.abc import AsyncIterator
9+
from datetime import timedelta
1010

1111
from aiohttp import web
12-
from common_library.async_tools import cancel_wait_task
12+
from servicelib.background_task_utils import exclusive_periodic
1313
from servicelib.logging_utils import log_context
14+
from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk
1415

1516
from ._core import collect_garbage
17+
from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan
1618
from .settings import GarbageCollectorSettings, get_plugin_settings
1719

1820
_logger = logging.getLogger(__name__)
1921

22+
_GC_TASK_NAME = f"{__name__}._collect_garbage_periodically"
2023

21-
_GC_TASK_NAME = f"background-task.{__name__}.collect_garbage_periodically"
22-
_GC_TASK_CONFIG = f"{_GC_TASK_NAME}.config"
23-
_GC_TASK = f"{_GC_TASK_NAME}.task"
2424

25+
def create_background_task_for_garbage_collection() -> CleanupContextFunc:
2526

26-
async def run_background_task(app: web.Application) -> AsyncGenerator:
27-
# SETUP ------
28-
# create a background task to collect garbage periodically
29-
assert not any( # nosec
30-
t.get_name() == _GC_TASK_NAME for t in asyncio.all_tasks()
31-
), "Garbage collector task already running. ONLY ONE expected" # nosec
27+
async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
28+
settings: GarbageCollectorSettings = get_plugin_settings(app)
29+
interval = timedelta(seconds=settings.GARBAGE_COLLECTOR_INTERVAL_S)
3230

33-
gc_bg_task = asyncio.create_task(
34-
_collect_garbage_periodically(app), name=_GC_TASK_NAME
35-
)
36-
# attaches variable to the app's lifetime
37-
app[_GC_TASK] = gc_bg_task
31+
@exclusive_periodic(
32+
# Function-exclusiveness is required to avoid multiple tasks like this one running concurrently
33+
get_redis_lock_manager_client_sdk(app),
34+
task_interval=interval,
35+
retry_after=min(timedelta(seconds=10), interval / 10),
36+
)
37+
async def _collect_garbage_periodically() -> None:
38+
with log_context(_logger, logging.INFO, "Garbage collect cycle"):
39+
await collect_garbage(app)
3840

39-
# FIXME: added this config to overcome the state in which the
40-
# task cancelation is ignored and the exceptions enter in a loop
41-
# that never stops the background task. This flag is an additional
42-
# mechanism to enforce stopping the background task
43-
#
44-
# Implemented with a mutable dict to avoid
45-
# DeprecationWarning: Changing state of started or joined application is deprecated
46-
#
47-
app[_GC_TASK_CONFIG] = {"force_stop": False, "name": _GC_TASK_NAME}
41+
async for _ in periodic_task_lifespan(
42+
app, _collect_garbage_periodically, task_name=_GC_TASK_NAME
43+
):
44+
yield
4845

49-
yield
50-
51-
# TEAR-DOWN -----
52-
# controlled cancelation of the gc task
53-
_logger.info("Stopping garbage collector...")
54-
55-
ack = gc_bg_task.cancel()
56-
assert ack # nosec
57-
58-
app[_GC_TASK_CONFIG]["force_stop"] = True
59-
60-
await cancel_wait_task(gc_bg_task)
61-
62-
63-
async def _collect_garbage_periodically(app: web.Application):
64-
settings: GarbageCollectorSettings = get_plugin_settings(app)
65-
interval = settings.GARBAGE_COLLECTOR_INTERVAL_S
66-
67-
while True:
68-
try:
69-
while True:
70-
with log_context(_logger, logging.INFO, "Garbage collect cycle"):
71-
await collect_garbage(app)
72-
73-
if app[_GC_TASK_CONFIG].get("force_stop", False):
74-
msg = "Forced to stop garbage collection"
75-
raise RuntimeError(msg)
76-
77-
_logger.info("Garbage collect cycle pauses %ss", interval)
78-
await asyncio.sleep(interval)
79-
80-
except asyncio.CancelledError: # EXIT # noqa: PERF203
81-
_logger.info(
82-
"Stopped: Garbage collection task was cancelled, it will not restart!"
83-
)
84-
# do not catch Cancellation errors
85-
raise
86-
87-
except Exception: # RESILIENT restart # pylint: disable=broad-except
88-
_logger.warning(
89-
"Stopped: There was an error during garbage collection, restarting...",
90-
exc_info=True,
91-
)
92-
93-
if app[_GC_TASK_CONFIG].get("force_stop", False):
94-
_logger.warning("Forced to stop garbage collection")
95-
break
96-
97-
# will wait 5 seconds to recover before restarting to avoid restart loops
98-
# - it might be that db/redis is down, etc
99-
#
100-
await asyncio.sleep(5)
46+
return _cleanup_ctx_fun

services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,59 +3,37 @@
33
44
"""
55

6-
import asyncio
76
import logging
8-
from collections.abc import AsyncIterator, Callable
7+
from collections.abc import AsyncIterator
8+
from datetime import timedelta
99

1010
from aiohttp import web
11-
from common_library.async_tools import cancel_wait_task
11+
from servicelib.background_task_utils import exclusive_periodic
1212
from servicelib.logging_utils import log_context
13-
from tenacity import retry
14-
from tenacity.before_sleep import before_sleep_log
15-
from tenacity.wait import wait_exponential
13+
from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk
1614

1715
from ..trash import trash_service
16+
from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan
1817

1918
_logger = logging.getLogger(__name__)
2019

21-
CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]]
2220

21+
def create_background_task_to_prune_trash(wait_s: float) -> CleanupContextFunc:
2322

24-
_PERIODIC_TASK_NAME = f"{__name__}"
25-
_APP_TASK_KEY = f"{_PERIODIC_TASK_NAME}.task"
23+
async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
24+
interval = timedelta(seconds=wait_s)
2625

27-
28-
@retry(
29-
wait=wait_exponential(min=5, max=20),
30-
before_sleep=before_sleep_log(_logger, logging.WARNING),
31-
)
32-
async def _run_task(app: web.Application):
33-
with log_context(_logger, logging.INFO, "Deleting expired trashed items"):
34-
await trash_service.safe_delete_expired_trash_as_admin(app)
35-
36-
37-
async def _run_periodically(app: web.Application, wait_interval_s: float):
38-
while True:
39-
await _run_task(app)
40-
await asyncio.sleep(wait_interval_s)
41-
42-
43-
def create_background_task_to_prune_trash(
44-
wait_s: float, task_name: str = _PERIODIC_TASK_NAME
45-
) -> CleanupContextFunc:
46-
async def _cleanup_ctx_fun(
47-
app: web.Application,
48-
) -> AsyncIterator[None]:
49-
# setup
50-
task = asyncio.create_task(
51-
_run_periodically(app, wait_s),
52-
name=task_name,
26+
@exclusive_periodic(
27+
# Function-exclusiveness is required to avoid multiple tasks like this one running concurrently
28+
get_redis_lock_manager_client_sdk(app),
29+
task_interval=interval,
30+
retry_after=min(timedelta(seconds=10), interval / 10),
5331
)
54-
app[_APP_TASK_KEY] = task
55-
56-
yield
32+
async def _prune_trash_periodically() -> None:
33+
with log_context(_logger, logging.INFO, "Deleting expired trashed items"):
34+
await trash_service.safe_delete_expired_trash_as_admin(app)
5735

58-
# tear-down
59-
await cancel_wait_task(task)
36+
async for _ in periodic_task_lifespan(app, _prune_trash_periodically):
37+
yield
6038

6139
return _cleanup_ctx_fun

0 commit comments

Comments
 (0)