Skip to content

Commit 693fa90

Browse files
committed
ongoing changes
1 parent a6a86a2 commit 693fa90

File tree

2 files changed

+43
-67
lines changed

2 files changed

+43
-67
lines changed
Lines changed: 24 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,51 @@
11
import datetime
2-
import logging
3-
from asyncio.log import logger
4-
from collections.abc import AsyncIterator
5-
from contextlib import asynccontextmanager
6-
from typing import Final, TypeAlias
2+
import functools
3+
from collections.abc import Callable, Coroutine
4+
from typing import Any, Final, ParamSpec, TypeAlias, TypeVar
75

86
import redis
97
import redis.exceptions
108
from models_library.projects import ProjectID
119
from models_library.projects_access import Owner
1210
from models_library.projects_state import ProjectLocked, ProjectStatus
13-
from redis.asyncio.lock import Lock
14-
from servicelib.redis._client import RedisClientSDK
1511

16-
from .background_task import periodic_task
17-
from .logging_utils import log_context
18-
19-
_logger = logging.getLogger(__name__)
12+
from .redis import RedisClientSDK, exclusive
2013

2114
PROJECT_REDIS_LOCK_KEY: str = "project_lock:{}"
2215
PROJECT_LOCK_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
23-
ProjectLock = Lock
2416

2517
ProjectLockError: TypeAlias = redis.exceptions.LockError
2618

2719

28-
async def _auto_extend_project_lock(project_lock: Lock) -> None:
29-
# NOTE: the background task already catches anything that might raise here
30-
await project_lock.reacquire()
20+
P = ParamSpec("P")
21+
R = TypeVar("R")
3122

3223

33-
@asynccontextmanager
34-
async def lock_project(
35-
redis_client: RedisClientSDK,
24+
def with_locked_project(
25+
redis_client: RedisClientSDK | Callable[..., RedisClientSDK],
3626
*,
3727
project_uuid: str | ProjectID,
3828
status: ProjectStatus,
3929
owner: Owner | None = None,
40-
) -> AsyncIterator[None]:
41-
"""Context manager to lock and unlock a project by user_id
42-
43-
Raises:
44-
ProjectLockError: if project is already locked
45-
"""
46-
47-
redis_lock = redis_client.create_lock(
48-
PROJECT_REDIS_LOCK_KEY.format(project_uuid),
49-
ttl=PROJECT_LOCK_TIMEOUT,
50-
)
51-
52-
try:
53-
if not await redis_lock.acquire(
54-
blocking=False,
55-
token=ProjectLocked(
30+
) -> Callable[
31+
[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
32+
]:
33+
def _decorator(
34+
func: Callable[P, Coroutine[Any, Any, R]],
35+
) -> Callable[P, Coroutine[Any, Any, R]]:
36+
@exclusive(
37+
redis_client,
38+
lock_key=PROJECT_REDIS_LOCK_KEY.format(project_uuid),
39+
lock_value=ProjectLocked(
5640
value=True,
5741
owner=owner,
5842
status=status,
5943
).model_dump_json(),
60-
):
61-
msg = f"Lock for project {project_uuid!r} owner {owner!r} could not be acquired"
62-
raise ProjectLockError(msg)
44+
)
45+
@functools.wraps(func)
46+
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
47+
return await func(*args, **kwargs)
6348

64-
with log_context(
65-
_logger,
66-
logging.DEBUG,
67-
msg=f"with lock for {owner=}:{project_uuid=}:{status=}",
68-
):
69-
async with periodic_task(
70-
_auto_extend_project_lock,
71-
interval=0.6 * PROJECT_LOCK_TIMEOUT,
72-
task_name=f"{PROJECT_REDIS_LOCK_KEY.format(project_uuid)}_lock_auto_extend",
73-
project_lock=redis_lock,
74-
):
75-
yield
49+
return _wrapper
7650

77-
finally:
78-
# let's ensure we release that stuff
79-
try:
80-
if await redis_lock.owned():
81-
await redis_lock.release()
82-
except (redis.exceptions.LockError, redis.exceptions.LockNotOwnedError) as exc:
83-
logger.warning(
84-
"releasing %s unexpectedly raised an exception: %s",
85-
f"{redis_lock=!r}",
86-
f"{exc}",
87-
)
51+
return _decorator

services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from models_library.projects import ProjectID
66
from models_library.projects_state import ProjectStatus
77
from servicelib.logging_utils import log_context
8-
from servicelib.project_lock import lock_project
8+
from servicelib.project_lock import with_locked_project
99
from simcore_postgres_database.utils_projects import (
1010
DBProjectNotFoundError,
1111
ProjectsRepo,
@@ -18,6 +18,23 @@
1818
_logger = logging.getLogger(__name__)
1919

2020

21+
async def _remove_data_with_lock(app: FastAPI, project_id: ProjectID) -> None:
22+
# Decorate a new function that will call the necessary coroutine
23+
efs_manager: EfsManager = app.state.efs_manager
24+
25+
@with_locked_project(
26+
get_redis_lock_client(app),
27+
project_uuid=project_id,
28+
status=ProjectStatus.MAINTAINING,
29+
)
30+
async def _remove():
31+
# Call the actual coroutine function
32+
await efs_manager.remove_project_efs_data(project_id)
33+
34+
# Execute the decorated function
35+
await _remove()
36+
37+
2138
async def removal_policy_task(app: FastAPI) -> None:
2239
_logger.info("Removal policy task started")
2340

@@ -56,9 +73,4 @@ async def removal_policy_task(app: FastAPI) -> None:
5673
logging.INFO,
5774
msg=f"Removing data for project {project_id} started, project last change date {_project_last_change_date}, efs removal policy task age limit timedelta {app_settings.EFS_REMOVAL_POLICY_TASK_AGE_LIMIT_TIMEDELTA}",
5875
):
59-
async with lock_project(
60-
get_redis_lock_client(app),
61-
project_uuid=project_id,
62-
status=ProjectStatus.MAINTAINING,
63-
):
64-
await efs_manager.remove_project_efs_data(project_id)
76+
await _remove_data_with_lock(app, project_id)

0 commit comments

Comments
 (0)