diff --git a/packages/models-library/src/models_library/api_schemas_webserver/projects.py b/packages/models-library/src/models_library/api_schemas_webserver/projects.py index 90a031f876a..39bdfe4be26 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/projects.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/projects.py @@ -302,6 +302,9 @@ class ProjectDocument(OutputSchema): model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True) +ProjectDocumentVersion: TypeAlias = int + + __all__: tuple[str, ...] = ( "EmptyModel", "ProjectCopyOverride", diff --git a/packages/service-library/src/servicelib/rest_constants.py b/packages/service-library/src/servicelib/rest_constants.py index d763657b6c9..4791b189df7 100644 --- a/packages/service-library/src/servicelib/rest_constants.py +++ b/packages/service-library/src/servicelib/rest_constants.py @@ -23,3 +23,4 @@ class PydanticExportParametersDict(TypedDict): # Headers keys X_PRODUCT_NAME_HEADER: Final[str] = "X-Simcore-Products-Name" +X_CLIENT_SESSION_ID_HEADER: Final[str] = "X-Client-Session-Id" diff --git a/services/web/server/src/simcore_service_webserver/db_listener/_db_comp_tasks_listening_task.py b/services/web/server/src/simcore_service_webserver/db_listener/_db_comp_tasks_listening_task.py index 223018ac90a..79573c7689a 100644 --- a/services/web/server/src/simcore_service_webserver/db_listener/_db_comp_tasks_listening_task.py +++ b/services/web/server/src/simcore_service_webserver/db_listener/_db_comp_tasks_listening_task.py @@ -52,7 +52,12 @@ async def _update_project_state( node_errors: list[ErrorDict] | None, ) -> None: project = await _projects_service.update_project_node_state( - app, user_id, project_uuid, node_uuid, new_state + app, + user_id, + project_uuid, + node_uuid, + new_state, + client_session_id=None, # <-- The trigger for this update is not from the UI (its db listener) ) await _projects_service.notify_project_node_update( @@ -95,6 +100,7 @@ async def _handle_db_notification( changed_row.run_hash, node_errors=changed_row.errors, ui_changed_keys=None, + client_session_id=None, # <-- The trigger for this update is not from the UI (its db listener) ) if "state" in payload.changes and (changed_row.state is not None): diff --git a/services/web/server/src/simcore_service_webserver/folders/_workspaces_repository.py b/services/web/server/src/simcore_service_webserver/folders/_workspaces_repository.py index 8fdb5e700a6..9d56d0c7236 100644 --- a/services/web/server/src/simcore_service_webserver/folders/_workspaces_repository.py +++ b/services/web/server/src/simcore_service_webserver/folders/_workspaces_repository.py @@ -8,6 +8,7 @@ from simcore_postgres_database.utils_repos import transaction_context from ..db.plugin import get_asyncpg_engine +from ..models import ClientSessionID from ..projects import _folders_repository as projects_folders_repository from ..projects import _groups_repository as projects_groups_repository from ..projects._access_rights_service import check_user_project_permission @@ -26,6 +27,7 @@ async def move_folder_into_workspace( folder_id: FolderID, workspace_id: WorkspaceID | None, product_name: ProductName, + client_session_id: ClientSessionID | None = None, ) -> None: # 1. User needs to have delete permission on source folder folder_db = await _folders_repository.get( @@ -84,6 +86,7 @@ async def move_folder_into_workspace( project_uuid=project_id, patch_project_data={"workspace_id": workspace_id}, user_primary_gid=user["primary_gid"], + client_session_id=client_session_id, ) # 5. BATCH update of folders with workspace_id diff --git a/services/web/server/src/simcore_service_webserver/folders/_workspaces_rest.py b/services/web/server/src/simcore_service_webserver/folders/_workspaces_rest.py index b327e84e574..a7ae1e1d67d 100644 --- a/services/web/server/src/simcore_service_webserver/folders/_workspaces_rest.py +++ b/services/web/server/src/simcore_service_webserver/folders/_workspaces_rest.py @@ -2,10 +2,14 @@ from aiohttp import web from servicelib.aiohttp import status -from servicelib.aiohttp.requests_validation import parse_request_path_parameters_as +from servicelib.aiohttp.requests_validation import ( + parse_request_headers_as, + parse_request_path_parameters_as, +) from .._meta import api_version_prefix as VTAG from ..login.decorators import login_required +from ..models import ClientSessionHeaderParams from ..security.decorators import permission_required from . import _workspaces_repository from ._common.exceptions_handlers import handle_plugin_requests_exceptions @@ -27,6 +31,7 @@ async def move_folder_to_workspace(request: web.Request): req_ctx = FoldersRequestContext.model_validate(request) path_params = parse_request_path_parameters_as(FolderWorkspacesPathParams, request) + header_params = parse_request_headers_as(ClientSessionHeaderParams, request) await _workspaces_repository.move_folder_into_workspace( app=request.app, @@ -34,5 +39,6 @@ async def move_folder_to_workspace(request: web.Request): folder_id=path_params.folder_id, workspace_id=path_params.workspace_id, product_name=req_ctx.product_name, + client_session_id=header_params.client_session_id, ) return web.json_response(status=status.HTTP_204_NO_CONTENT) diff --git a/services/web/server/src/simcore_service_webserver/models.py b/services/web/server/src/simcore_service_webserver/models.py index 14975acc96a..3a3b21d7c3b 100644 --- a/services/web/server/src/simcore_service_webserver/models.py +++ b/services/web/server/src/simcore_service_webserver/models.py @@ -3,9 +3,10 @@ from models_library.products import ProductName from models_library.rest_base import RequestParameters from models_library.users import UserID -from pydantic import ConfigDict, Field +from pydantic import ConfigDict, Field, StringConstraints from pydantic_extra_types.phone_numbers import PhoneNumberValidator from servicelib.request_keys import RQT_USERID_KEY +from servicelib.rest_constants import X_CLIENT_SESSION_ID_HEADER from .constants import RQ_PRODUCT_KEY @@ -16,6 +17,18 @@ ] +ClientSessionID: TypeAlias = Annotated[ + str, + StringConstraints( + strip_whitespace=True, + min_length=36, + max_length=36, + pattern=r"^[0-9a-fA-F\-]{36}$", # UUID format + strict=True, + ), +] + + class AuthenticatedRequestContext(RequestParameters): """Fields expected in the request context for authenticated endpoints""" @@ -25,3 +38,20 @@ class AuthenticatedRequestContext(RequestParameters): model_config = ConfigDict( frozen=True # prevents modifications after middlewares creates this model ) + + +assert X_CLIENT_SESSION_ID_HEADER + + +class ClientSessionHeaderParams(RequestParameters): + """Header parameters for client session tracking in collaborative features.""" + + client_session_id: ClientSessionID | None = Field( + default=None, + alias="X-Client-Session-Id", # X_CLIENT_SESSION_ID_HEADER, + description="Client session identifier for collaborative features (UUID string)", + ) + + model_config = ConfigDict( + validate_by_name=True, + ) diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py b/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py index 2e2c6eb12d3..f0637cc256e 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py @@ -35,6 +35,7 @@ from servicelib.aiohttp.long_running_tasks.server import start_long_running_task from servicelib.aiohttp.requests_validation import ( parse_request_body_as, + parse_request_headers_as, parse_request_path_parameters_as, parse_request_query_parameters_as, ) @@ -59,6 +60,7 @@ from ...groups.api import get_group_from_gid, list_all_user_groups_ids from ...groups.exceptions import GroupNotFoundError from ...login.decorators import login_required +from ...models import ClientSessionHeaderParams from ...security.decorators import permission_required from ...users import users_service from ...utils_aiohttp import envelope_json_response, get_api_base_url @@ -96,6 +98,7 @@ async def create_node(request: web.Request) -> web.Response: req_ctx = AuthenticatedRequestContext.model_validate(request) path_params = parse_request_path_parameters_as(ProjectPathParams, request) body = await parse_request_body_as(NodeCreate, request) + header_params = parse_request_headers_as(ClientSessionHeaderParams, request) if await _projects_service.is_service_deprecated( request.app, @@ -124,6 +127,7 @@ async def create_node(request: web.Request) -> web.Response: body.service_key, body.service_version, body.service_id, + client_session_id=header_params.client_session_id, ) } assert NodeCreated.model_validate(data) is not None # nosec @@ -178,6 +182,7 @@ async def patch_project_node(request: web.Request) -> web.Response: req_ctx = AuthenticatedRequestContext.model_validate(request) path_params = parse_request_path_parameters_as(NodePathParams, request) node_patch = await parse_request_body_as(NodePatch, request) + header_params = parse_request_headers_as(ClientSessionHeaderParams, request) await _projects_service.patch_project_node( request.app, @@ -187,6 +192,7 @@ async def patch_project_node(request: web.Request) -> web.Response: project_id=path_params.project_id, node_id=path_params.node_id, partial_node=node_patch.to_domain_model(), + client_session_id=header_params.client_session_id, ) return web.json_response(status=status.HTTP_204_NO_CONTENT) @@ -199,6 +205,7 @@ async def patch_project_node(request: web.Request) -> web.Response: async def delete_node(request: web.Request) -> web.Response: req_ctx = AuthenticatedRequestContext.model_validate(request) path_params = parse_request_path_parameters_as(NodePathParams, request) + header_params = parse_request_headers_as(ClientSessionHeaderParams, request) # ensure the project exists await _projects_service.get_project_for_user( @@ -213,6 +220,7 @@ async def delete_node(request: web.Request) -> web.Response: NodeIDStr(path_params.node_id), req_ctx.product_name, product_api_base_url=get_api_base_url(request), + client_session_id=header_params.client_session_id, ) return web.json_response(status=status.HTTP_204_NO_CONTENT) @@ -249,6 +257,7 @@ async def update_node_outputs(request: web.Request) -> web.Response: req_ctx = AuthenticatedRequestContext.model_validate(request) path_params = parse_request_path_parameters_as(NodePathParams, request) node_outputs = await parse_request_body_as(NodeOutputs, request) + header_params = parse_request_headers_as(ClientSessionHeaderParams, request) ui_changed_keys = set() ui_changed_keys.add(f"{path_params.node_id}") @@ -261,6 +270,7 @@ async def update_node_outputs(request: web.Request) -> web.Response: run_hash=None, node_errors=None, ui_changed_keys=ui_changed_keys, + client_session_id=header_params.client_session_id, ) return web.json_response(status=status.HTTP_204_NO_CONTENT) diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/ports_rest.py b/services/web/server/src/simcore_service_webserver/projects/_controller/ports_rest.py index c753b704343..f9336f6e7c3 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/ports_rest.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/ports_rest.py @@ -17,11 +17,13 @@ from pydantic import BaseModel, Field, TypeAdapter from servicelib.aiohttp.requests_validation import ( parse_request_body_as, + parse_request_headers_as, parse_request_path_parameters_as, ) from ..._meta import API_VTAG as VTAG from ...login.decorators import login_required +from ...models import ClientSessionHeaderParams from ...security.decorators import permission_required from ...utils_aiohttp import envelope_json_response from .. import _ports_service, _projects_service @@ -88,6 +90,7 @@ async def update_project_inputs(request: web.Request) -> web.Response: req_ctx = AuthenticatedRequestContext.model_validate(request) path_params = parse_request_path_parameters_as(ProjectPathParams, request) inputs_updates = await parse_request_body_as(list[ProjectInputUpdate], request) + header_params = parse_request_headers_as(ClientSessionHeaderParams, request) assert request.app # nosec @@ -123,6 +126,7 @@ async def update_project_inputs(request: web.Request) -> web.Response: project_uuid=path_params.project_id, product_name=req_ctx.product_name, partial_workbench_data=jsonable_encoder(partial_workbench_data), + client_session_id=header_params.client_session_id, ) workbench = TypeAdapter(dict[NodeID, Node]).validate_python( diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/projects_rest.py b/services/web/server/src/simcore_service_webserver/projects/_controller/projects_rest.py index d910769cb1d..538644fc2a2 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/projects_rest.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/projects_rest.py @@ -28,6 +28,7 @@ from ..._meta import API_VTAG as VTAG from ...login.decorators import login_required +from ...models import ClientSessionHeaderParams from ...redis import get_redis_lock_manager_client_sdk from ...resource_manager.user_sessions import PROJECT_ID_KEY, managed_resource from ...security import security_web @@ -312,6 +313,7 @@ async def patch_project(request: web.Request): req_ctx = AuthenticatedRequestContext.model_validate(request) path_params = parse_request_path_parameters_as(ProjectPathParams, request) project_patch = await parse_request_body_as(ProjectPatch, request) + header_params = parse_request_headers_as(ClientSessionHeaderParams, request) await _projects_service.patch_project_for_user( request.app, @@ -319,6 +321,7 @@ async def patch_project(request: web.Request): project_uuid=path_params.project_id, project_patch=project_patch, product_name=req_ctx.product_name, + client_session_id=header_params.client_session_id, ) return web.json_response(status=status.HTTP_204_NO_CONTENT) diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/workspaces_rest.py b/services/web/server/src/simcore_service_webserver/projects/_controller/workspaces_rest.py index 7c657dab8d0..8ca8858c983 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/workspaces_rest.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/workspaces_rest.py @@ -7,10 +7,14 @@ from models_library.workspaces import WorkspaceID from pydantic import BaseModel, BeforeValidator, ConfigDict, Field from servicelib.aiohttp import status -from servicelib.aiohttp.requests_validation import parse_request_path_parameters_as +from servicelib.aiohttp.requests_validation import ( + parse_request_headers_as, + parse_request_path_parameters_as, +) from ..._meta import api_version_prefix as VTAG from ...login.decorators import login_required +from ...models import ClientSessionHeaderParams from ...security.decorators import permission_required from .. import _workspaces_service from ._rest_exceptions import handle_plugin_requests_exceptions @@ -43,6 +47,7 @@ async def move_project_to_workspace(request: web.Request): path_params = parse_request_path_parameters_as( _ProjectWorkspacesPathParams, request ) + header_params = parse_request_headers_as(ClientSessionHeaderParams, request) await _workspaces_service.move_project_into_workspace( app=request.app, @@ -50,5 +55,6 @@ async def move_project_to_workspace(request: web.Request): project_id=path_params.project_id, workspace_id=path_params.workspace_id, product_name=req_ctx.product_name, + client_session_id=header_params.client_session_id, ) return web.json_response(status=status.HTTP_204_NO_CONTENT) diff --git a/services/web/server/src/simcore_service_webserver/projects/_project_document_service.py b/services/web/server/src/simcore_service_webserver/projects/_project_document_service.py new file mode 100644 index 00000000000..540e791a2cc --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/projects/_project_document_service.py @@ -0,0 +1,86 @@ +"""Utility functions for project document management. + +This module contains common utilities for building and versioning project documents. +""" + +from typing import cast + +from aiohttp import web +from models_library.api_schemas_webserver.projects import ( + ProjectDocument, + ProjectDocumentVersion, +) +from models_library.projects import ProjectID, ProjectTemplateType +from models_library.projects import ProjectType as ProjectTypeAPI +from servicelib.redis import ( + PROJECT_DB_UPDATE_REDIS_LOCK_KEY, + exclusive, + increment_and_return_project_document_version, +) + +from ..redis import ( + get_redis_document_manager_client_sdk, + get_redis_lock_manager_client_sdk, +) +from . import _projects_repository + + +async def create_project_document_and_increment_version( + app: web.Application, project_uuid: ProjectID +) -> tuple[ProjectDocument, ProjectDocumentVersion]: + """Build project document and increment version with Redis lock protection. + + This function is protected by Redis exclusive lock because: + - the project document and its version must be kept in sync + + Args: + app: The web application instance + project_uuid: UUID of the project + + Returns: + Tuple containing the project document and its version number + """ + + @exclusive( + get_redis_lock_manager_client_sdk(app), + lock_key=PROJECT_DB_UPDATE_REDIS_LOCK_KEY.format(project_uuid), + blocking=True, + blocking_timeout=None, # NOTE: this is a blocking call, a timeout has undefined effects + ) + async def _create_project_document_and_increment_version() -> ( + tuple[ProjectDocument, int] + ): + """This function is protected because + - the project document and its version must be kept in sync + """ + # Get the full project with workbench for document creation + project_with_workbench = await _projects_repository.get_project_with_workbench( + app=app, project_uuid=project_uuid + ) + # Create project document + project_document = ProjectDocument( + uuid=project_with_workbench.uuid, + workspace_id=project_with_workbench.workspace_id, + name=project_with_workbench.name, + description=project_with_workbench.description, + thumbnail=project_with_workbench.thumbnail, + last_change_date=project_with_workbench.last_change_date, + classifiers=project_with_workbench.classifiers, + dev=project_with_workbench.dev, + quality=project_with_workbench.quality, + workbench=project_with_workbench.workbench, + ui=project_with_workbench.ui, + type=cast(ProjectTypeAPI, project_with_workbench.type), + template_type=cast( + ProjectTemplateType, project_with_workbench.template_type + ), + ) + # Increment document version + redis_client_sdk = get_redis_document_manager_client_sdk(app) + document_version = await increment_and_return_project_document_version( + redis_client=redis_client_sdk, project_uuid=project_uuid + ) + + return project_document, document_version + + return await _create_project_document_and_increment_version() diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_repository_legacy.py b/services/web/server/src/simcore_service_webserver/projects/_projects_repository_legacy.py index 59f9ff2dc31..a6a051a5595 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_repository_legacy.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_repository_legacy.py @@ -15,7 +15,6 @@ from aiopg.sa import Engine from aiopg.sa.connection import SAConnection from aiopg.sa.result import ResultProxy, RowProxy -from models_library.api_schemas_webserver.projects import ProjectDocument from models_library.basic_types import IDStr from models_library.folders import FolderQuery, FolderScope from models_library.groups import GroupID @@ -26,7 +25,6 @@ ProjectListAtDB, ProjectTemplateType, ) -from models_library.projects import ProjectType as ProjectTypeAPI from models_library.projects_comments import CommentID, ProjectsCommentsDB from models_library.projects_nodes import Node from models_library.projects_nodes_io import NodeID, NodeIDStr @@ -44,11 +42,6 @@ from pydantic.types import PositiveInt from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY from servicelib.logging_utils import get_log_record_extra, log_context -from servicelib.redis import ( - PROJECT_DB_UPDATE_REDIS_LOCK_KEY, - exclusive, - increment_and_return_project_document_version, -) from simcore_postgres_database.aiopg_errors import UniqueViolation from simcore_postgres_database.models.groups import user_to_groups from simcore_postgres_database.models.project_to_groups import project_to_groups @@ -82,12 +75,8 @@ from tenacity.asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type -from ..redis import ( - get_redis_document_manager_client_sdk, - get_redis_lock_manager_client_sdk, -) +from ..models import ClientSessionID from ..utils import now_str -from . import _projects_repository from ._comments_repository import ( create_project_comment, delete_project_comment, @@ -96,6 +85,7 @@ total_project_comments, update_project_comment, ) +from ._project_document_service import create_project_document_and_increment_version from ._projects_repository import PROJECT_DB_COLS from ._projects_repository_legacy_utils import ( ANY_USER_ID_SENTINEL, @@ -881,6 +871,7 @@ async def update_project_node_data( node_id: NodeID, product_name: str | None, new_node_data: dict[str, Any], + client_session_id: ClientSessionID | None, ) -> tuple[ProjectDict, dict[NodeIDStr, Any]]: with log_context( _logger, @@ -897,6 +888,7 @@ async def update_project_node_data( project_uuid=project_uuid, product_name=product_name, allow_workbench_changes=False, + client_session_id=client_session_id, ) async def update_project_multiple_node_data( @@ -906,6 +898,7 @@ async def update_project_multiple_node_data( project_uuid: ProjectID, product_name: str | None, partial_workbench_data: dict[NodeIDStr, dict[str, Any]], + client_session_id: ClientSessionID | None, ) -> tuple[ProjectDict, dict[NodeIDStr, Any]]: """ Raises: @@ -923,6 +916,7 @@ async def update_project_multiple_node_data( project_uuid=project_uuid, product_name=product_name, allow_workbench_changes=False, + client_session_id=client_session_id, ) async def _update_project_workbench_with_lock_and_notify( @@ -933,6 +927,7 @@ async def _update_project_workbench_with_lock_and_notify( project_uuid: ProjectID, product_name: str | None = None, allow_workbench_changes: bool, + client_session_id: ClientSessionID | None, ) -> tuple[ProjectDict, dict[NodeIDStr, Any]]: """ Updates project workbench with Redis lock and user notification. @@ -953,73 +948,25 @@ async def _update_project_workbench_with_lock_and_notify( async with self.engine.acquire() as conn: user_primary_gid = await self._get_user_primary_group_gid(conn, user_id) - # 10 concurrent calls - @exclusive( - get_redis_lock_manager_client_sdk(self._app), - lock_key=PROJECT_DB_UPDATE_REDIS_LOCK_KEY.format(project_uuid), - blocking=True, - blocking_timeout=None, # NOTE: this is a blocking call, a timeout has undefined effects + # Update the workbench + updated_project, changed_entries = await self._update_project_workbench( + partial_workbench_data, + user_id=user_id, + project_uuid=f"{project_uuid}", + product_name=product_name, + allow_workbench_changes=allow_workbench_changes, ) - async def _update_workbench_and_notify() -> ( - tuple[ProjectDict, dict[NodeIDStr, Any], ProjectDocument, int] - ): - """This function is protected because - - the project document and its version must be kept in sync - """ - # Update the workbench work since it's atomic - updated_project, changed_entries = await self._update_project_workbench( - partial_workbench_data, - user_id=user_id, - project_uuid=f"{project_uuid}", - product_name=product_name, - allow_workbench_changes=allow_workbench_changes, - ) - # the update project with last_modified timestamp latest is the last - - # Get the full project with workbench for document creation - project_with_workbench = ( - await _projects_repository.get_project_with_workbench( - app=self._app, project_uuid=project_uuid - ) - ) - - # Create project document - project_document = ProjectDocument( - uuid=project_with_workbench.uuid, - workspace_id=project_with_workbench.workspace_id, - name=project_with_workbench.name, - description=project_with_workbench.description, - thumbnail=project_with_workbench.thumbnail, - last_change_date=project_with_workbench.last_change_date, - classifiers=project_with_workbench.classifiers, - dev=project_with_workbench.dev, - quality=project_with_workbench.quality, - workbench=project_with_workbench.workbench, - ui=project_with_workbench.ui, - type=cast(ProjectTypeAPI, project_with_workbench.type), - template_type=cast( - ProjectTemplateType, project_with_workbench.template_type - ), - ) - - # Increment document version and notify users - redis_client_sdk = get_redis_document_manager_client_sdk(self._app) - document_version = await increment_and_return_project_document_version( - redis_client=redis_client_sdk, project_uuid=project_uuid - ) - - return updated_project, changed_entries, project_document, document_version ( - updated_project, - changed_entries, project_document, document_version, - ) = await _update_workbench_and_notify() + ) = await create_project_document_and_increment_version(self._app, project_uuid) + await notify_project_document_updated( app=self._app, project_id=project_uuid, user_primary_gid=user_primary_gid, + client_session_id=client_session_id, version=document_version, document=project_document, ) @@ -1103,6 +1050,7 @@ async def add_project_node( node: ProjectNodeCreate, old_struct_node: Node, product_name: str, + client_session_id: ClientSessionID | None, ) -> None: # NOTE: permission check is done currently in update_project_workbench! partial_workbench_data: dict[NodeIDStr, Any] = { @@ -1117,13 +1065,18 @@ async def add_project_node( project_uuid=project_id, product_name=product_name, allow_workbench_changes=True, + client_session_id=client_session_id, ) project_nodes_repo = ProjectNodesRepo(project_uuid=project_id) async with self.engine.acquire() as conn: await project_nodes_repo.add(conn, nodes=[node]) async def remove_project_node( - self, user_id: UserID, project_id: ProjectID, node_id: NodeID + self, + user_id: UserID, + project_id: ProjectID, + node_id: NodeID, + client_session_id: ClientSessionID | None, ) -> None: # NOTE: permission check is done currently in update_project_workbench! partial_workbench_data: dict[NodeIDStr, Any] = { @@ -1134,6 +1087,7 @@ async def remove_project_node( user_id=user_id, project_uuid=project_id, allow_workbench_changes=True, + client_session_id=client_session_id, ) project_nodes_repo = ProjectNodesRepo(project_uuid=project_id) async with self.engine.acquire() as conn: diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py index ca9da7873c6..96ea59ba24e 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py @@ -30,16 +30,15 @@ DynamicServiceStop, ) from models_library.api_schemas_webserver.projects import ( - ProjectDocument, ProjectGet, ProjectPatch, ) +from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.basic_types import KeyIDStr from models_library.errors import ErrorDict from models_library.groups import GroupID from models_library.products import ProductName -from models_library.projects import Project, ProjectID, ProjectTemplateType -from models_library.projects import ProjectType as ProjectTypeAPI +from models_library.projects import Project, ProjectID from models_library.projects_access import Owner from models_library.projects_nodes import Node, NodeState, PartialNode from models_library.projects_nodes_io import NodeID, NodeIDStr, PortLink @@ -84,10 +83,8 @@ ServiceWasNotFoundError, ) from servicelib.redis import ( - PROJECT_DB_UPDATE_REDIS_LOCK_KEY, exclusive, get_project_locked_state, - increment_and_return_project_document_version, is_project_locked, with_project_locked, ) @@ -104,12 +101,10 @@ from ..catalog import catalog_service from ..director_v2 import director_v2_service from ..dynamic_scheduler import api as dynamic_scheduler_service +from ..models import ClientSessionID from ..products import products_web from ..rabbitmq import get_rabbitmq_rpc_client -from ..redis import ( - get_redis_document_manager_client_sdk, - get_redis_lock_manager_client_sdk, -) +from ..redis import get_redis_lock_manager_client_sdk from ..resource_manager.models import UserSession from ..resource_manager.user_sessions import ( PROJECT_ID_KEY, @@ -122,6 +117,7 @@ send_message_to_standard_group, send_message_to_user, ) +from ..socketio.server import get_socket_server from ..storage import api as storage_service from ..user_preferences import user_preferences_service from ..user_preferences.user_preferences_service import ( @@ -144,6 +140,7 @@ has_user_project_access_rights, ) from ._nodes_utils import set_reservation_same_as_limit, validate_new_service_resources +from ._project_document_service import create_project_document_and_increment_version from ._projects_repository_legacy import APP_PROJECT_DBAPI, ProjectDBAPI from ._projects_repository_legacy_utils import PermissionStr from ._socketio_service import notify_project_document_updated @@ -169,9 +166,7 @@ from .settings import ProjectsSettings, get_plugin_settings from .utils import extract_dns_without_default_port -log = logging.getLogger(__name__) - -PROJECT_REDIS_LOCK_KEY: str = "project:{}" +_logger = logging.getLogger(__name__) async def patch_project_and_notify_users( @@ -180,6 +175,7 @@ async def patch_project_and_notify_users( project_uuid: ProjectID, patch_project_data: dict[str, Any], user_primary_gid: GroupID, + client_session_id: ClientSessionID | None, ) -> None: """ Patches a project and notifies users involved in the project with version control. @@ -202,52 +198,20 @@ async def patch_project_and_notify_users( thread-safe operations on the project document. """ - @exclusive( - get_redis_lock_manager_client_sdk(app), - lock_key=PROJECT_DB_UPDATE_REDIS_LOCK_KEY.format(project_uuid), - blocking=True, - blocking_timeout=None, # NOTE: this is a blocking call, a timeout has undefined effects + await _projects_repository.patch_project( + app=app, + project_uuid=project_uuid, + new_partial_project_data=patch_project_data, ) - async def _patch_and_create_project_document() -> tuple[ProjectDocument, int]: - """This function is protected because - - the project document and its version must be kept in sync - """ - await _projects_repository.patch_project( - app=app, - project_uuid=project_uuid, - new_partial_project_data=patch_project_data, - ) - project_with_workbench = await _projects_repository.get_project_with_workbench( - app=app, project_uuid=project_uuid - ) - project_document = ProjectDocument( - uuid=project_with_workbench.uuid, - workspace_id=project_with_workbench.workspace_id, - name=project_with_workbench.name, - description=project_with_workbench.description, - thumbnail=project_with_workbench.thumbnail, - last_change_date=project_with_workbench.last_change_date, - classifiers=project_with_workbench.classifiers, - dev=project_with_workbench.dev, - quality=project_with_workbench.quality, - workbench=project_with_workbench.workbench, - ui=project_with_workbench.ui, - type=cast(ProjectTypeAPI, project_with_workbench.type), - template_type=cast( - ProjectTemplateType, project_with_workbench.template_type - ), - ) - redis_client_sdk = get_redis_document_manager_client_sdk(app) - document_version = await increment_and_return_project_document_version( - redis_client=redis_client_sdk, project_uuid=project_uuid - ) - return project_document, document_version - project_document, document_version = await _patch_and_create_project_document() + project_document, document_version = ( + await create_project_document_and_increment_version(app, project_uuid) + ) await notify_project_document_updated( app=app, project_id=project_uuid, user_primary_gid=user_primary_gid, + client_session_id=client_session_id, version=document_version, document=project_document, ) @@ -335,17 +299,17 @@ async def get_project_for_user( async def get_project_type( app: web.Application, project_uuid: ProjectID ) -> ProjectType: - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] - assert db # nosec - return await db.get_project_type(project_uuid) + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] + assert db_legacy # nosec + return await db_legacy.get_project_type(project_uuid) async def get_project_dict_legacy( app: web.Application, project_uuid: ProjectID ) -> ProjectDict: - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] - assert db # nosec - project, _ = await db.get_project_dict_and_type( + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] + assert db_legacy # nosec + project, _ = await db_legacy.get_project_dict_and_type( f"{project_uuid}", ) return project @@ -394,12 +358,19 @@ async def patch_project_for_user( project_uuid: ProjectID, project_patch: ProjectPatch | ProjectPatchInternalExtended, product_name: ProductName, + client_session_id: ClientSessionID | None, ): + # client_session_id (str | None): The session ID of the frontend client making the request. + # This is used to distinguish between multiple sessions a user may have open. + # In scenarios with optimistic UI updates, if a change is made from one session, + # that session can ignore the notification it published to all sessions (including other users), + # preventing redundant updates in the originating session. + patch_project_data = project_patch.to_domain_model() - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] # 1. Get project - project_db = await db.get_project_db(project_uuid=project_uuid) + project_db = await db_legacy.get_project_db(project_uuid=project_uuid) # 2. Check user permissions _user_project_access_rights = await check_user_project_permission( @@ -456,6 +427,7 @@ async def patch_project_for_user( project_uuid=project_uuid, patch_project_data=patch_project_data, user_primary_gid=current_user["primary_gid"], + client_session_id=client_session_id, ) @@ -516,7 +488,7 @@ async def submit_delete_project_task( user_id, simcore_user_agent, remove_project_dynamic_services, - log, + _logger, ) return task @@ -612,7 +584,7 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool: service_to_resources[1].resources["RAM"].limit ), ) - log.debug( + _logger.debug( "the most hungry service is %s", f"{scalable_service_name=}:{hungry_service_resources}", ) @@ -750,7 +722,7 @@ async def _start_dynamic_service( # noqa: C901 ) except ProjectNodeRequiredInputsNotSetError as e: if graceful_start: - log.info( + _logger.info( "Did not start '%s' because of missing required inputs: %s", node_uuid, e, @@ -940,8 +912,9 @@ async def add_project_node( service_key: ServiceKey, service_version: ServiceVersion, service_id: str | None, + client_session_id: ClientSessionID | None, ) -> NodeID: - log.debug( + _logger.debug( "starting node %s:%s in project %s for user %s", service_key, service_version, @@ -962,9 +935,9 @@ async def add_project_node( default_resources = await catalog_service.get_service_resources( request.app, user_id, service_key, service_version ) - db: ProjectDBAPI = ProjectDBAPI.get_from_app_context(request.app) - assert db # nosec - await db.add_project_node( + db_legacy: ProjectDBAPI = ProjectDBAPI.get_from_app_context(request.app) + assert db_legacy # nosec + await db_legacy.add_project_node( user_id, ProjectID(project["uuid"]), ProjectNodeCreate( @@ -982,6 +955,7 @@ async def add_project_node( } ), product_name, + client_session_id=client_session_id, ) # also ensure the project is updated by director-v2 since services @@ -1075,8 +1049,9 @@ async def delete_project_node( node_uuid: NodeIDStr, product_name: ProductName, product_api_base_url: str, + client_session_id: ClientSessionID | None, ) -> None: - log.debug( + _logger.debug( "deleting node %s in project %s for user %s", node_uuid, project_uuid, user_id ) @@ -1114,9 +1089,11 @@ async def delete_project_node( ) # remove the node from the db - db: ProjectDBAPI = request.app[APP_PROJECT_DBAPI] - assert db # nosec - await db.remove_project_node(user_id, project_uuid, NodeID(node_uuid)) + db_legacy: ProjectDBAPI = request.app[APP_PROJECT_DBAPI] + assert db_legacy # nosec + await db_legacy.remove_project_node( + user_id, project_uuid, NodeID(node_uuid), client_session_id=client_session_id + ) # also ensure the project is updated by director-v2 since services product_name = products_web.get_product_name(request) await director_v2_service.create_or_update_pipeline( @@ -1130,9 +1107,11 @@ async def delete_project_node( async def update_project_linked_product( app: web.Application, project_id: ProjectID, product_name: str ) -> None: - with log_context(log, level=logging.DEBUG, msg="updating project linked product"): - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] - await db.upsert_project_linked_product(project_id, product_name) + with log_context( + _logger, level=logging.DEBUG, msg="updating project linked product" + ): + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] + await db_legacy.upsert_project_linked_product(project_id, product_name) async def update_project_node_state( @@ -1141,16 +1120,17 @@ async def update_project_node_state( project_id: ProjectID, node_id: NodeID, new_state: str, + client_session_id: ClientSessionID | None, ) -> dict: - log.debug( + _logger.debug( "updating node %s current state in project %s for user %s", node_id, project_id, user_id, ) - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] - product_name = await db.get_project_product(project_id) + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] + product_name = await db_legacy.get_project_product(project_id) await check_user_project_permission( app, project_id=project_id, @@ -1161,12 +1141,13 @@ async def update_project_node_state( # Delete this once workbench is removed from the projects table # See: https://github.com/ITISFoundation/osparc-simcore/issues/7046 - await db.update_project_node_data( + await db_legacy.update_project_node_data( user_id=user_id, project_uuid=project_id, node_id=node_id, product_name=None, new_node_data={"state": {"currentStatus": new_state}}, + client_session_id=client_session_id, ) await _projects_nodes_repository.update( @@ -1183,8 +1164,8 @@ async def update_project_node_state( async def is_project_hidden(app: web.Application, project_id: ProjectID) -> bool: - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] - return await db.is_hidden(project_id) + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] + return await db_legacy.is_hidden(project_id) async def patch_project_node( @@ -1196,12 +1177,13 @@ async def patch_project_node( project_id: ProjectID, node_id: NodeID, partial_node: PartialNode, + client_session_id: ClientSessionID | None, ) -> None: _node_patch_exclude_unset: dict[str, Any] = partial_node.model_dump( mode="json", exclude_unset=True, by_alias=True ) - _projects_repository = ProjectDBAPI.get_from_app_context(app) + _projects_repository_legacy = ProjectDBAPI.get_from_app_context(app) # 1. Check user permissions await check_user_project_permission( @@ -1214,7 +1196,7 @@ async def patch_project_node( # 2. If patching service key or version make sure it's valid if _node_patch_exclude_unset.get("key") or _node_patch_exclude_unset.get("version"): - _project, _ = await _projects_repository.get_project_dict_and_type( + _project, _ = await _projects_repository_legacy.get_project_dict_and_type( project_uuid=f"{project_id}" ) _project_node_data = _project["workbench"][f"{node_id}"] @@ -1233,12 +1215,13 @@ async def patch_project_node( ) # 3. Patch the project node - updated_project, _ = await _projects_repository.update_project_node_data( + updated_project, _ = await _projects_repository_legacy.update_project_node_data( user_id=user_id, project_uuid=project_id, node_id=node_id, product_name=product_name, new_node_data=_node_patch_exclude_unset, + client_session_id=client_session_id, ) await _projects_nodes_repository.update( @@ -1282,11 +1265,12 @@ async def update_project_node_outputs( node_id: NodeID, new_outputs: dict | None, new_run_hash: str | None, + client_session_id: ClientSessionID | None, ) -> tuple[dict, list[str]]: """ Updates outputs of a given node in a project with 'data' """ - log.debug( + _logger.debug( "updating node %s outputs in project %s for user %s with %s: run_hash [%s]", node_id, project_id, @@ -1297,8 +1281,8 @@ async def update_project_node_outputs( ) new_outputs = new_outputs or {} - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] - product_name = await db.get_project_product(project_id) + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] + product_name = await db_legacy.get_project_product(project_id) await check_user_project_permission( app, project_id=project_id, @@ -1307,12 +1291,13 @@ async def update_project_node_outputs( permission="write", # NOTE: MD: before only read was sufficient, double check this ) - updated_project, changed_entries = await db.update_project_node_data( + updated_project, changed_entries = await db_legacy.update_project_node_data( user_id=user_id, project_uuid=project_id, node_id=node_id, product_name=None, new_node_data={"outputs": new_outputs, "runHash": new_run_hash}, + client_session_id=client_session_id, ) await _projects_nodes_repository.update( @@ -1324,7 +1309,7 @@ async def update_project_node_outputs( ), ) - log.debug( + _logger.debug( "patched project %s, following entries changed: %s", project_id, pformat(changed_entries), @@ -1348,8 +1333,8 @@ async def list_node_ids_in_project( project_uuid: ProjectID, ) -> set[NodeID]: """Returns a set with all the node_ids from a project's workbench""" - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] - return await db.list_node_ids_in_project(project_uuid) + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] + return await db_legacy.list_node_ids_in_project(project_uuid) async def is_node_id_present_in_any_project_workbench( @@ -1357,8 +1342,8 @@ async def is_node_id_present_in_any_project_workbench( node_id: NodeID, ) -> bool: """If the node_id is presnet in one of the projects' workbenche returns True""" - db: ProjectDBAPI = app[APP_PROJECT_DBAPI] - return await db.node_id_exists(node_id) + db_legacy: ProjectDBAPI = app[APP_PROJECT_DBAPI] + return await db_legacy.node_id_exists(node_id) async def _safe_retrieve( @@ -1367,7 +1352,7 @@ async def _safe_retrieve( try: await dynamic_scheduler_service.retrieve_inputs(app, node_id, port_keys) except RPCServerError as exc: - log.warning( + _logger.warning( "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s]", node_id, port_keys, @@ -1381,7 +1366,7 @@ async def _trigger_connected_service_retrieve( project_id = project["uuid"] if await is_project_locked(get_redis_lock_manager_client_sdk(app), project_id): # NOTE: we log warn since this function is fire&forget and raise an exception would not be anybody to handle it - log.warning( + _logger.warning( "Skipping service retrieval because project with %s is currently locked." "Operation triggered by %s", f"{project_id=}", @@ -1457,7 +1442,7 @@ async def _clean_user_disconnected_clients( for u in users_sessions_ids: with managed_resource(u.user_id, u.client_session_id, app) as user_session: if await user_session.get_socket_id() is None: - log.debug( + _logger.debug( "removing disconnected project of user %s/%s", u.user_id, u.client_session_id, @@ -1465,6 +1450,35 @@ async def _clean_user_disconnected_clients( await user_session.remove(PROJECT_ID_KEY) +async def _leave_project_room( + *, + app: web.Application, + user_id: UserID, + client_session_id: ClientSessionID, + project_uuid: ProjectID, + user_session, +) -> None: + """Helper function to leave a project room via socketio""" + socket_id = await user_session.get_socket_id() + if socket_id is not None: + _logger.debug( + "User %s/%s is leaving project room %s with socket_id %s", + user_id, + client_session_id, + project_uuid, + socket_id, + ) + sio = get_socket_server(app) + await sio.leave_room(socket_id, SocketIORoomStr.from_project_id(project_uuid)) + else: + _logger.error( + "User %s/%s has no socket_id, cannot leave project room %s", + user_id, + client_session_id, + project_uuid, + ) + + def create_user_notification_cb( user_id: UserID, project_uuid: ProjectID, app: web.Application ): @@ -1477,7 +1491,7 @@ async def _notification_cb() -> None: async def try_open_project_for_user( user_id: UserID, project_uuid: ProjectID, - client_session_id: str, + client_session_id: ClientSessionID, app: web.Application, *, max_number_of_opened_projects_per_user: int | None, @@ -1583,7 +1597,7 @@ async def _open_project() -> bool: async def close_project_for_user( user_id: UserID, project_uuid: ProjectID, - client_session_id: str, + client_session_id: ClientSessionID, app: web.Application, simcore_user_agent: str, *, @@ -1603,9 +1617,18 @@ async def close_project_for_user( # remove the project from our list of opened ones await user_session.remove(key=PROJECT_ID_KEY) + # remove the clent session from the project room + await _leave_project_room( + app=app, + user_id=user_id, + client_session_id=client_session_id, + project_uuid=project_uuid, + user_session=user_session, + ) + # check it is not opened by someone else all_user_sessions_with_project.remove(current_user_session) - log.debug("remaining user_to_session_ids: %s", all_user_sessions_with_project) + _logger.debug("remaining user_to_session_ids: %s", all_user_sessions_with_project) if not all_user_sessions_with_project: # NOTE: depending on the garbage collector speed, it might already be removing it remove_services_task = remove_project_dynamic_services( @@ -1647,7 +1670,7 @@ async def _get_project_share_state( ) app_settings = get_application_settings(app) if prj_locked_state: - log.debug( + _logger.debug( "project [%s] is currently locked: %s", f"{project_uuid=}", f"{prj_locked_state=}", @@ -1695,12 +1718,12 @@ async def _get_project_share_state( if not set_user_ids: # no one has the project, so it is unlocked and closed. - log.debug("project [%s] is not in use", f"{project_uuid=}") + _logger.debug("project [%s] is not in use", f"{project_uuid=}") return ProjectShareState( status=ProjectStatus.CLOSED, locked=False, current_user_groupids=[] ) - log.debug( + _logger.debug( "project [%s] might be used by the following users: [%s]", f"{project_uuid=}", f"{set_user_ids=}", @@ -1714,7 +1737,7 @@ async def _get_project_share_state( user_sessions_with_project, app ): # in this case the project is re-openable by the same user until it gets closed - log.debug( + _logger.debug( "project [%s] is in use by the same user [%s] that is currently disconnected, so it is unlocked for this specific user and opened", f"{project_uuid=}", f"{set_user_ids=}", @@ -1751,7 +1774,7 @@ async def add_project_states_for_user( is_template: bool, app: web.Application, ) -> ProjectDict: - log.debug( + _logger.debug( "adding project states for %s with project %s", f"{user_id=}", f"{project['uuid']=}", @@ -1983,7 +2006,7 @@ async def remove_project_dynamic_services( # NOTE: during the closing process, which might take awhile, # the project is locked so no one opens it at the same time - log.debug( + _logger.debug( "removing project interactive services for project [%s] and user [%s]", project_uuid, user_id, diff --git a/services/web/server/src/simcore_service_webserver/projects/_socketio_service.py b/services/web/server/src/simcore_service_webserver/projects/_socketio_service.py index be1322f35dd..97b39eece44 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_socketio_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_socketio_service.py @@ -8,6 +8,7 @@ from pydantic import AliasGenerator, BaseModel, ConfigDict from pydantic.alias_generators import to_camel +from ..models import ClientSessionID from ..socketio.messages import send_message_to_project_room SOCKET_IO_PROJECT_DOCUMENT_UPDATED_EVENT: Final[str] = "projectDocument:updated" @@ -26,6 +27,7 @@ class BaseEvent(BaseModel): class ProjectDocumentEvent(BaseEvent): project_id: ProjectID user_primary_gid: GroupID + client_session_id: ClientSessionID | None version: int document: ProjectDocument @@ -35,6 +37,7 @@ async def notify_project_document_updated( *, project_id: ProjectID, user_primary_gid: GroupID, + client_session_id: ClientSessionID | None, version: int, document: ProjectDocument, ) -> None: @@ -44,6 +47,7 @@ async def notify_project_document_updated( **ProjectDocumentEvent( project_id=project_id, user_primary_gid=user_primary_gid, + client_session_id=client_session_id, version=version, document=document, ).model_dump(mode="json", by_alias=True), diff --git a/services/web/server/src/simcore_service_webserver/projects/_trash_service.py b/services/web/server/src/simcore_service_webserver/projects/_trash_service.py index a1c711688ad..ca85d54384e 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_trash_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_trash_service.py @@ -98,6 +98,7 @@ async def trash_project( trashed_explicitly=explicit, trashed_by=user_id, ), + client_session_id=None, ) @@ -117,6 +118,7 @@ async def untrash_project( project_patch=ProjectPatchInternalExtended( trashed_at=None, trashed_explicitly=False, trashed_by=None ), + client_session_id=None, ) diff --git a/services/web/server/src/simcore_service_webserver/projects/_workspaces_service.py b/services/web/server/src/simcore_service_webserver/projects/_workspaces_service.py index afeaf66b584..bb57236b800 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_workspaces_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_workspaces_service.py @@ -8,6 +8,7 @@ from simcore_postgres_database.utils_repos import transaction_context from ..db.plugin import get_asyncpg_engine +from ..models import ClientSessionID from ..users import users_service from ..workspaces.api import check_user_workspace_access from . import _folders_repository, _groups_repository, _projects_service @@ -24,6 +25,7 @@ async def move_project_into_workspace( project_id: ProjectID, workspace_id: WorkspaceID | None, product_name: ProductName, + client_session_id: ClientSessionID | None = None, ) -> None: # 1. User needs to have delete permission on project project_access_rights = await get_user_project_access_rights( @@ -57,6 +59,7 @@ async def move_project_into_workspace( project_uuid=project_id, patch_project_data={"workspace_id": workspace_id}, user_primary_gid=user["primary_gid"], + client_session_id=client_session_id, ) # 5. Remove all project permissions, leave only the user who moved the project diff --git a/services/web/server/src/simcore_service_webserver/projects/nodes_utils.py b/services/web/server/src/simcore_service_webserver/projects/nodes_utils.py index 4cf8a690aee..00f9c25b8a3 100644 --- a/services/web/server/src/simcore_service_webserver/projects/nodes_utils.py +++ b/services/web/server/src/simcore_service_webserver/projects/nodes_utils.py @@ -1,6 +1,4 @@ import logging -from collections import deque -from collections.abc import Coroutine from typing import Any from aiohttp import web @@ -8,12 +6,11 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.users import UserID -from servicelib.aiohttp.application_keys import APP_FIRE_AND_FORGET_TASKS_KEY from servicelib.logging_utils import log_decorator -from servicelib.utils import fire_and_forget_task, logged_gather +from servicelib.utils import logged_gather +from ..models import ClientSessionID from . import _projects_service -from .utils import get_frontend_node_outputs_changes log = logging.getLogger(__name__) @@ -42,6 +39,7 @@ async def update_node_outputs( outputs: dict, run_hash: str | None, node_errors: list[ErrorDict] | None, + client_session_id: ClientSessionID | None, *, ui_changed_keys: set[str] | None, ) -> None: @@ -53,6 +51,7 @@ async def update_node_outputs( node_uuid, new_outputs=outputs, new_run_hash=run_hash, + client_session_id=client_session_id, ) await _projects_service.notify_project_node_update( @@ -89,48 +88,3 @@ async def update_node_outputs( await _projects_service.post_trigger_connected_service_retrieve( app=app, project=project, updated_node_uuid=f"{node_uuid}", changed_keys=keys ) - - -async def update_frontend_outputs( - app: web.Application, - user_id: UserID, - project_uuid: ProjectID, - old_project: dict[str, Any], - new_project: dict[str, Any], -) -> None: - old_workbench = old_project["workbench"] - new_workbench = new_project["workbench"] - frontend_nodes_update_tasks: deque[Coroutine] = deque() - - for node_key, node in new_workbench.items(): - old_node = old_workbench.get(node_key) - if not old_node: - continue - - # check if there were any changes in the outputs of - # frontend services - # NOTE: for now only file-picker is handled - outputs_changes: set[str] = get_frontend_node_outputs_changes( - new_node=node, old_node=old_node - ) - - if len(outputs_changes) > 0: - frontend_nodes_update_tasks.append( - update_node_outputs( - app=app, - user_id=user_id, - project_uuid=project_uuid, - node_uuid=node_key, - outputs=node.get("outputs", {}), - run_hash=None, - node_errors=None, - ui_changed_keys=outputs_changes, - ) - ) - - for task_index, frontend_node_update_task in enumerate(frontend_nodes_update_tasks): - fire_and_forget_task( - frontend_node_update_task, - task_suffix_name=f"frontend_node_update_task_{task_index}", - fire_and_forget_tasks_collection=app[APP_FIRE_AND_FORGET_TASKS_KEY], - ) diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_crud_handlers__patch.py b/services/web/server/tests/unit/with_dbs/02/test_projects_crud_handlers__patch.py index bf949203f93..b07bb757b7c 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_crud_handlers__patch.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_crud_handlers__patch.py @@ -7,7 +7,9 @@ import json +import uuid from http import HTTPStatus +from unittest.mock import patch import pytest from aiohttp.test_utils import TestClient @@ -185,3 +187,150 @@ async def test_patch_project( assert data["ui"] == _patch_ui_2["ui"] assert data["quality"] == _patch_quality["quality"] assert data["dev"] == _patch_dev["dev"] + + +@pytest.mark.parametrize( + "user_role,expected", [(UserRole.USER, status.HTTP_204_NO_CONTENT)] +) +async def test_patch_project_with_client_session_header( + client: TestClient, + logged_user: UserInfoDict, + user_project: ProjectDict, + expected: HTTPStatus, +): + assert client.app + base_url = client.app.router["patch_project"].url_for( + project_id=user_project["uuid"] + ) + + # Generate a valid UUID for client session ID + client_session_id = str(uuid.uuid4()) + + # Test patch with X-Client-Session-Id header - should succeed + resp = await client.patch( + f"{base_url}", + data=json.dumps( + { + "name": "testing-name-with-session", + "description": "testing-description-with-session", + } + ), + headers={"X-Client-Session-Id": client_session_id}, + ) + await assert_status(resp, expected) + + # Test patch without X-Client-Session-Id header - should also succeed (header is optional) + resp = await client.patch( + f"{base_url}", + data=json.dumps( + { + "name": "testing-name-without-session", + "description": "testing-description-without-session", + } + ), + ) + await assert_status(resp, expected) + + # Test patch with invalid X-Client-Session-Id header - should fail with validation error + resp = await client.patch( + f"{base_url}", + data=json.dumps( + { + "name": "testing-name-invalid-session", + "description": "testing-description-invalid-session", + } + ), + headers={"X-Client-Session-Id": "invalid-uuid-format"}, + ) + # This should fail validation since it's not a proper UUID + await assert_status(resp, status.HTTP_422_UNPROCESSABLE_ENTITY) + + +@pytest.mark.parametrize( + "user_role,expected", [(UserRole.USER, status.HTTP_204_NO_CONTENT)] +) +async def test_patch_project_with_mocked_header_parsing( + client: TestClient, + logged_user: UserInfoDict, + user_project: ProjectDict, + expected: HTTPStatus, +): + """Test that header_params = parse_request_headers_as(ClientSessionHeaderParams, request) works correctly""" + assert client.app + base_url = client.app.router["patch_project"].url_for( + project_id=user_project["uuid"] + ) + + # Generate a valid client session ID + test_client_session_id = str(uuid.uuid4()) + + # Mock the _projects_service.patch_project_for_user to spy on the client_session_id parameter + with patch( + "simcore_service_webserver.projects._controller.projects_rest._projects_service.patch_project_for_user" + ) as mock_patch_project_service: + # Make the service call succeed + mock_patch_project_service.return_value = None + + # Make the PATCH request with client session header + resp = await client.patch( + f"{base_url}", + data=json.dumps( + { + "name": "testing-name-with-mocked-header", + "description": "testing-description-with-mocked-header", + } + ), + headers={"X-Client-Session-Id": test_client_session_id}, + ) + await assert_status(resp, expected) + + # Verify that patch_project_for_user was called with the correct client_session_id + mock_patch_project_service.assert_called_once() + call_args = mock_patch_project_service.call_args + + # Extract the client_session_id from the call arguments + assert "client_session_id" in call_args.kwargs + assert call_args.kwargs["client_session_id"] == test_client_session_id + + +@pytest.mark.parametrize( + "user_role,expected", [(UserRole.USER, status.HTTP_204_NO_CONTENT)] +) +async def test_patch_project_without_client_session_header( + client: TestClient, + logged_user: UserInfoDict, + user_project: ProjectDict, + expected: HTTPStatus, +): + """Test patch project works when X-Client-Session-Id header is not provided""" + assert client.app + base_url = client.app.router["patch_project"].url_for( + project_id=user_project["uuid"] + ) + + # Mock the _projects_service.patch_project_for_user to spy on the client_session_id parameter + with patch( + "simcore_service_webserver.projects._controller.projects_rest._projects_service.patch_project_for_user" + ) as mock_patch_project_service: + # Make the service call succeed + mock_patch_project_service.return_value = None + + # Make the PATCH request WITHOUT client session header + resp = await client.patch( + f"{base_url}", + data=json.dumps( + { + "name": "testing-name-without-header", + "description": "testing-description-without-header", + } + ), + ) + await assert_status(resp, expected) + + # Verify that patch_project_for_user was called with client_session_id=None + mock_patch_project_service.assert_called_once() + call_args = mock_patch_project_service.call_args + + # Extract the client_session_id from the call arguments + assert "client_session_id" in call_args.kwargs + assert call_args.kwargs["client_session_id"] is None diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_patch_project_and_notify_users_locking.py b/services/web/server/tests/unit/with_dbs/02/test_projects_patch_project_and_notify_users_locking.py index 03e2e10e784..80b7ea6fb47 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_patch_project_and_notify_users_locking.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_patch_project_and_notify_users_locking.py @@ -69,6 +69,7 @@ async def test_patch_project_and_notify_users_sequential( project_uuid=project_uuid, patch_project_data=patch_data_1, user_primary_gid=user_primary_gid, + client_session_id=None, ) # Get version after first patch @@ -83,6 +84,7 @@ async def test_patch_project_and_notify_users_sequential( project_uuid=project_uuid, patch_project_data=patch_data_2, user_primary_gid=user_primary_gid, + client_session_id=None, ) # Get version after second patch @@ -126,6 +128,7 @@ async def test_patch_project_and_notify_users_concurrent_locking( project_uuid=project_uuid, patch_project_data=patch_data, user_primary_gid=user_primary_gid, + client_session_id=None, ) for patch_data in concurrent_patch_data_list ] @@ -189,6 +192,7 @@ async def test_patch_project_and_notify_users_concurrent_different_projects( project_uuid=project_uuid_1, patch_project_data=patch_data, user_primary_gid=user_primary_gid, + client_session_id=None, ) # Get final versions