Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ class ProjectDocument(OutputSchema):
model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)


ProjectDocumentVersion: TypeAlias = int


__all__: tuple[str, ...] = (
"EmptyModel",
"ProjectCopyOverride",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ class PydanticExportParametersDict(TypedDict):

# Headers keys
X_PRODUCT_NAME_HEADER: Final[str] = "X-Simcore-Products-Name"
X_CLIENT_SESSION_ID_HEADER: Final[str] = "X-Client-Session-Id"
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ async def _update_project_state(
node_errors: list[ErrorDict] | None,
) -> None:
project = await _projects_service.update_project_node_state(
app, user_id, project_uuid, node_uuid, new_state
app,
user_id,
project_uuid,
node_uuid,
new_state,
client_session_id=None, # <-- The trigger for this update is not from the UI (its db listener)
)

await _projects_service.notify_project_node_update(
Expand Down Expand Up @@ -95,6 +100,7 @@ async def _handle_db_notification(
changed_row.run_hash,
node_errors=changed_row.errors,
ui_changed_keys=None,
client_session_id=None, # <-- The trigger for this update is not from the UI (its db listener)
)

if "state" in payload.changes and (changed_row.state is not None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async def move_folder_into_workspace(
folder_id: FolderID,
workspace_id: WorkspaceID | None,
product_name: ProductName,
client_session_id: str | None = None,
) -> None:
# 1. User needs to have delete permission on source folder
folder_db = await _folders_repository.get(
Expand Down Expand Up @@ -84,6 +85,7 @@ async def move_folder_into_workspace(
project_uuid=project_id,
patch_project_data={"workspace_id": workspace_id},
user_primary_gid=user["primary_gid"],
client_session_id=client_session_id,
)

# 5. BATCH update of folders with workspace_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from aiohttp import web
from servicelib.aiohttp import status
from servicelib.aiohttp.requests_validation import parse_request_path_parameters_as
from servicelib.rest_constants import X_CLIENT_SESSION_ID_HEADER

from .._meta import api_version_prefix as VTAG
from ..login.decorators import login_required
Expand All @@ -28,11 +29,14 @@ async def move_folder_to_workspace(request: web.Request):
req_ctx = FoldersRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(FolderWorkspacesPathParams, request)

client_session_id: str | None = request.headers.get(X_CLIENT_SESSION_ID_HEADER)

await _workspaces_repository.move_folder_into_workspace(
app=request.app,
user_id=req_ctx.user_id,
folder_id=path_params.folder_id,
workspace_id=path_params.workspace_id,
product_name=req_ctx.product_name,
client_session_id=client_session_id,
)
return web.json_response(status=status.HTTP_204_NO_CONTENT)
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
ServiceWaitingForManualInterventionError,
ServiceWasNotFoundError,
)
from servicelib.rest_constants import X_CLIENT_SESSION_ID_HEADER
from servicelib.services_utils import get_status_as_dict
from simcore_postgres_database.models.users import UserRole

Expand Down Expand Up @@ -97,6 +98,8 @@ async def create_node(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(ProjectPathParams, request)
body = await parse_request_body_as(NodeCreate, request)

client_session_id = request.headers.get(X_CLIENT_SESSION_ID_HEADER)

if await _projects_service.is_service_deprecated(
request.app,
req_ctx.user_id,
Expand Down Expand Up @@ -124,6 +127,7 @@ async def create_node(request: web.Request) -> web.Response:
body.service_key,
body.service_version,
body.service_id,
client_session_id=client_session_id,
)
}
assert NodeCreated.model_validate(data) is not None # nosec
Expand Down Expand Up @@ -179,6 +183,8 @@ async def patch_project_node(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(NodePathParams, request)
node_patch = await parse_request_body_as(NodePatch, request)

client_session_id = request.headers.get(X_CLIENT_SESSION_ID_HEADER)

await _projects_service.patch_project_node(
request.app,
product_name=req_ctx.product_name,
Expand All @@ -187,6 +193,7 @@ async def patch_project_node(request: web.Request) -> web.Response:
project_id=path_params.project_id,
node_id=path_params.node_id,
partial_node=node_patch.to_domain_model(),
client_session_id=client_session_id,
)

return web.json_response(status=status.HTTP_204_NO_CONTENT)
Expand All @@ -200,6 +207,8 @@ async def delete_node(request: web.Request) -> web.Response:
req_ctx = AuthenticatedRequestContext.model_validate(request)
path_params = parse_request_path_parameters_as(NodePathParams, request)

client_session_id = request.headers.get(X_CLIENT_SESSION_ID_HEADER)

# ensure the project exists
await _projects_service.get_project_for_user(
request.app,
Expand All @@ -213,6 +222,7 @@ async def delete_node(request: web.Request) -> web.Response:
NodeIDStr(path_params.node_id),
req_ctx.product_name,
product_api_base_url=get_api_base_url(request),
client_session_id=client_session_id,
)

return web.json_response(status=status.HTTP_204_NO_CONTENT)
Expand Down Expand Up @@ -250,6 +260,8 @@ async def update_node_outputs(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(NodePathParams, request)
node_outputs = await parse_request_body_as(NodeOutputs, request)

client_session_id = request.headers.get(X_CLIENT_SESSION_ID_HEADER)

ui_changed_keys = set()
ui_changed_keys.add(f"{path_params.node_id}")
await nodes_utils.update_node_outputs(
Expand All @@ -261,6 +273,7 @@ async def update_node_outputs(request: web.Request) -> web.Response:
run_hash=None,
node_errors=None,
ui_changed_keys=ui_changed_keys,
client_session_id=client_session_id,
)
return web.json_response(status=status.HTTP_204_NO_CONTENT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
parse_request_body_as,
parse_request_path_parameters_as,
)
from servicelib.rest_constants import X_CLIENT_SESSION_ID_HEADER

from ..._meta import API_VTAG as VTAG
from ...login.decorators import login_required
Expand Down Expand Up @@ -89,6 +90,8 @@ async def update_project_inputs(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(ProjectPathParams, request)
inputs_updates = await parse_request_body_as(list[ProjectInputUpdate], request)

client_session_id: str | None = request.headers.get(X_CLIENT_SESSION_ID_HEADER)

assert request.app # nosec

workbench = await _get_validated_workbench_model(
Expand Down Expand Up @@ -123,6 +126,7 @@ async def update_project_inputs(request: web.Request) -> web.Response:
project_uuid=path_params.project_id,
product_name=req_ctx.product_name,
partial_workbench_data=jsonable_encoder(partial_workbench_data),
client_session_id=client_session_id,
)

workbench = TypeAdapter(dict[NodeID, Node]).validate_python(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
X_SIMCORE_USER_AGENT,
)
from servicelib.redis import get_project_locked_state
from servicelib.rest_constants import X_CLIENT_SESSION_ID_HEADER

from ..._meta import API_VTAG as VTAG
from ...login.decorators import login_required
Expand Down Expand Up @@ -313,12 +314,15 @@ async def patch_project(request: web.Request):
path_params = parse_request_path_parameters_as(ProjectPathParams, request)
project_patch = await parse_request_body_as(ProjectPatch, request)

client_session_id: str | None = request.headers.get(X_CLIENT_SESSION_ID_HEADER)

await _projects_service.patch_project_for_user(
request.app,
user_id=req_ctx.user_id,
project_uuid=path_params.project_id,
project_patch=project_patch,
product_name=req_ctx.product_name,
client_session_id=client_session_id,
)

return web.json_response(status=status.HTTP_204_NO_CONTENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field
from servicelib.aiohttp import status
from servicelib.aiohttp.requests_validation import parse_request_path_parameters_as
from servicelib.rest_constants import X_CLIENT_SESSION_ID_HEADER

from ..._meta import api_version_prefix as VTAG
from ...login.decorators import login_required
Expand Down Expand Up @@ -44,11 +45,14 @@ async def move_project_to_workspace(request: web.Request):
_ProjectWorkspacesPathParams, request
)

client_session_id: str | None = request.headers.get(X_CLIENT_SESSION_ID_HEADER)

await _workspaces_service.move_project_into_workspace(
app=request.app,
user_id=req_ctx.user_id,
project_id=path_params.project_id,
workspace_id=path_params.workspace_id,
product_name=req_ctx.product_name,
client_session_id=client_session_id,
)
return web.json_response(status=status.HTTP_204_NO_CONTENT)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Utility functions for project document management.
This module contains common utilities for building and versioning project documents.
"""

from typing import cast

from aiohttp import web
from models_library.api_schemas_webserver.projects import (
ProjectDocument,
ProjectDocumentVersion,
)
from models_library.projects import ProjectID, ProjectTemplateType
from models_library.projects import ProjectType as ProjectTypeAPI
from servicelib.redis import (
PROJECT_DB_UPDATE_REDIS_LOCK_KEY,
exclusive,
increment_and_return_project_document_version,
)

from ..redis import (
get_redis_document_manager_client_sdk,
get_redis_lock_manager_client_sdk,
)
from . import _projects_repository


async def build_project_document_and_increment_version(
app: web.Application, project_uuid: ProjectID
) -> tuple[ProjectDocument, ProjectDocumentVersion]:
"""Build project document and increment version with Redis lock protection.
This function is protected by Redis exclusive lock because:
- the project document and its version must be kept in sync
Args:
app: The web application instance
project_uuid: UUID of the project
Returns:
Tuple containing the project document and its version number
"""

@exclusive(
get_redis_lock_manager_client_sdk(app),
lock_key=PROJECT_DB_UPDATE_REDIS_LOCK_KEY.format(project_uuid),
blocking=True,
blocking_timeout=None, # NOTE: this is a blocking call, a timeout has undefined effects
)
async def _build_project_document_and_increment_version() -> (
tuple[ProjectDocument, int]
):
"""This function is protected because
- the project document and its version must be kept in sync
"""
# Get the full project with workbench for document creation
project_with_workbench = await _projects_repository.get_project_with_workbench(
app=app, project_uuid=project_uuid
)
# Create project document
project_document = ProjectDocument(
uuid=project_with_workbench.uuid,
workspace_id=project_with_workbench.workspace_id,
name=project_with_workbench.name,
description=project_with_workbench.description,
thumbnail=project_with_workbench.thumbnail,
last_change_date=project_with_workbench.last_change_date,
classifiers=project_with_workbench.classifiers,
dev=project_with_workbench.dev,
quality=project_with_workbench.quality,
workbench=project_with_workbench.workbench,
ui=project_with_workbench.ui,
type=cast(ProjectTypeAPI, project_with_workbench.type),
template_type=cast(
ProjectTemplateType, project_with_workbench.template_type
),
)
# Increment document version
redis_client_sdk = get_redis_document_manager_client_sdk(app)
document_version = await increment_and_return_project_document_version(
redis_client=redis_client_sdk, project_uuid=project_uuid
)

return project_document, document_version

return await _build_project_document_and_increment_version()
Loading
Loading