Skip to content

Commit b332b27

Browse files
committed
dsm
1 parent faaf5be commit b332b27

File tree

1 file changed

+27
-26
lines changed

1 file changed

+27
-26
lines changed

services/storage/src/simcore_service_storage/dsm_cleaner.py

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@
2020

2121
import asyncio
2222
import logging
23+
from collections.abc import Awaitable, Callable
2324
from datetime import timedelta
2425
from typing import cast
2526

27+
from fastapi import FastAPI
2628
from servicelib.async_utils import cancel_wait_task
2729
from servicelib.background_task_utils import exclusive_periodic
28-
from servicelib.logging_utils import log_catch, log_context
2930

30-
from .constants import APP_CONFIG_KEY, APP_DSM_KEY
31-
from .core.settings import ApplicationSettings
32-
from .dsm_factory import DataManagerProvider
31+
from .core.settings import get_application_settings
32+
from .dsm import get_dsm_provider
3333
from .modules.redis import get_redis_client
3434
from .simcore_s3_dsm import SimcoreS3DataManager
3535

@@ -40,35 +40,36 @@
4040

4141
async def dsm_cleaner_task(app: FastAPI) -> None:
4242
_logger.info("starting dsm cleaner task...")
43-
dsm: DataManagerProvider = app[APP_DSM_KEY]
43+
dsm = get_dsm_provider(app)
4444
simcore_s3_dsm: SimcoreS3DataManager = cast(
4545
SimcoreS3DataManager, dsm.get(SimcoreS3DataManager.get_location_id())
4646
)
4747
await simcore_s3_dsm.clean_expired_uploads()
4848

4949

50-
def setup_dsm_cleaner(app: FastAPI):
51-
async def _setup(app: FastAPI):
52-
with (
53-
log_context(_logger, logging.INFO, msg="setup dsm cleaner"),
54-
log_catch(_logger, reraise=False),
55-
):
56-
cfg: ApplicationSettings = app[APP_CONFIG_KEY]
57-
assert cfg.STORAGE_CLEANER_INTERVAL_S # nosec
50+
def setup_dsm_cleaner(app: FastAPI) -> None:
51+
async def _on_startup(app: FastAPI) -> None:
52+
cfg = get_application_settings(app)
53+
assert cfg.STORAGE_CLEANER_INTERVAL_S # nosec
5854

59-
@exclusive_periodic(
60-
get_redis_client(app),
61-
task_interval=timedelta(seconds=cfg.STORAGE_CLEANER_INTERVAL_S),
62-
retry_after=timedelta(minutes=5),
63-
)
64-
async def _periodic_dsm_clean() -> None:
65-
await dsm_cleaner_task(app)
55+
@exclusive_periodic(
56+
get_redis_client(app),
57+
task_interval=timedelta(seconds=cfg.STORAGE_CLEANER_INTERVAL_S),
58+
retry_after=timedelta(minutes=5),
59+
)
60+
async def _periodic_dsm_clean() -> None:
61+
await dsm_cleaner_task(app)
6662

67-
storage_background_task = asyncio.create_task(
68-
_periodic_dsm_clean(), name=_TASK_NAME_PERIODICALY_CLEAN_DSM
69-
)
70-
yield
63+
app.state.dsm_cleaner_task = asyncio.create_task(
64+
_periodic_dsm_clean(), name=_TASK_NAME_PERIODICALY_CLEAN_DSM
65+
)
7166

72-
await cancel_wait_task(storage_background_task)
67+
def _on_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]:
68+
async def _stop() -> None:
69+
assert isinstance(app.state.dsm_cleaner_task, asyncio.Task) # nosec
70+
await cancel_wait_task(app.state.dsm_cleaner_task)
7371

74-
app.cleanup_ctx.append(_setup)
72+
return _stop
73+
74+
app.add_event_handler("startup", _on_startup)
75+
app.add_event_handler("shutdown", _on_shutdown(app))

0 commit comments

Comments
 (0)