|
1 | 1 | import logging |
2 | | -from contextlib import asynccontextmanager |
3 | | -from datetime import datetime, timedelta, timezone |
4 | | -from typing import AsyncIterator, Final |
| 2 | +from datetime import datetime, timezone |
5 | 3 |
|
6 | | -import redis |
7 | 4 | from fastapi import FastAPI |
8 | 5 | from models_library.projects import ProjectID |
9 | | -from models_library.projects_state import ProjectLocked, ProjectStatus |
10 | | -from redis.asyncio.lock import Lock |
11 | | -from servicelib.background_task import periodic_task |
12 | | -from servicelib.logging_utils import log_context |
| 6 | +from models_library.projects_state import ProjectStatus |
| 7 | +from servicelib.project_lock import ( |
| 8 | + PROJECT_LOCK_TIMEOUT, |
| 9 | + PROJECT_REDIS_LOCK_KEY, |
| 10 | + lock_project, |
| 11 | +) |
13 | 12 | from simcore_postgres_database.utils_projects import ProjectsRepo |
14 | 13 |
|
15 | 14 | from ..core.settings import ApplicationSettings |
|
19 | 18 | _logger = logging.getLogger(__name__) |
20 | 19 |
|
21 | 20 |
|
22 | | -PROJECT_REDIS_LOCK_KEY: str = "project_lock:{}" |
23 | | -PROJECT_LOCK_TIMEOUT: Final[timedelta] = timedelta(seconds=10) |
24 | | - |
25 | | - |
26 | | -async def _auto_extend_project_lock(project_lock: Lock) -> None: |
27 | | - # NOTE: the background task already catches anything that might raise here |
28 | | - await project_lock.reacquire() |
29 | | - |
30 | | - |
31 | | -@asynccontextmanager |
32 | | -async def lock_project( |
33 | | - app: FastAPI, |
34 | | - project_uuid: ProjectID, |
35 | | - status: ProjectStatus = ProjectStatus.MAINTAINING, |
36 | | -) -> AsyncIterator[None]: |
37 | | - """Context manager to lock and unlock a project by user_id |
38 | | -
|
39 | | - Raises: |
40 | | - ProjectLockError: if project is already locked |
41 | | - """ |
42 | | - |
43 | | - redis_lock = get_redis_lock_client(app).redis.lock( |
44 | | - PROJECT_REDIS_LOCK_KEY.format(project_uuid), |
45 | | - timeout=PROJECT_LOCK_TIMEOUT.total_seconds(), |
46 | | - ) |
47 | | - try: |
48 | | - if not await redis_lock.acquire( |
49 | | - blocking=False, |
50 | | - token=ProjectLocked( |
51 | | - value=True, |
52 | | - owner=None, |
53 | | - status=status, |
54 | | - ).json(), |
55 | | - ): |
56 | | - msg = f"Lock for project {project_uuid!r} could not be acquired" |
57 | | - raise ValueError(msg) |
58 | | - |
59 | | - with log_context( |
60 | | - _logger, |
61 | | - logging.DEBUG, |
62 | | - msg=f"with lock for {project_uuid=}:{status=}", |
63 | | - ): |
64 | | - async with periodic_task( |
65 | | - _auto_extend_project_lock, |
66 | | - interval=0.6 * PROJECT_LOCK_TIMEOUT, |
67 | | - task_name=f"{PROJECT_REDIS_LOCK_KEY.format(project_uuid)}_lock_auto_extend", |
68 | | - project_lock=redis_lock, |
69 | | - ): |
70 | | - yield |
71 | | - |
72 | | - finally: |
73 | | - try: |
74 | | - if await redis_lock.owned(): |
75 | | - await redis_lock.release() |
76 | | - except (redis.exceptions.LockError, redis.exceptions.LockNotOwnedError) as exc: |
77 | | - _logger.warning( |
78 | | - "releasing %s unexpectedly raised an exception: %s", |
79 | | - f"{redis_lock=!r}", |
80 | | - f"{exc}", |
81 | | - ) |
82 | | - |
83 | | - |
84 | 21 | async def removal_policy_task(app: FastAPI) -> None: |
85 | 22 | _logger.info("Removal policy task started") |
86 | 23 |
|
@@ -111,5 +48,11 @@ async def removal_policy_task(app: FastAPI) -> None: |
111 | 48 | _project_last_change_date, |
112 | 49 | app_settings.EFS_REMOVAL_POLICY_TASK_AGE_LIMIT_TIMEDELTA, |
113 | 50 | ) |
114 | | - async with lock_project(app, project_uuid=project_id): |
| 51 | + redis_lock = get_redis_lock_client(app).redis.lock( |
| 52 | + PROJECT_REDIS_LOCK_KEY.format(project_id), |
| 53 | + timeout=PROJECT_LOCK_TIMEOUT.total_seconds(), |
| 54 | + ) |
| 55 | + async with lock_project( |
| 56 | + redis_lock, project_uuid=project_id, status=ProjectStatus.MAINTAINING |
| 57 | + ): |
115 | 58 | await efs_manager.remove_project_efs_data(project_id) |
0 commit comments