Skip to content

Commit 1356488

Browse files
committed
remove usage of lock_context
1 parent ac7fafd commit 1356488

File tree

3 files changed

+41
-28
lines changed

3 files changed

+41
-28
lines changed

services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
import datetime
2+
13
from aws_library.ec2 import EC2InstanceData
24
from aws_library.ec2._errors import EC2InstanceNotFoundError
35
from fastapi import FastAPI
46
from models_library.api_schemas_clusters_keeper.clusters import OnDemandCluster
57
from models_library.users import UserID
68
from models_library.wallets import WalletID
79
from servicelib.rabbitmq import RPCRouter
10+
from servicelib.redis._decorators import exclusive
811

912
from ..core.settings import get_application_settings
1013
from ..modules import clusters
@@ -29,11 +32,14 @@ async def get_or_create_cluster(
2932
redis = get_redis_client(app)
3033
dask_scheduler_ready = False
3134
cluster_auth = get_scheduler_auth(app)
32-
async with redis.lock_context(
33-
f"get_or_create_cluster-{user_id=}-{wallet_id=}",
35+
36+
@exclusive(
37+
redis,
38+
lock_key=f"get_or_create_cluster-{user_id=}-{wallet_id=}",
3439
blocking=True,
35-
blocking_timeout_s=10,
36-
):
40+
blocking_timeout=datetime.timedelta(seconds=10),
41+
)
42+
async def _locked_get_or_create_cluster() -> None:
3743
try:
3844
ec2_instance = await clusters.get_cluster(
3945
app, user_id=user_id, wallet_id=wallet_id
@@ -52,6 +58,9 @@ async def get_or_create_cluster(
5258
)
5359
if dask_scheduler_ready:
5460
await clusters.cluster_heartbeat(app, user_id=user_id, wallet_id=wallet_id)
61+
62+
await _locked_get_or_create_cluster()
63+
5564
assert ec2_instance is not None # nosec
5665
app_settings = get_application_settings(app)
5766
assert app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES # nosec

services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import logging
23

34
from fastapi import FastAPI
@@ -8,6 +9,7 @@
89
from servicelib.rabbitmq.rpc_interfaces.dynamic_sidecar.disk_usage import (
910
update_disk_usage,
1011
)
12+
from servicelib.redis._decorators import exclusive
1113
from servicelib.utils import fire_and_forget_task
1214

1315
from ..core.settings import get_application_settings
@@ -77,13 +79,13 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) ->
7779
msg = f"Removing write permissions inside of EFS starts for project ID: {rabbit_message.project_id}, node ID: {rabbit_message.node_id}, current user: {rabbit_message.user_id}, size: {size}, upper limit: {settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES}"
7880
with log_context(_logger, logging.WARNING, msg=msg):
7981
redis = get_redis_lock_client(app)
80-
async with redis.lock_context(
81-
f"efs_remove_write_permissions-{rabbit_message.project_id=}-{rabbit_message.node_id=}",
82+
await exclusive(
83+
redis,
84+
lock_key=f"efs_remove_write_permissions-{rabbit_message.project_id=}-{rabbit_message.node_id=}",
8285
blocking=True,
83-
blocking_timeout_s=10,
84-
):
85-
await efs_manager.remove_project_node_data_write_permissions(
86-
project_id=rabbit_message.project_id, node_id=rabbit_message.node_id
87-
)
86+
blocking_timeout=datetime.timedelta(seconds=10),
87+
)(efs_manager.remove_project_node_data_write_permissions)(
88+
project_id=rabbit_message.project_id, node_id=rabbit_message.node_id
89+
)
8890

8991
return True

services/web/server/src/simcore_service_webserver/projects/projects_api.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
""" Interface to other subsystems
1+
"""Interface to other subsystems
22
3-
- Data validation
4-
- Operations on projects
5-
- are NOT handlers, therefore do not return web.Response
6-
- return data and successful HTTP responses (or raise them)
7-
- upon failure raise errors that can be also HTTP reponses
3+
- Data validation
4+
- Operations on projects
5+
- are NOT handlers, therefore do not return web.Response
6+
- return data and successful HTTP responses (or raise them)
7+
- upon failure raise errors that can be also HTTP reponses
88
"""
99

1010
import asyncio
@@ -81,6 +81,7 @@
8181
ServiceWaitingForManualInterventionError,
8282
ServiceWasNotFoundError,
8383
)
84+
from servicelib.redis._decorators import exclusive
8485
from servicelib.utils import fire_and_forget_task, logged_gather
8586
from simcore_postgres_database.models.users import UserRole
8687
from simcore_postgres_database.utils_projects_nodes import (
@@ -488,11 +489,7 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
488489

489490
except KeyError as exc:
490491
raise InvalidKeysInResourcesSpecsError(missing_key=f"{exc}") from exc
491-
except (
492-
RemoteMethodNotRegisteredError,
493-
RPCServerError,
494-
asyncio.TimeoutError,
495-
) as exc:
492+
except (TimeoutError, RemoteMethodNotRegisteredError, RPCServerError) as exc:
496493
raise ClustersKeeperNotAvailableError from exc
497494

498495

@@ -503,7 +500,6 @@ async def _check_project_node_has_all_required_inputs(
503500
project_uuid: ProjectID,
504501
node_id: NodeID,
505502
) -> None:
506-
507503
product_name = await db.get_project_product(project_uuid)
508504
await check_user_project_permission(
509505
app,
@@ -603,13 +599,17 @@ async def _start_dynamic_service(
603599
redis_client_sdk = get_redis_lock_manager_client_sdk(request.app)
604600
project_settings: ProjectsSettings = get_plugin_settings(request.app)
605601

606-
async with redis_client_sdk.lock_context(
607-
lock_key,
602+
@exclusive(
603+
redis_client_sdk,
604+
lock_key=lock_key,
608605
blocking=True,
609-
blocking_timeout_s=_nodes_api.get_total_project_dynamic_nodes_creation_interval(
610-
project_settings.PROJECTS_MAX_NUM_RUNNING_DYNAMIC_NODES
606+
blocking_timeout=datetime.timedelta(
607+
seconds=_nodes_api.get_total_project_dynamic_nodes_creation_interval(
608+
project_settings.PROJECTS_MAX_NUM_RUNNING_DYNAMIC_NODES
609+
)
611610
),
612-
):
611+
)
612+
async def _() -> None:
613613
project_running_nodes = await dynamic_scheduler_api.list_dynamic_services(
614614
request.app, user_id=user_id, project_id=project_uuid
615615
)
@@ -759,6 +759,8 @@ async def _start_dynamic_service(
759759
),
760760
)
761761

762+
await _()
763+
762764

763765
async def add_project_node(
764766
request: web.Request,

0 commit comments

Comments
 (0)