Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
from collections.abc import Awaitable, Callable
from datetime import timedelta
from typing import TypedDict

from fastapi import FastAPI
from servicelib.async_utils import cancel_wait_task
Expand All @@ -12,44 +11,31 @@
from .background_tasks import removal_policy_task
from .modules.redis import get_redis_lock_client


@exclusive_periodic(
get_redis_lock_client,
task_interval=timedelta(hours=1),
retry_after=timedelta(minutes=5),
)
async def periodic_removal_policy_task(app: FastAPI) -> None:
await removal_policy_task(app)


_logger = logging.getLogger(__name__)


class EfsGuardianBackgroundTask(TypedDict):
name: str
task_func: Callable


_EFS_GUARDIAN_BACKGROUND_TASKS = [
EfsGuardianBackgroundTask(
name="efs_removal_policy_task", task_func=periodic_removal_policy_task
)
]


def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]:
async def _startup() -> None:
with (
log_context(_logger, logging.INFO, msg="Efs Guardian startup.."),
log_context(_logger, logging.INFO, msg="Efs Guardian background task "),
log_catch(_logger, reraise=False),
):
app.state.efs_guardian_background_tasks = []
app.state.efs_guardian_removal_policy_background_task = None

# Setup periodic tasks
for task in _EFS_GUARDIAN_BACKGROUND_TASKS:
app.state.efs_guardian_background_tasks.append(
asyncio.create_task(task["task_func"](), name=task["name"])
)
_logger.info("starting efs guardian removal policy task")

@exclusive_periodic(
get_redis_lock_client(app),
task_interval=timedelta(hours=1),
retry_after=timedelta(minutes=5),
)
async def _periodic_removal_policy_task() -> None:
await removal_policy_task(app)

app.state.efs_guardian_removal_policy_background_task = asyncio.create_task(
_periodic_removal_policy_task(),
name=_periodic_removal_policy_task.__name__,
)

return _startup

Expand All @@ -63,12 +49,9 @@ async def _stop() -> None:
log_catch(_logger, reraise=False),
):
assert _app # nosec
if _app.state.efs_guardian_background_tasks:
await asyncio.gather(
*[
cancel_wait_task(task)
for task in _app.state.efs_guardian_background_tasks
]
if _app.state.efs_guardian_removal_policy_background_task:
await cancel_wait_task(
_app.state.efs_guardian_removal_policy_background_task
)

return _stop
Expand Down
Loading