diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/61b98a60e934_computational_collection_uniqueness.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/61b98a60e934_computational_collection_uniqueness.py
new file mode 100644
index 00000000000..1de7d9da7f8
--- /dev/null
+++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/61b98a60e934_computational_collection_uniqueness.py
@@ -0,0 +1,35 @@
+"""computational collection uniqueness
+
+Revision ID: 61b98a60e934
+Revises: df61d1b2b967
+Create Date: 2025-07-08 15:40:12.714684+00:00
+
+"""
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "61b98a60e934"
+down_revision = "df61d1b2b967"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_unique_constraint(
+        "client_or_system_generated_id_uniqueness",
+        "comp_runs_collections",
+        ["client_or_system_generated_id"],
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_constraint(
+        "client_or_system_generated_id_uniqueness",
+        "comp_runs_collections",
+        type_="unique",
+    )
+    # ### end Alembic commands ###
diff --git a/packages/postgres-database/src/simcore_postgres_database/models/comp_runs_collections.py b/packages/postgres-database/src/simcore_postgres_database/models/comp_runs_collections.py
index 5953a90074b..31439acc458 100644
--- a/packages/postgres-database/src/simcore_postgres_database/models/comp_runs_collections.py
+++ b/packages/postgres-database/src/simcore_postgres_database/models/comp_runs_collections.py
@@ -35,4 +35,7 @@
         "ix_comp_runs_collections_client_or_system_generated_id",
         "client_or_system_generated_id",
     ),
+    sa.UniqueConstraint(
+        "client_or_system_generated_id", name="client_or_system_generated_id_uniqueness"
+    ),
 )
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py
index 63d02b3a64a..b1b90b999a2 100644
--- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py
@@ -126,6 +126,7 @@ async def list_computation_collection_runs_page(
     product_name: ProductName,
     user_id: UserID,
     project_ids: list[ProjectID] | None,
+    filter_only_running: bool = False,
     # pagination
     offset: int = 0,
     limit: int = 20,
@@ -138,6 +139,7 @@ async def list_computation_collection_runs_page(
         product_name=product_name,
         user_id=user_id,
         project_ids=project_ids,
+        filter_only_running=filter_only_running,
         offset=offset,
         limit=limit,
         timeout_s=_DEFAULT_TIMEOUT_S,
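The RPC interface change above only threads a new keyword through, defaulting to `False` so existing callers are unaffected. A usage sketch (the argument values are examples and `rpc_client` is assumed to be an already-connected RPC client; the call shape mirrors the test added further below):

```python
# Sketch only: import path is the module modified above; values are examples.
from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations as rpc_computations


async def list_running_collection_runs(rpc_client, user_id: int):
    # filter_only_running=True restricts the page to collection runs that
    # currently have at least one computation in a running state.
    return await rpc_computations.list_computation_collection_runs_page(
        rpc_client,
        product_name="osparc",  # example product
        user_id=user_id,
        project_ids=None,  # no project filtering
        filter_only_running=True,
        offset=0,
        limit=20,
    )
```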
diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py
index 1335d6d86e5..d8ad583ece7 100644
--- a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py
+++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py
@@ -102,15 +102,24 @@ async def list_computation_collection_runs_page(
     product_name: ProductName,
     user_id: UserID,
     project_ids: list[ProjectID] | None,
+    filter_only_running: bool = False,
     # pagination
     offset: int = 0,
     limit: int = 20,
 ) -> ComputationCollectionRunRpcGetPage:
     comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
+
+    collection_run_ids: list[CollectionRunID] | None = None
+    if filter_only_running is True:
+        collection_run_ids = await comp_runs_repo.list_all_collection_run_ids_for_user_currently_running_computations(
+            product_name=product_name, user_id=user_id
+        )
+
     total, comp_runs_output = await comp_runs_repo.list_group_by_collection_run_id(
         product_name=product_name,
         user_id=user_id,
-        project_ids=project_ids,
+        project_ids_or_none=project_ids,
+        collection_run_ids_or_none=collection_run_ids,
         offset=offset,
         limit=limit,
     )
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
index e9a2f26ceab..3da15d494f3 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
@@ -377,12 +377,46 @@ async def list_for_user_and_project_all_iterations(
 
         return cast(int, total_count), items
 
+    async def list_all_collection_run_ids_for_user_currently_running_computations(
+        self,
+        *,
+        product_name: str,
+        user_id: UserID,
+    ) -> list[CollectionRunID]:
+
+        list_query = (
+            sa.select(
+                comp_runs.c.collection_run_id,
+            )
+            .where(
+                (comp_runs.c.user_id == user_id)
+                & (
+                    comp_runs.c.metadata["product_name"].astext == product_name
+                )  # <-- NOTE: We might create a separate column for this for fast retrieval
+                & (
+                    comp_runs.c.result.in_(
+                        [
+                            RUNNING_STATE_TO_DB[item]
+                            for item in RunningState.list_running_states()
+                        ]
+                    )
+                )
+            )
+            .distinct()
+        )
+
+        async with pass_or_acquire_connection(self.db_engine) as conn:
+            return [
+                CollectionRunID(row[0]) async for row in await conn.stream(list_query)
+            ]
+
     async def list_group_by_collection_run_id(
         self,
         *,
         product_name: str,
         user_id: UserID,
-        project_ids: list[ProjectID] | None = None,
+        project_ids_or_none: list[ProjectID] | None = None,
+        collection_run_ids_or_none: list[CollectionRunID] | None = None,
         # pagination
         offset: int,
         limit: int,
@@ -408,10 +442,19 @@ async def list_group_by_collection_run_id(
             & (comp_runs.c.metadata["product_name"].astext == product_name)
         )
 
-        if project_ids:
+        if project_ids_or_none:
             base_select_query = base_select_query.where(
                 comp_runs.c.project_uuid.in_(
-                    [f"{project_id}" for project_id in project_ids]
+                    [f"{project_id}" for project_id in project_ids_or_none]
+                )
+            )
+        if collection_run_ids_or_none:
+            base_select_query = base_select_query.where(
+                comp_runs.c.collection_run_id.in_(
+                    [
+                        f"{collection_run_id}"
+                        for collection_run_id in collection_run_ids_or_none
+                    ]
                 )
             )
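The RPC handler resolves the filter in two steps: first fetch the distinct `collection_run_id`s that currently have running computations, then constrain the grouped listing to those ids. A standalone sketch of the shape of that first query, with stand-in table columns and running states (the real `comp_runs` model and the members of `RunningState.list_running_states()` are not shown in this diff):

```python
# Standalone sketch: table columns and the state values below are illustrative
# stand-ins, not the project's definitions.
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

metadata_obj = sa.MetaData()
comp_runs = sa.Table(
    "comp_runs",
    metadata_obj,
    sa.Column("user_id", sa.BigInteger),
    sa.Column("result", sa.String),
    sa.Column("collection_run_id", postgresql.UUID(as_uuid=True)),
    sa.Column("metadata", postgresql.JSONB),
)

ASSUMED_RUNNING_DB_STATES = ["PUBLISHED", "PENDING", "STARTED"]  # illustrative

list_query = (
    sa.select(comp_runs.c.collection_run_id)
    .where(
        (comp_runs.c.user_id == 42)
        # .astext compiles to the ->> JSONB operator, as in the repository method
        & (comp_runs.c.metadata["product_name"].astext == "osparc")
        & (comp_runs.c.result.in_(ASSUMED_RUNNING_DB_STATES))
    )
    .distinct()
)

# Roughly: SELECT DISTINCT comp_runs.collection_run_id FROM comp_runs
#          WHERE user_id = ... AND metadata ->> 'product_name' = ... AND result IN (...)
print(list_query.compile(dialect=postgresql.dialect()))
```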
diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py
index 336f5366c38..d9429fcbf86 100644
--- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py
+++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py
@@ -265,18 +265,29 @@ async def test_rpc_list_computation_collection_runs_page_and_collection_run_task
         not_default_collection_run_id,
     ]
 
-    for proj, collection_run_id in zip(projects, collection_run_id_project_list):
+    running_state_project_list = [
+        RunningState.SUCCESS,
+        RunningState.PENDING,
+        RunningState.SUCCESS,
+    ]
+
+    for proj, collection_run_id, running_state in zip(
+        projects,
+        collection_run_id_project_list,
+        running_state_project_list,
+        strict=True,
+    ):
         await create_pipeline(
             project_id=f"{proj.uuid}",
             dag_adjacency_list=fake_workbench_adjacency,
         )
         await create_tasks_from_project(
-            user=user, project=proj, state=StateType.PUBLISHED, progress=None
+            user=user, project=proj, state=running_state, progress=None
         )
         run = await create_comp_run(
             user=user,
             project=proj,
-            result=RunningState.SUCCESS,
+            result=running_state,
             started=datetime.now(tz=UTC) - timedelta(minutes=120),
             ended=datetime.now(tz=UTC) - timedelta(minutes=100),
             iteration=1,
@@ -317,3 +328,16 @@ async def test_rpc_list_computation_collection_runs_page_and_collection_run_task
     assert output.total == 4
     assert len(output.items) == 4
     isinstance(output, ComputationCollectionRunTaskRpcGetPage)
+
+    # Test filtering only running collection runs
+    output = await rpc_computations.list_computation_collection_runs_page(
+        rpc_client,
+        product_name="osparc",
+        user_id=user["id"],
+        project_ids=None,
+        filter_only_running=True,  # <-- This is the tested filter
+    )
+    assert output.total == 1
+    assert len(output.items) == 1
+    assert isinstance(output, ComputationCollectionRunRpcGetPage)
+    assert len(output.items[0].project_ids) == 2
diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py
index 34d26ce135c..9e521c54a3a 100644
--- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py
+++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py
@@ -1019,7 +1019,7 @@ async def test_list_group_by_collection_run_id_with_project_filter(
     total_count, items = await repo.list_group_by_collection_run_id(
         product_name=run_metadata.get("product_name"),
         user_id=published_project_1.user["id"],
-        project_ids=[
+        project_ids_or_none=[
            published_project_1.project.uuid,
            published_project_2.project.uuid,
        ],
diff --git a/services/static-webserver/client/source/class/osparc/data/Resources.js b/services/static-webserver/client/source/class/osparc/data/Resources.js
index df605b8fd16..c934e90380a 100644
--- a/services/static-webserver/client/source/class/osparc/data/Resources.js
+++ b/services/static-webserver/client/source/class/osparc/data/Resources.js
@@ -368,9 +368,7 @@ qx.Class.define("osparc.data.Resources", {
       endpoints: {
        getPageLatest: {
          method: "GET",
-          // uncomment this when the backend supports filtering by runningOnly #8055
-          // url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running={runningOnly}"
-          url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running=false"
+          url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running={runningOnly}"
        },
        getPageHistory: {
          method: "GET",
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_repository.py b/services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_repository.py
index 22eee161826..346af730161 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_repository.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_repository.py
@@ -5,6 +5,7 @@
 from pydantic import TypeAdapter
 from simcore_postgres_database.models.comp_runs_collections import comp_runs_collections
 from sqlalchemy import func
+from sqlalchemy.dialects.postgresql import insert as pg_insert
 
 from ._comp_runs_collections_models import CompRunCollectionDBGet
 
@@ -63,3 +64,30 @@ async def get_comp_run_collection_or_none_by_client_generated_id(
     if row is None:
         return None
     return CompRunCollectionDBGet.model_validate(row)
+
+
+async def upsert_comp_run_collection(
+    conn,
+    client_or_system_generated_id: str,
+    client_or_system_generated_display_name: str,
+    is_generated_by_system: bool,
+) -> CollectionRunID:
+    """Upsert a computational run collection. If it exists, only update the modified time."""
+    insert_stmt = pg_insert(comp_runs_collections).values(
+        client_or_system_generated_id=client_or_system_generated_id,
+        client_or_system_generated_display_name=client_or_system_generated_display_name,
+        is_generated_by_system=is_generated_by_system,
+        created=func.now(),
+        modified=func.now(),
+    )
+    on_update_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=[comp_runs_collections.c.client_or_system_generated_id],
+        set_={
+            "modified": func.now(),
+        },
+    )
+    result = await conn.stream(
+        on_update_stmt.returning(comp_runs_collections.c.collection_run_id)
+    )
+    collection_id_tuple: tuple[UUID] = await result.one()
+    return TypeAdapter(CollectionRunID).validate_python(collection_id_tuple[0])
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_service.py
index 0bf82c01f7f..57aa0ce6656 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_service.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_service.py
@@ -27,6 +27,22 @@ async def create_comp_run_collection(
     )
 
 
+async def upsert_comp_run_collection(
+    app: web.Application,
+    *,
+    client_or_system_generated_id: str,
+    client_or_system_generated_display_name: str,
+    is_generated_by_system: bool,
+) -> CollectionRunID:
+    async with transaction_context(get_asyncpg_engine(app)) as conn:
+        return await _comp_runs_collections_repository.upsert_comp_run_collection(
+            conn=conn,
+            client_or_system_generated_id=client_or_system_generated_id,
+            client_or_system_generated_display_name=client_or_system_generated_display_name,
+            is_generated_by_system=is_generated_by_system,
+        )
+
+
 async def get_comp_run_collection_or_none_by_id(
     app: web.Application, *, collection_run_id: CollectionRunID
 ) -> CompRunCollectionDBGet | None:
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py
index 2068a97fe9b..8611373d95d 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py
@@ -321,6 +321,7 @@ async def list_computation_collection_runs(
     user_id: UserID,
     # filters
     filter_by_root_project_id: ProjectID | None = None,
+    filter_only_running: bool = False,
     # pagination
     offset: int,
     limit: NonNegativeInt,
@@ -345,6 +346,7 @@ async def list_computation_collection_runs(
         product_name=product_name,
         user_id=user_id,
         project_ids=child_projects_with_root,
+        filter_only_running=filter_only_running,
         offset=offset,
         limit=limit,
     )
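The upsert relies on the unique constraint added by the migration above: concurrent `start_computation` calls carrying the same client-generated id now race safely, with the losers degrading to an `UPDATE` of `modified`. A standalone sketch of the statement it builds (the table definition here is a stand-in; the real model lives in `simcore_postgres_database.models.comp_runs_collections`):

```python
# Standalone sketch of the ON CONFLICT statement; table and values are stand-ins.
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata_obj = sa.MetaData()
comp_runs_collections = sa.Table(
    "comp_runs_collections",
    metadata_obj,
    sa.Column("collection_run_id", postgresql.UUID(as_uuid=True), primary_key=True),
    sa.Column("client_or_system_generated_id", sa.String, unique=True),
    sa.Column("client_or_system_generated_display_name", sa.String),
    sa.Column("is_generated_by_system", sa.Boolean),
    sa.Column("created", sa.DateTime(timezone=True)),
    sa.Column("modified", sa.DateTime(timezone=True)),
)

stmt = (
    pg_insert(comp_runs_collections)
    .values(
        client_or_system_generated_id="example-group-id",
        client_or_system_generated_display_name="My Group Name",
        is_generated_by_system=False,
        created=sa.func.now(),
        modified=sa.func.now(),
    )
    .on_conflict_do_update(
        index_elements=[comp_runs_collections.c.client_or_system_generated_id],
        set_={"modified": sa.func.now()},
    )
    .returning(comp_runs_collections.c.collection_run_id)
)

# Compiles to roughly: INSERT INTO comp_runs_collections (...) VALUES (...)
# ON CONFLICT (client_or_system_generated_id) DO UPDATE SET modified = now()
# RETURNING comp_runs_collections.collection_run_id
print(stmt.compile(dialect=postgresql.dialect()))
```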
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py
index 98c3ef73ca5..0e01a4b414b 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py
@@ -212,15 +212,13 @@ async def list_computation_collection_runs(request: web.Request) -> web.Response
         )
     )
 
-    if query_params.filter_only_running is True:
-        raise NotImplementedError
-
     total, items = await _computations_service.list_computation_collection_runs(
         request.app,
         product_name=req_ctx.product_name,
         user_id=req_ctx.user_id,
         # filters
         filter_by_root_project_id=query_params.filter_by_root_project_id,
+        filter_only_running=query_params.filter_only_running,
         # pagination
         offset=query_params.offset,
         limit=query_params.limit,
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py
index 990c958ba67..d9016db8f37 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py
@@ -88,7 +88,7 @@ async def start_computation(request: web.Request) -> web.Response:
         comp_run_collection: CompRunCollectionDBGet | None = None
         if group_id_or_none:
             comp_run_collection = await _comp_runs_collections_service.get_comp_run_collection_or_none_by_client_generated_id(
-                request.app, client_or_system_generated_id=group_id_or_none  # type: ignore
+                request.app, client_or_system_generated_id=str(group_id_or_none)
             )
         if comp_run_collection is not None:
             created_at: datetime = comp_run_collection.created
@@ -111,10 +111,10 @@ async def start_computation(request: web.Request) -> web.Response:
             client_or_system_generated_id = f"{group_id_or_none}"
             group_name = custom_metadata.get("group_name", "No Group Name")
 
-        collection_run_id = await _comp_runs_collections_service.create_comp_run_collection(
+        collection_run_id = await _comp_runs_collections_service.upsert_comp_run_collection(
             request.app,
             client_or_system_generated_id=client_or_system_generated_id,
-            client_or_system_generated_display_name=group_name,  # type: ignore
+            client_or_system_generated_display_name=str(group_name),
             is_generated_by_system=is_generated_by_system,
        )
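With the `NotImplementedError` guard removed, `filter_only_running` is accepted end to end over REST. A hypothetical client call (the session and authentication setup are assumptions; the path and query parameters match the Resources.js endpoint above):

```python
# Hypothetical REST usage; assumes an authenticated aiohttp session whose
# base_url points at the webserver API.
import aiohttp


async def fetch_running_collection_runs(session: aiohttp.ClientSession) -> dict:
    async with session.get(
        "/v0/computation-collection-runs",
        params={"offset": "0", "limit": "20", "filter_only_running": "true"},
    ) as resp:
        resp.raise_for_status()
        return await resp.json()
```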
diff --git a/services/web/server/tests/integration/02/test_computation.py b/services/web/server/tests/integration/02/test_computation.py
index 200deba64f3..0fd534ea1b8 100644
--- a/services/web/server/tests/integration/02/test_computation.py
+++ b/services/web/server/tests/integration/02/test_computation.py
@@ -16,6 +16,7 @@
 import sqlalchemy as sa
 from aiohttp.test_utils import TestClient
 from common_library.json_serialization import json_dumps
+from faker import Faker
 from models_library.projects_state import RunningState
 from pytest_simcore.helpers.assert_checks import assert_status
 from servicelib.aiohttp import status
@@ -23,7 +24,9 @@
 from servicelib.status_codes_utils import get_code_display_name
 from settings_library.rabbit import RabbitSettings
 from settings_library.redis import RedisSettings
+from simcore_postgres_database.models.comp_runs_collections import comp_runs_collections
 from simcore_postgres_database.models.projects import projects
+from simcore_postgres_database.models.projects_metadata import projects_metadata
 from simcore_postgres_database.models.users import UserRole
 from simcore_postgres_database.webserver_models import (
     NodeClass,
@@ -518,3 +521,61 @@ async def test_run_pipeline_and_check_state(
     )
 
     print(f"<-- pipeline completed successfully in {time.monotonic() - start} seconds")
+
+
+@pytest.fixture
+async def populated_project_metadata(
+    client: TestClient,
+    logged_user: dict[str, Any],
+    user_project: dict[str, Any],
+    faker: Faker,
+    postgres_db: sa.engine.Engine,
+):
+    assert client.app
+    project_uuid = user_project["uuid"]
+    with postgres_db.connect() as con:
+        con.execute(
+            projects_metadata.insert().values(
+                **{
+                    "project_uuid": project_uuid,
+                    "custom": {
+                        "job_name": "My Job Name",
+                        "group_id": faker.uuid4(),
+                        "group_name": "My Group Name",
+                    },
+                }
+            )
+        )
+        yield
+        con.execute(projects_metadata.delete())
+        con.execute(comp_runs_collections.delete())  # cleanup
+
+
+@pytest.mark.parametrize(*user_role_response(), ids=str)
+async def test_start_multiple_computation_with_the_same_collection_run_id(
+    client: TestClient,
+    sleeper_service: dict[str, str],
+    postgres_db: sa.engine.Engine,
+    populated_project_metadata: None,
+    logged_user: dict[str, Any],
+    user_project: dict[str, Any],
+    fake_workbench_adjacency_list: dict[str, Any],
+    user_role: UserRole,
+    expected: _ExpectedResponseTuple,
+):
+    assert client.app
+    project_id = user_project["uuid"]
+
+    url_start = client.app.router["start_computation"].url_for(project_id=project_id)
+    assert url_start == URL(f"/{API_VTAG}/computations/{project_id}:start")
+
+    # POST /v0/computations/{project_id}:start
+    resp = await client.post(f"{url_start}")
+    await assert_status(resp, expected.created)
+
+    resp = await client.post(f"{url_start}")
+    # starting again should be disallowed, since it's already running
+    assert resp.status == expected.confict
+
+    # NOTE: This tests that there is only one entry in comp_runs_collections table created
+    # as the project metadata has the same group_id
diff --git a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py
index 8e8ef6faee4..e39c79d751b 100644
--- a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py
+++ b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py
@@ -389,6 +389,52 @@ async def test_list_computation_collection_runs_with_client_defined_name(
     assert data[0]["name"] == "My Collection Run"
 
 
+@pytest.mark.parametrize(*standard_role_response(), ids=str)
+async def test_list_computation_collection_runs_with_filter_only_running(
+    director_v2_service_mock: AioResponsesMock,
+    user_project: ProjectDict,
+    client: TestClient,
+    logged_user: LoggedUser,
+    user_role: UserRole,
+    expected: ExpectedResponse,
+    populated_comp_run_collection: None,
+    mock_rpc_list_computation_collection_runs_page: None,
+):
+    assert client.app
+    url = client.app.router["list_computation_collection_runs"].url_for()
+    query_parameters = {"filter_only_running": "true"}
+    url_with_query = url.with_query(**query_parameters)
+    resp = await client.get(f"{url_with_query}")
+    data, _ = await assert_status(
+        resp, status.HTTP_200_OK if user_role == UserRole.GUEST else expected.ok
+    )
+    if user_role != UserRole.ANONYMOUS:
+        assert ComputationCollectionRunRestGet.model_validate(data[0])
+
+
+@pytest.mark.parametrize(*standard_role_response(), ids=str)
+async def test_list_computation_collection_runs_with_filter_root_project(
+    director_v2_service_mock: AioResponsesMock,
+    user_project: ProjectDict,
+    client: TestClient,
+    logged_user: LoggedUser,
+    user_role: UserRole,
+    expected: ExpectedResponse,
+    populated_comp_run_collection: None,
+    mock_rpc_list_computation_collection_runs_page: None,
+):
+    assert client.app
+    url = client.app.router["list_computation_collection_runs"].url_for()
+    query_parameters = {"filter_by_root_project_id": user_project["uuid"]}
+    url_with_query = url.with_query(**query_parameters)
+    resp = await client.get(f"{url_with_query}")
+    data, _ = await assert_status(
+        resp, status.HTTP_200_OK if user_role == UserRole.GUEST else expected.ok
+    )
+    if user_role != UserRole.ANONYMOUS:
+        assert ComputationCollectionRunRestGet.model_validate(data[0])
+
+
 @pytest.fixture
 async def populated_project_metadata(
     client: TestClient,
@@ -412,7 +458,7 @@ async def populated_project_metadata(
 
 
 @pytest.mark.parametrize(*standard_role_response(), ids=str)
-async def test_list_computation_collection_runs_and_tasks_with_different_names(
+async def test_list_computation_collection_runs_tasks_with_different_names(
     director_v2_service_mock: AioResponsesMock,
     user_project: ProjectDict,
     client: TestClient,
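For completeness, a hedged sketch of how one might verify the new constraint on a migrated database using SQLAlchemy's inspector (`alembic upgrade head` is the standard invocation for applying the migration; this repo may wrap it in its own tooling):

```python
# Verification sketch only; the helper name is hypothetical.
import sqlalchemy as sa


def has_uniqueness_constraint(engine: sa.engine.Engine) -> bool:
    # Look up the unique constraint created by revision 61b98a60e934.
    inspector = sa.inspect(engine)
    return any(
        uc["name"] == "client_or_system_generated_id_uniqueness"
        for uc in inspector.get_unique_constraints("comp_runs_collections")
    )
```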