Skip to content

Commit e0d6282

Browse files
committed
fix test createing/deleteing nodes in high throuput conditions
1 parent c5e7bbe commit e0d6282

File tree

2 files changed

+42
-29
lines changed

2 files changed

+42
-29
lines changed

services/web/server/src/simcore_service_webserver/projects/_projects_repository_legacy.py

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
66
"""
77

8-
import datetime
98
import logging
109
from contextlib import AsyncExitStack
1110
from typing import Any, Self, cast
@@ -449,7 +448,6 @@ def _create_shared_workspace_query(
449448
is_search_by_multi_columns: bool,
450449
user_groups: list[GroupID],
451450
) -> sql.Select | None:
452-
453451
if workspace_query.workspace_scope is not WorkspaceScope.PRIVATE:
454452
assert workspace_query.workspace_scope in ( # nosec
455453
WorkspaceScope.SHARED,
@@ -951,27 +949,32 @@ async def _update_project_workbench_with_lock_and_notify(
951949
thread-safe operations on the project document.
952950
"""
953951

952+
# Get user's primary group ID for notification
953+
async with self.engine.acquire() as conn:
954+
user_primary_gid = await self._get_user_primary_group_gid(conn, user_id)
955+
956+
# 10 concurrent calls
954957
@exclusive(
955958
get_redis_lock_manager_client_sdk(self._app),
956959
lock_key=PROJECT_DB_UPDATE_REDIS_LOCK_KEY.format(project_uuid),
957960
blocking=True,
958-
blocking_timeout=datetime.timedelta(seconds=30),
961+
blocking_timeout=None, # NOTE: this is a blocking call, a timeout has undefined effects
959962
)
960963
async def _update_workbench_and_notify() -> (
961-
tuple[ProjectDict, dict[NodeIDStr, Any]]
964+
tuple[ProjectDict, dict[NodeIDStr, Any], ProjectDocument, int]
962965
):
963-
# Update the workbench
966+
"""This function is protected because
967+
- the project document and its version must be kept in sync
968+
"""
969+
# Update the workbench work since it's atomic
964970
updated_project, changed_entries = await self._update_project_workbench(
965971
partial_workbench_data,
966972
user_id=user_id,
967973
project_uuid=f"{project_uuid}",
968974
product_name=product_name,
969975
allow_workbench_changes=allow_workbench_changes,
970976
)
971-
972-
# Get user's primary group ID for notification
973-
async with self.engine.acquire() as conn:
974-
user_primary_gid = await self._get_user_primary_group_gid(conn, user_id)
977+
# the update project with last_modified timestamp latest is the last
975978

976979
# Get the full project with workbench for document creation
977980
project_with_workbench = (
@@ -1004,17 +1007,23 @@ async def _update_workbench_and_notify() -> (
10041007
document_version = await increment_and_return_project_document_version(
10051008
redis_client=redis_client_sdk, project_uuid=project_uuid
10061009
)
1007-
await notify_project_document_updated(
1008-
app=self._app,
1009-
project_id=project_uuid,
1010-
user_primary_gid=user_primary_gid,
1011-
version=document_version,
1012-
document=project_document,
1013-
)
10141010

1015-
return updated_project, changed_entries
1016-
1017-
return await _update_workbench_and_notify()
1011+
return updated_project, changed_entries, project_document, document_version
1012+
1013+
(
1014+
updated_project,
1015+
changed_entries,
1016+
project_document,
1017+
document_version,
1018+
) = await _update_workbench_and_notify()
1019+
await notify_project_document_updated(
1020+
app=self._app,
1021+
project_id=project_uuid,
1022+
user_primary_gid=user_primary_gid,
1023+
version=document_version,
1024+
document=project_document,
1025+
)
1026+
return updated_project, changed_entries
10181027

10191028
async def _update_project_workbench(
10201029
self,

services/web/server/src/simcore_service_webserver/projects/_projects_service.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,12 @@ async def patch_project_and_notify_users(
207207
get_redis_lock_manager_client_sdk(app),
208208
lock_key=PROJECT_DB_UPDATE_REDIS_LOCK_KEY.format(project_uuid),
209209
blocking=True,
210-
blocking_timeout=datetime.timedelta(seconds=30),
210+
blocking_timeout=None, # NOTE: this is a blocking call, a timeout has undefined effects
211211
)
212-
async def _patch_and_notify() -> None:
212+
async def _patch_and_create_project_document() -> tuple[ProjectDocument, int]:
213+
"""This function is protected because
214+
- the project document and its version must be kept in sync
215+
"""
213216
await _projects_repository.patch_project(
214217
app=app,
215218
project_uuid=project_uuid,
@@ -239,15 +242,16 @@ async def _patch_and_notify() -> None:
239242
document_version = await increment_and_return_project_document_version(
240243
redis_client=redis_client_sdk, project_uuid=project_uuid
241244
)
242-
await notify_project_document_updated(
243-
app=app,
244-
project_id=project_uuid,
245-
user_primary_gid=user_primary_gid,
246-
version=document_version,
247-
document=project_document,
248-
)
245+
return project_document, document_version
249246

250-
await _patch_and_notify()
247+
project_document, document_version = await _patch_and_create_project_document()
248+
await notify_project_document_updated(
249+
app=app,
250+
project_id=project_uuid,
251+
user_primary_gid=user_primary_gid,
252+
version=document_version,
253+
document=project_document,
254+
)
251255

252256

253257
def _is_node_dynamic(node_key: str) -> bool:

0 commit comments

Comments
 (0)