Skip to content

Commit 71d3b18

Browse files
committed
change usage
1 parent 2430054 commit 71d3b18

File tree

6 files changed

+47
-50
lines changed

6 files changed

+47
-50
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23
from datetime import timedelta
34
from functools import cached_property
@@ -8,7 +9,8 @@
89
from models_library.projects_nodes_io import NodeID
910
from pydantic import NonNegativeFloat, NonNegativeInt
1011
from servicelib.async_utils import cancel_wait_task
11-
from servicelib.redis import create_exclusive_periodic_task
12+
from servicelib.background_task_utils import exclusive_periodic
13+
from servicelib.redis._client import RedisClientSDK
1214
from servicelib.utils import limited_gather
1315
from settings_library.redis import RedisDatabase
1416

@@ -60,6 +62,10 @@ def _can_be_removed(model: TrackedServiceModel) -> bool:
6062
return False
6163

6264

65+
def _get_redis_client_from_monitor(monitor: "Monitor") -> RedisClientSDK:
66+
return get_redis_client(monitor.app, RedisDatabase.LOCKS)
67+
68+
6369
class Monitor:
6470
def __init__(self, app: FastAPI, status_worker_interval: timedelta) -> None:
6571
self.app = app
@@ -69,6 +75,11 @@ def __init__(self, app: FastAPI, status_worker_interval: timedelta) -> None:
6975
def status_worker_interval_seconds(self) -> NonNegativeFloat:
7076
return self.status_worker_interval.total_seconds()
7177

