Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ class ProjectDocument(OutputSchema):
model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)


ProjectDocumentVersion: TypeAlias = int


__all__: tuple[str, ...] = (
"EmptyModel",
"ProjectCopyOverride",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ class PydanticExportParametersDict(TypedDict):

# Headers keys
X_PRODUCT_NAME_HEADER: Final[str] = "X-Simcore-Products-Name"
X_CLIENT_SESSION_ID_HEADER: Final[str] = "X-Client-Session-Id"
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ async def _update_project_state(
node_errors: list[ErrorDict] | None,
) -> None:
project = await _projects_service.update_project_node_state(
app, user_id, project_uuid, node_uuid, new_state
app,
user_id,
project_uuid,
node_uuid,
new_state,
client_session_id=None, # <-- The trigger for this update is not from the UI (its db listener)
)

await _projects_service.notify_project_node_update(
Expand Down Expand Up @@ -95,6 +100,7 @@ async def _handle_db_notification(
changed_row.run_hash,
node_errors=changed_row.errors,
ui_changed_keys=None,
client_session_id=None, # <-- The trigger for this update is not from the UI (its db listener)
)

if "state" in payload.changes and (changed_row.state is not None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from simcore_postgres_database.utils_repos import transaction_context

from ..db.plugin import get_asyncpg_engine
from ..models import ClientSessionID
from ..projects import _folders_repository as projects_folders_repository
from ..projects import _groups_repository as projects_groups_repository
from ..projects._access_rights_service import check_user_project_permission
Expand All @@ -26,6 +27,7 @@ async def move_folder_into_workspace(
folder_id: FolderID,
workspace_id: WorkspaceID | None,
product_name: ProductName,
client_session_id: ClientSessionID | None = None,
) -> None:
# 1. User needs to have delete permission on source folder
folder_db = await _folders_repository.get(
Expand Down Expand Up @@ -84,6 +86,7 @@ async def move_folder_into_workspace(
project_uuid=project_id,
patch_project_data={"workspace_id": workspace_id},
user_primary_gid=user["primary_gid"],
client_session_id=client_session_id,
)

# 5. BATCH update of folders with workspace_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

from aiohttp import web
from servicelib.aiohttp import status
from servicelib.aiohttp.requests_validation import parse_request_path_parameters_as
from servicelib.aiohttp.requests_validation import (
parse_request_headers_as,
parse_request_path_parameters_as,
)

from .._meta import api_version_prefix as VTAG
from ..login.decorators import login_required
from ..models import ClientSessionHeaderParams
from ..security.decorators import permission_required
from . import _workspaces_repository
from ._common.exceptions_handlers import handle_plugin_requests_exceptions
Expand All @@ -27,12 +31,14 @@
async def move_folder_to_workspace(request: web.Request):
req_ctx = FoldersRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(FolderWorkspacesPathParams, request)
header_params = parse_request_headers_as(ClientSessionHeaderParams, request)

await _workspaces_repository.move_folder_into_workspace(
app=request.app,
user_id=req_ctx.user_id,
folder_id=path_params.folder_id,
workspace_id=path_params.workspace_id,
product_name=req_ctx.product_name,
client_session_id=header_params.client_session_id,
)
return web.json_response(status=status.HTTP_204_NO_CONTENT)
32 changes: 31 additions & 1 deletion services/web/server/src/simcore_service_webserver/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from models_library.products import ProductName
from models_library.rest_base import RequestParameters
from models_library.users import UserID
from pydantic import ConfigDict, Field
from pydantic import ConfigDict, Field, StringConstraints
from pydantic_extra_types.phone_numbers import PhoneNumberValidator
from servicelib.request_keys import RQT_USERID_KEY
from servicelib.rest_constants import X_CLIENT_SESSION_ID_HEADER

from .constants import RQ_PRODUCT_KEY

Expand All @@ -16,6 +17,18 @@
]


ClientSessionID: TypeAlias = Annotated[
str,
StringConstraints(
strip_whitespace=True,
min_length=36,
max_length=36,
pattern=r"^[0-9a-fA-F\-]{36}$", # UUID format
strict=True,
),
]


class AuthenticatedRequestContext(RequestParameters):
"""Fields expected in the request context for authenticated endpoints"""

Expand All @@ -25,3 +38,20 @@ class AuthenticatedRequestContext(RequestParameters):
model_config = ConfigDict(
frozen=True # prevents modifications after middlewares creates this model
)


assert X_CLIENT_SESSION_ID_HEADER


class ClientSessionHeaderParams(RequestParameters):
"""Header parameters for client session tracking in collaborative features."""

client_session_id: ClientSessionID | None = Field(
default=None,
alias="X-Client-Session-Id", # X_CLIENT_SESSION_ID_HEADER,
description="Client session identifier for collaborative features (UUID string)",
)

model_config = ConfigDict(
validate_by_name=True,
)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from servicelib.aiohttp.long_running_tasks.server import start_long_running_task
from servicelib.aiohttp.requests_validation import (
parse_request_body_as,
parse_request_headers_as,
parse_request_path_parameters_as,
parse_request_query_parameters_as,
)
Expand All @@ -59,6 +60,7 @@
from ...groups.api import get_group_from_gid, list_all_user_groups_ids
from ...groups.exceptions import GroupNotFoundError
from ...login.decorators import login_required
from ...models import ClientSessionHeaderParams
from ...security.decorators import permission_required
from ...users import users_service
from ...utils_aiohttp import envelope_json_response, get_api_base_url
Expand Down Expand Up @@ -96,6 +98,7 @@ async def create_node(request: web.Request) -> web.Response:
req_ctx = AuthenticatedRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(ProjectPathParams, request)
body = await parse_request_body_as(NodeCreate, request)
header_params = parse_request_headers_as(ClientSessionHeaderParams, request)

if await _projects_service.is_service_deprecated(
request.app,
Expand Down Expand Up @@ -124,6 +127,7 @@ async def create_node(request: web.Request) -> web.Response:
body.service_key,
body.service_version,
body.service_id,
client_session_id=header_params.client_session_id,
)
}
assert NodeCreated.model_validate(data) is not None # nosec
Expand Down Expand Up @@ -178,6 +182,7 @@ async def patch_project_node(request: web.Request) -> web.Response:
req_ctx = AuthenticatedRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(NodePathParams, request)
node_patch = await parse_request_body_as(NodePatch, request)
header_params = parse_request_headers_as(ClientSessionHeaderParams, request)

await _projects_service.patch_project_node(
request.app,
Expand All @@ -187,6 +192,7 @@ async def patch_project_node(request: web.Request) -> web.Response:
project_id=path_params.project_id,
node_id=path_params.node_id,
partial_node=node_patch.to_domain_model(),
client_session_id=header_params.client_session_id,
)

return web.json_response(status=status.HTTP_204_NO_CONTENT)
Expand All @@ -199,6 +205,7 @@ async def patch_project_node(request: web.Request) -> web.Response:
async def delete_node(request: web.Request) -> web.Response:
req_ctx = AuthenticatedRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(NodePathParams, request)
header_params = parse_request_headers_as(ClientSessionHeaderParams, request)

# ensure the project exists
await _projects_service.get_project_for_user(
Expand All @@ -213,6 +220,7 @@ async def delete_node(request: web.Request) -> web.Response:
NodeIDStr(path_params.node_id),
req_ctx.product_name,
product_api_base_url=get_api_base_url(request),
client_session_id=header_params.client_session_id,
)

return web.json_response(status=status.HTTP_204_NO_CONTENT)
Expand Down Expand Up @@ -249,6 +257,7 @@ async def update_node_outputs(request: web.Request) -> web.Response:
req_ctx = AuthenticatedRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(NodePathParams, request)
node_outputs = await parse_request_body_as(NodeOutputs, request)
header_params = parse_request_headers_as(ClientSessionHeaderParams, request)

ui_changed_keys = set()
ui_changed_keys.add(f"{path_params.node_id}")
Expand All @@ -261,6 +270,7 @@ async def update_node_outputs(request: web.Request) -> web.Response:
run_hash=None,
node_errors=None,
ui_changed_keys=ui_changed_keys,
client_session_id=header_params.client_session_id,
)
return web.json_response(status=status.HTTP_204_NO_CONTENT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
from pydantic import BaseModel, Field, TypeAdapter
from servicelib.aiohttp.requests_validation import (
parse_request_body_as,
parse_request_headers_as,
parse_request_path_parameters_as,
)

from ..._meta import API_VTAG as VTAG
from ...login.decorators import login_required
from ...models import ClientSessionHeaderParams
from ...security.decorators import permission_required
from ...utils_aiohttp import envelope_json_response
from .. import _ports_service, _projects_service
Expand Down Expand Up @@ -88,6 +90,7 @@ async def update_project_inputs(request: web.Request) -> web.Response:
req_ctx = AuthenticatedRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(ProjectPathParams, request)
inputs_updates = await parse_request_body_as(list[ProjectInputUpdate], request)
header_params = parse_request_headers_as(ClientSessionHeaderParams, request)

assert request.app # nosec

Expand Down Expand Up @@ -123,6 +126,7 @@ async def update_project_inputs(request: web.Request) -> web.Response:
project_uuid=path_params.project_id,
product_name=req_ctx.product_name,
partial_workbench_data=jsonable_encoder(partial_workbench_data),
client_session_id=header_params.client_session_id,
)

workbench = TypeAdapter(dict[NodeID, Node]).validate_python(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from ..._meta import API_VTAG as VTAG
from ...login.decorators import login_required
from ...models import ClientSessionHeaderParams
from ...redis import get_redis_lock_manager_client_sdk
from ...resource_manager.user_sessions import PROJECT_ID_KEY, managed_resource
from ...security import security_web
Expand Down Expand Up @@ -312,13 +313,15 @@ async def patch_project(request: web.Request):
req_ctx = AuthenticatedRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(ProjectPathParams, request)
project_patch = await parse_request_body_as(ProjectPatch, request)
header_params = parse_request_headers_as(ClientSessionHeaderParams, request)

await _projects_service.patch_project_for_user(
request.app,
user_id=req_ctx.user_id,
project_uuid=path_params.project_id,
project_patch=project_patch,
product_name=req_ctx.product_name,
client_session_id=header_params.client_session_id,
)

return web.json_response(status=status.HTTP_204_NO_CONTENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
from models_library.workspaces import WorkspaceID
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field
from servicelib.aiohttp import status
from servicelib.aiohttp.requests_validation import parse_request_path_parameters_as
from servicelib.aiohttp.requests_validation import (
parse_request_headers_as,
parse_request_path_parameters_as,
)

from ..._meta import api_version_prefix as VTAG
from ...login.decorators import login_required
from ...models import ClientSessionHeaderParams
from ...security.decorators import permission_required
from .. import _workspaces_service
from ._rest_exceptions import handle_plugin_requests_exceptions
Expand Down Expand Up @@ -43,12 +47,14 @@ async def move_project_to_workspace(request: web.Request):
path_params = parse_request_path_parameters_as(
_ProjectWorkspacesPathParams, request
)
header_params = parse_request_headers_as(ClientSessionHeaderParams, request)

await _workspaces_service.move_project_into_workspace(
app=request.app,
user_id=req_ctx.user_id,
project_id=path_params.project_id,
workspace_id=path_params.workspace_id,
product_name=req_ctx.product_name,
client_session_id=header_params.client_session_id,
)
return web.json_response(status=status.HTTP_204_NO_CONTENT)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Utility functions for project document management.
This module contains common utilities for building and versioning project documents.
"""

from typing import cast

from aiohttp import web
from models_library.api_schemas_webserver.projects import (
ProjectDocument,
ProjectDocumentVersion,
)
from models_library.projects import ProjectID, ProjectTemplateType
from models_library.projects import ProjectType as ProjectTypeAPI
from servicelib.redis import (
PROJECT_DB_UPDATE_REDIS_LOCK_KEY,
exclusive,
increment_and_return_project_document_version,
)

from ..redis import (
get_redis_document_manager_client_sdk,
get_redis_lock_manager_client_sdk,
)
from . import _projects_repository


async def create_project_document_and_increment_version(
app: web.Application, project_uuid: ProjectID
) -> tuple[ProjectDocument, ProjectDocumentVersion]:
"""Build project document and increment version with Redis lock protection.
This function is protected by Redis exclusive lock because:
- the project document and its version must be kept in sync
Args:
app: The web application instance
project_uuid: UUID of the project
Returns:
Tuple containing the project document and its version number
"""

@exclusive(
get_redis_lock_manager_client_sdk(app),
lock_key=PROJECT_DB_UPDATE_REDIS_LOCK_KEY.format(project_uuid),
blocking=True,
blocking_timeout=None, # NOTE: this is a blocking call, a timeout has undefined effects
)
async def _create_project_document_and_increment_version() -> (
tuple[ProjectDocument, int]
):
"""This function is protected because
- the project document and its version must be kept in sync
"""
# Get the full project with workbench for document creation
project_with_workbench = await _projects_repository.get_project_with_workbench(
app=app, project_uuid=project_uuid
)
# Create project document
project_document = ProjectDocument(
uuid=project_with_workbench.uuid,
workspace_id=project_with_workbench.workspace_id,
name=project_with_workbench.name,
description=project_with_workbench.description,
thumbnail=project_with_workbench.thumbnail,
last_change_date=project_with_workbench.last_change_date,
classifiers=project_with_workbench.classifiers,
dev=project_with_workbench.dev,
quality=project_with_workbench.quality,
workbench=project_with_workbench.workbench,
ui=project_with_workbench.ui,
type=cast(ProjectTypeAPI, project_with_workbench.type),
template_type=cast(
ProjectTemplateType, project_with_workbench.template_type
),
)
# Increment document version
redis_client_sdk = get_redis_document_manager_client_sdk(app)
document_version = await increment_and_return_project_document_version(
redis_client=redis_client_sdk, project_uuid=project_uuid
)

return project_document, document_version

return await _create_project_document_and_increment_version()
Loading
Loading