Skip to content

Commit 9103583

Browse files
committed
🎨 Refactor garbage collection task: Update to use new background task creation method
1 parent 5e67629 commit 9103583

File tree

2 files changed

+34
-78
lines changed

2 files changed

+34
-78
lines changed
Lines changed: 33 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,103 +1,59 @@
1-
""" Setup and running of periodic background task
1+
"""Setup and running of periodic background task
22
33
44
Specifics of the gc implementation should go into garbage_collector_core.py
55
"""
66

77
import asyncio
88
import logging
9-
from collections.abc import AsyncGenerator
9+
from collections.abc import AsyncIterator, Callable
10+
from datetime import timedelta
1011

1112
from aiohttp import web
13+
from servicelib.async_utils import cancel_wait_task
14+
from servicelib.background_task_utils import exclusive_periodic
1215
from servicelib.logging_utils import log_context
16+
from simcore_service_webserver.redis import get_redis_lock_manager_client_sdk
1317

1418
from ._core import collect_garbage
1519
from .settings import GarbageCollectorSettings, get_plugin_settings
1620

1721
_logger = logging.getLogger(__name__)
1822

23+
CleanupContextFunc = Callable[[web.Application], AsyncIterator[None]]
1924

20-
_GC_TASK_NAME = f"background-task.{__name__}.collect_garbage_periodically"
21-
_GC_TASK_CONFIG = f"{_GC_TASK_NAME}.config"
22-
_GC_TASK = f"{_GC_TASK_NAME}.task"
2325

26+
def create_background_task_for_garbage_collection() -> CleanupContextFunc:
2427

25-
async def run_background_task(app: web.Application) -> AsyncGenerator:
26-
# SETUP ------
27-
# create a background task to collect garbage periodically
28-
assert not any( # nosec
29-
t.get_name() == _GC_TASK_NAME for t in asyncio.all_tasks()
30-
), "Garbage collector task already running. ONLY ONE expected" # nosec
28+
async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
29+
settings: GarbageCollectorSettings = get_plugin_settings(app)
3130

32-
gc_bg_task = asyncio.create_task(
33-
_collect_garbage_periodically(app), name=_GC_TASK_NAME
34-
)
35-
# attaches variable to the app's lifetime
36-
app[_GC_TASK] = gc_bg_task
31+
@exclusive_periodic(
32+
# Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently
33+
get_redis_lock_manager_client_sdk(app),
34+
task_interval=timedelta(seconds=settings.GARBAGE_COLLECTOR_INTERVAL_S),
35+
retry_after=timedelta(minutes=5),
36+
)
37+
async def _collect_garbage_periodically() -> None:
38+
with log_context(_logger, logging.INFO, "Garbage collect cycle"):
39+
await collect_garbage(app)
3740

38-
# FIXME: added this config to overcome the state in which the
39-
# task cancelation is ignored and the exceptions enter in a loop
40-
# that never stops the background task. This flag is an additional
41-
# mechanism to enforce stopping the background task
42-
#
43-
# Implemented with a mutable dict to avoid
44-
# DeprecationWarning: Changing state of started or joined application is deprecated
45-
#
46-
app[_GC_TASK_CONFIG] = {"force_stop": False, "name": _GC_TASK_NAME}
41+
# setup
42+
task_name = _collect_garbage_periodically.__name__
4743

48-
yield
44+
task = asyncio.create_task(
45+
_collect_garbage_periodically(),
46+
name=task_name,
47+
)
4948

50-
# TEAR-DOWN -----
51-
# controlled cancelation of the gc task
52-
try:
53-
_logger.info("Stopping garbage collector...")
49+
# prevents premature garbage collection of the task
50+
app_task_key = f"tasks.{task_name}"
51+
app[app_task_key] = task
5452

55-
ack = gc_bg_task.cancel()
56-
assert ack # nosec
53+
yield
5754

58-
app[_GC_TASK_CONFIG]["force_stop"] = True
55+
# tear-down
56+
await cancel_wait_task(task)
57+
app.pop(app_task_key, None)
5958

60-
await gc_bg_task
61-
62-
except asyncio.CancelledError:
63-
assert gc_bg_task.cancelled() # nosec
64-
65-
66-
async def _collect_garbage_periodically(app: web.Application):
67-
settings: GarbageCollectorSettings = get_plugin_settings(app)
68-
interval = settings.GARBAGE_COLLECTOR_INTERVAL_S
69-
70-
while True:
71-
try:
72-
while True:
73-
with log_context(_logger, logging.INFO, "Garbage collect cycle"):
74-
await collect_garbage(app)
75-
76-
if app[_GC_TASK_CONFIG].get("force_stop", False):
77-
msg = "Forced to stop garbage collection"
78-
raise RuntimeError(msg)
79-
80-
_logger.info("Garbage collect cycle pauses %ss", interval)
81-
await asyncio.sleep(interval)
82-
83-
except asyncio.CancelledError: # EXIT # noqa: PERF203
84-
_logger.info(
85-
"Stopped: Garbage collection task was cancelled, it will not restart!"
86-
)
87-
# do not catch Cancellation errors
88-
raise
89-
90-
except Exception: # RESILIENT restart # pylint: disable=broad-except
91-
_logger.warning(
92-
"Stopped: There was an error during garbage collection, restarting...",
93-
exc_info=True,
94-
)
95-
96-
if app[_GC_TASK_CONFIG].get("force_stop", False):
97-
_logger.warning("Forced to stop garbage collection")
98-
break
99-
100-
# will wait 5 seconds to recover before restarting to avoid restart loops
101-
# - it might be that db/redis is down, etc
102-
#
103-
await asyncio.sleep(5)
59+
return _cleanup_ctx_fun

services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def setup_garbage_collector(app: web.Application) -> None:
3434

3535
settings = get_plugin_settings(app)
3636

37-
app.cleanup_ctx.append(_tasks_core.run_background_task)
37+
app.cleanup_ctx.append(_tasks_core.create_background_task_for_garbage_collection())
3838

3939
set_parent_module_log_level(
4040
_logger.name, min(logging.INFO, get_application_settings(app).log_level)

0 commit comments

Comments
 (0)