Skip to content

Commit db2efc4

Browse files
add filter filter_only_running - director-v2
1 parent 5bb60dd commit db2efc4

File tree

6 files changed

+87
-8
lines changed

6 files changed

+87
-8
lines changed

packages/postgres-database/src/simcore_postgres_database/migration/versions/42ec7816c0b4_computational_collection_runs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,4 @@ def downgrade():
119119
)
120120
op.drop_table("comp_runs_collections")
121121
# ### end Alembic commands ###
122+
# ### end Alembic commands ###

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ async def list_computation_collection_runs_page(
126126
product_name: ProductName,
127127
user_id: UserID,
128128
project_ids: list[ProjectID] | None,
129+
filter_only_running: bool = False,
129130
# pagination
130131
offset: int = 0,
131132
limit: int = 20,
@@ -138,6 +139,7 @@ async def list_computation_collection_runs_page(
138139
product_name=product_name,
139140
user_id=user_id,
140141
project_ids=project_ids,
142+
filter_only_running=filter_only_running,
141143
offset=offset,
142144
limit=limit,
143145
timeout_s=_DEFAULT_TIMEOUT_S,

services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,24 @@ async def list_computation_collection_runs_page(
102102
product_name: ProductName,
103103
user_id: UserID,
104104
project_ids: list[ProjectID] | None,
105+
filter_only_running: bool = False,
105106
# pagination
106107
offset: int = 0,
107108
limit: int = 20,
108109
) -> ComputationCollectionRunRpcGetPage:
109110
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
111+
112+
collection_run_ids: list[CollectionRunID] | None = None
113+
if filter_only_running is True:
114+
collection_run_ids = await comp_runs_repo.list_all_collection_run_ids_for_user_currently_running_computations(
115+
product_name=product_name, user_id=user_id
116+
)
117+
110118
total, comp_runs_output = await comp_runs_repo.list_group_by_collection_run_id(
111119
product_name=product_name,
112120
user_id=user_id,
113-
project_ids=project_ids,
121+
project_ids_or_none=project_ids,
122+
collection_run_ids_or_none=collection_run_ids,
114123
offset=offset,
115124
limit=limit,
116125
)

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,12 +377,46 @@ async def list_for_user_and_project_all_iterations(
377377

378378
return cast(int, total_count), items
379379

380+
async def list_all_collection_run_ids_for_user_currently_running_computations(
381+
self,
382+
*,
383+
product_name: str,
384+
user_id: UserID,
385+
) -> list[CollectionRunID]:
386+
387+
list_query = (
388+
sa.select(
389+
comp_runs.c.collection_run_id,
390+
)
391+
.where(
392+
(comp_runs.c.user_id == user_id)
393+
& (
394+
comp_runs.c.metadata["product_name"].astext == product_name
395+
) # <-- NOTE: We might create a separate column for this for fast retrieval
396+
& (
397+
comp_runs.c.result.in_(
398+
[
399+
RUNNING_STATE_TO_DB[item]
400+
for item in RunningState.list_running_states()
401+
]
402+
)
403+
)
404+
)
405+
.distinct()
406+
)
407+
408+
async with pass_or_acquire_connection(self.db_engine) as conn:
409+
return [
410+
CollectionRunID(row[0]) async for row in await conn.stream(list_query)
411+
]
412+
380413
async def list_group_by_collection_run_id(
381414
self,
382415
*,
383416
product_name: str,
384417
user_id: UserID,
385-
project_ids: list[ProjectID] | None = None,
418+
project_ids_or_none: list[ProjectID] | None = None,
419+
collection_run_ids_or_none: list[CollectionRunID] | None = None,
386420
# pagination
387421
offset: int,
388422
limit: int,
@@ -408,10 +442,19 @@ async def list_group_by_collection_run_id(
408442
& (comp_runs.c.metadata["product_name"].astext == product_name)
409443
)
410444

411-
if project_ids:
445+
if project_ids_or_none:
412446
base_select_query = base_select_query.where(
413447
comp_runs.c.project_uuid.in_(
414-
[f"{project_id}" for project_id in project_ids]
448+
[f"{project_id}" for project_id in project_ids_or_none]
449+
)
450+
)
451+
if collection_run_ids_or_none:
452+
base_select_query = base_select_query.where(
453+
comp_runs.c.collection_run_id.in_(
454+
[
455+
f"{collection_run_id}"
456+
for collection_run_id in collection_run_ids_or_none
457+
]
415458
)
416459
)
417460

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,18 +265,29 @@ async def test_rpc_list_computation_collection_runs_page_and_collection_run_task
265265
not_default_collection_run_id,
266266
]
267267

268-
for proj, collection_run_id in zip(projects, collection_run_id_project_list):
268+
running_state_project_list = [
269+
RunningState.SUCCESS,
270+
RunningState.PENDING,
271+
RunningState.SUCCESS,
272+
]
273+
274+
for proj, collection_run_id, running_state in zip(
275+
projects,
276+
collection_run_id_project_list,
277+
running_state_project_list,
278+
strict=True,
279+
):
269280
await create_pipeline(
270281
project_id=f"{proj.uuid}",
271282
dag_adjacency_list=fake_workbench_adjacency,
272283
)
273284
await create_tasks_from_project(
274-
user=user, project=proj, state=StateType.PUBLISHED, progress=None
285+
user=user, project=proj, state=running_state, progress=None
275286
)
276287
run = await create_comp_run(
277288
user=user,
278289
project=proj,
279-
result=RunningState.SUCCESS,
290+
result=running_state,
280291
started=datetime.now(tz=UTC) - timedelta(minutes=120),
281292
ended=datetime.now(tz=UTC) - timedelta(minutes=100),
282293
iteration=1,
@@ -317,3 +328,16 @@ async def test_rpc_list_computation_collection_runs_page_and_collection_run_task
317328
assert output.total == 4
318329
assert len(output.items) == 4
319330
isinstance(output, ComputationCollectionRunTaskRpcGetPage)
331+
332+
# Test filtering only running collection runs
333+
output = await rpc_computations.list_computation_collection_runs_page(
334+
rpc_client,
335+
product_name="osparc",
336+
user_id=user["id"],
337+
project_ids=None,
338+
filter_only_running=True, # <-- This is the tested filter
339+
)
340+
assert output.total == 1
341+
assert len(output.items) == 1
342+
assert isinstance(output, ComputationCollectionRunRpcGetPage)
343+
assert len(output.items[0].project_ids) == 2

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1019,7 +1019,7 @@ async def test_list_group_by_collection_run_id_with_project_filter(
10191019
total_count, items = await repo.list_group_by_collection_run_id(
10201020
product_name=run_metadata.get("product_name"),
10211021
user_id=published_project_1.user["id"],
1022-
project_ids=[
1022+
project_ids_or_none=[
10231023
published_project_1.project.uuid,
10241024
published_project_2.project.uuid,
10251025
],

0 commit comments

Comments
 (0)