78+
@exclusive_periodic(
79+
_get_redis_client_from_monitor,
80+
task_interval=_INTERVAL_BETWEEN_CHECKS,
81+
retry_after=_INTERVAL_BETWEEN_CHECKS,
82+
)
7283
async def _worker_check_services_require_status_update(self) -> None:
7384
"""
7485
Check if any service requires it's status to be polled.
@@ -133,12 +144,9 @@ async def _worker_check_services_require_status_update(self) -> None:
133144
)
134145

135146
async def setup(self) -> None:
136-
self.app.state.status_monitor_background_task = create_exclusive_periodic_task(
137-
get_redis_client(self.app, RedisDatabase.LOCKS),
138-
self._worker_check_services_require_status_update,
139-
task_period=_INTERVAL_BETWEEN_CHECKS,
140-
retry_after=_INTERVAL_BETWEEN_CHECKS,
141-
task_name="periodic_service_status_update",
147+
self.app.state.status_monitor_background_task = asyncio.create_task(
148+
self._worker_check_services_require_status_update(),
149+
name="periodic_service_status_update",
142150
)
143151

144152
async def shutdown(self) -> None:

services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import logging
2-
from datetime import datetime, timezone
2+
from datetime import UTC, datetime, timedelta
33

44
from fastapi import FastAPI
55
from models_library.projects import ProjectID
66
from models_library.projects_state import ProjectStatus
7+
from servicelib.background_task_utils import exclusive_periodic
78
from servicelib.logging_utils import log_context
89
from servicelib.project_lock import (
910
PROJECT_LOCK_TIMEOUT,
@@ -22,14 +23,19 @@
2223
_logger = logging.getLogger(__name__)
2324

2425

26+
@exclusive_periodic(
27+
get_redis_lock_client,
28+
task_interval=timedelta(hours=1),
29+
retry_after=timedelta(minutes=5),
30+
)
2531
async def removal_policy_task(app: FastAPI) -> None:
2632
_logger.info("Removal policy task started")
2733

2834
app_settings: ApplicationSettings = app.state.settings
2935
assert app_settings # nosec
3036
efs_manager: EfsManager = app.state.efs_manager
3137

32-
base_start_timestamp = datetime.now(tz=timezone.utc)
38+
base_start_timestamp = datetime.now(tz=UTC)
3339

3440
efs_project_ids: list[
3541
ProjectID

services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,16 @@
11
import asyncio
22
import logging
33
from collections.abc import Awaitable, Callable
4-
from datetime import timedelta
54
from typing import TypedDict
65

76
from fastapi import FastAPI
87
from servicelib.async_utils import cancel_wait_task
98
from servicelib.logging_utils import log_catch, log_context
10-
from servicelib.redis import create_exclusive_periodic_task
119

1210
from .background_tasks import removal_policy_task
13-
from .modules.redis import get_redis_lock_client
1411

1512
_logger = logging.getLogger(__name__)
1613

17-
_SEC = 1 # in s
18-
_MIN = 60 * _SEC # in s
19-
_HOUR = 60 * _MIN # in s
20-
2114

2215
class EfsGuardianBackgroundTask(TypedDict):
2316
name: str
@@ -41,15 +34,9 @@ async def _startup() -> None:
4134

4235
# Setup periodic tasks
4336
for task in _EFS_GUARDIAN_BACKGROUND_TASKS:
44-
exclusive_task = create_exclusive_periodic_task(
45-
get_redis_lock_client(app),
46-
task["task_func"],
47-
task_period=timedelta(seconds=1 * _HOUR),
48-
retry_after=timedelta(seconds=5 * _MIN),
49-
task_name=task["name"],
50-
app=app,
37+
app.state.efs_guardian_background_tasks.append(
38+
asyncio.create_task(task["task_func"](), name=task["name"])
5139
)
52-
app.state.efs_guardian_background_tasks.append(exclusive_task)
5340

5441
return _startup
5542

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ async def _close_unhealthy_service(
126126
)
127127

128128

129-
async def periodic_check_of_running_services_task(app: FastAPI) -> None:
129+
async def check_running_services(app: FastAPI) -> None:
130130
_logger.info("Periodic check started")
131131

132132
# This check runs across all products

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
1+
import asyncio
12
import logging
23
from collections.abc import Awaitable, Callable
34
from typing import TypedDict
45

56
from fastapi import FastAPI
67
from servicelib.async_utils import cancel_wait_task
8+
from servicelib.background_task_utils import exclusive_periodic
79
from servicelib.logging_utils import log_catch, log_context
8-
from servicelib.redis import create_exclusive_periodic_task
910

1011
from ..core.settings import ApplicationSettings
11-
from .background_task_periodic_heartbeat_check import (
12-
periodic_check_of_running_services_task,
13-
)
12+
from .background_task_periodic_heartbeat_check import check_running_services
1413
from .modules.redis import get_redis_lock_client
1514

1615
_logger = logging.getLogger(__name__)
@@ -38,17 +37,19 @@ async def _startup() -> None:
3837

3938
app.state.rut_background_task__periodic_check_of_running_services = None
4039

41-
# Setup periodic task
42-
exclusive_task = create_exclusive_periodic_task(
40+
@exclusive_periodic(
4341
get_redis_lock_client(app),
44-
periodic_check_of_running_services_task,
45-
task_period=app_settings.RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC,
42+
task_interval=app_settings.RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC,
4643
retry_after=app_settings.RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC,
47-
task_name=_TASK_NAME_PERIODICALY_CHECK_RUNNING_SERVICES,
48-
app=app,
4944
)
45+
async def _periodic_check_running_services() -> None:
46+
await check_running_services(app)
47+
5048
app.state.rut_background_task__periodic_check_of_running_services = (
51-
exclusive_task
49+
asyncio.create_task(
50+
_periodic_check_running_services(),
51+
name=_TASK_NAME_PERIODICALY_CHECK_RUNNING_SERVICES,
52+
)
5253
)
5354

5455
return _startup

services/storage/src/simcore_service_storage/dsm_cleaner.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525

2626
from aiohttp import web
2727
from servicelib.async_utils import cancel_wait_task
28+
from servicelib.background_task_utils import exclusive_periodic
2829
from servicelib.logging_utils import log_catch, log_context
29-
from servicelib.redis import create_exclusive_periodic_task
3030

3131
from .constants import APP_CONFIG_KEY, APP_DSM_KEY
3232
from .dsm_factory import DataManagerProvider
@@ -45,14 +45,7 @@ async def dsm_cleaner_task(app: web.Application) -> None:
4545
simcore_s3_dsm: SimcoreS3DataManager = cast(
4646
SimcoreS3DataManager, dsm.get(SimcoreS3DataManager.get_location_id())
4747
)
48-
try:
49-
await simcore_s3_dsm.clean_expired_uploads()
50-
51-
except asyncio.CancelledError: # noqa: PERF203
52-
_logger.info("cancelled dsm cleaner task")
53-
raise
54-
except Exception: # pylint: disable=broad-except
55-
_logger.exception("Unhandled error in dsm cleaner task, restarting task...")
48+
await simcore_s3_dsm.clean_expired_uploads()
5649

5750

5851
def setup_dsm_cleaner(app: web.Application):
@@ -64,15 +57,17 @@ async def _setup(app: web.Application):
6457
cfg: Settings = app[APP_CONFIG_KEY]
6558
assert cfg.STORAGE_CLEANER_INTERVAL_S # nosec
6659

67-
storage_background_task = create_exclusive_periodic_task(
60+
@exclusive_periodic(
6861
get_redis_client(app),
69-
dsm_cleaner_task,
70-
task_period=timedelta(seconds=cfg.STORAGE_CLEANER_INTERVAL_S),
62+
task_interval=timedelta(seconds=cfg.STORAGE_CLEANER_INTERVAL_S),
7163
retry_after=timedelta(minutes=5),
72-
task_name=_TASK_NAME_PERIODICALY_CLEAN_DSM,
73-
app=app,
7464
)
65+
async def _periodic_dsm_clean() -> None:
66+
await dsm_cleaner_task(app)
7567

68+
storage_background_task = asyncio.create_task(
69+
_periodic_dsm_clean(), name=_TASK_NAME_PERIODICALY_CLEAN_DSM
70+
)
7671
yield
7772

7873
await cancel_wait_task(storage_background_task)

0 commit comments

Comments
 (0)