Skip to content

Commit 0480725

Browse files
🎨 Add filter to show only running jobs in Activity Overview (#8055)
1 parent aa0cd2f commit 0480725

File tree

15 files changed

+283
-18
lines changed

15 files changed

+283
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""computational collection uniquencess
2+
3+
Revision ID: 61b98a60e934
4+
Revises: df61d1b2b967
5+
Create Date: 2025-07-08 15:40:12.714684+00:00
6+
7+
"""
8+
9+
from alembic import op
10+
11+
# revision identifiers, used by Alembic.
12+
revision = "61b98a60e934"
13+
down_revision = "df61d1b2b967"
14+
branch_labels = None
15+
depends_on = None
16+
17+
18+
def upgrade():
19+
# ### commands auto generated by Alembic - please adjust! ###
20+
op.create_unique_constraint(
21+
"client_or_system_generated_id_uniqueness",
22+
"comp_runs_collections",
23+
["client_or_system_generated_id"],
24+
)
25+
# ### end Alembic commands ###
26+
27+
28+
def downgrade():
29+
# ### commands auto generated by Alembic - please adjust! ###
30+
op.drop_constraint(
31+
"client_or_system_generated_id_uniqueness",
32+
"comp_runs_collections",
33+
type_="unique",
34+
)
35+
# ### end Alembic commands ###

packages/postgres-database/src/simcore_postgres_database/models/comp_runs_collections.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,7 @@
3535
"ix_comp_runs_collections_client_or_system_generated_id",
3636
"client_or_system_generated_id",
3737
),
38+
sa.UniqueConstraint(
39+
"client_or_system_generated_id", name="client_or_system_generated_id_uniqueness"
40+
),
3841
)

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ async def list_computation_collection_runs_page(
126126
product_name: ProductName,
127127
user_id: UserID,
128128
project_ids: list[ProjectID] | None,
129+
filter_only_running: bool = False,
129130
# pagination
130131
offset: int = 0,
131132
limit: int = 20,
@@ -138,6 +139,7 @@ async def list_computation_collection_runs_page(
138139
product_name=product_name,
139140
user_id=user_id,
140141
project_ids=project_ids,
142+
filter_only_running=filter_only_running,
141143
offset=offset,
142144
limit=limit,
143145
timeout_s=_DEFAULT_TIMEOUT_S,

services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,24 @@ async def list_computation_collection_runs_page(
102102
product_name: ProductName,
103103
user_id: UserID,
104104
project_ids: list[ProjectID] | None,
105+
filter_only_running: bool = False,
105106
# pagination
106107
offset: int = 0,
107108
limit: int = 20,
108109
) -> ComputationCollectionRunRpcGetPage:
109110
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
111+
112+
collection_run_ids: list[CollectionRunID] | None = None
113+
if filter_only_running is True:
114+
collection_run_ids = await comp_runs_repo.list_all_collection_run_ids_for_user_currently_running_computations(
115+
product_name=product_name, user_id=user_id
116+
)
117+
110118
total, comp_runs_output = await comp_runs_repo.list_group_by_collection_run_id(
111119
product_name=product_name,
112120
user_id=user_id,
113-
project_ids=project_ids,
121+
project_ids_or_none=project_ids,
122+
collection_run_ids_or_none=collection_run_ids,
114123
offset=offset,
115124
limit=limit,
116125
)

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,12 +377,46 @@ async def list_for_user_and_project_all_iterations(
377377

378378
return cast(int, total_count), items
379379

380+
async def list_all_collection_run_ids_for_user_currently_running_computations(
381+
self,
382+
*,
383+
product_name: str,
384+
user_id: UserID,
385+
) -> list[CollectionRunID]:
386+
387+
list_query = (
388+
sa.select(
389+
comp_runs.c.collection_run_id,
390+
)
391+
.where(
392+
(comp_runs.c.user_id == user_id)
393+
& (
394+
comp_runs.c.metadata["product_name"].astext == product_name
395+
) # <-- NOTE: We might create a separate column for this for fast retrieval
396+
& (
397+
comp_runs.c.result.in_(
398+
[
399+
RUNNING_STATE_TO_DB[item]
400+
for item in RunningState.list_running_states()
401+
]
402+
)
403+
)
404+
)
405+
.distinct()
406+
)
407+
408+
async with pass_or_acquire_connection(self.db_engine) as conn:
409+
return [
410+
CollectionRunID(row[0]) async for row in await conn.stream(list_query)
411+
]
412+
380413
async def list_group_by_collection_run_id(
381414
self,
382415
*,
383416
product_name: str,
384417
user_id: UserID,
385-
project_ids: list[ProjectID] | None = None,
418+
project_ids_or_none: list[ProjectID] | None = None,
419+
collection_run_ids_or_none: list[CollectionRunID] | None = None,
386420
# pagination
387421
offset: int,
388422
limit: int,
@@ -408,10 +442,19 @@ async def list_group_by_collection_run_id(
408442
& (comp_runs.c.metadata["product_name"].astext == product_name)
409443
)
410444

411-
if project_ids:
445+
if project_ids_or_none:
412446
base_select_query = base_select_query.where(
413447
comp_runs.c.project_uuid.in_(
414-
[f"{project_id}" for project_id in project_ids]
448+
[f"{project_id}" for project_id in project_ids_or_none]
449+
)
450+
)
451+
if collection_run_ids_or_none:
452+
base_select_query = base_select_query.where(
453+
comp_runs.c.collection_run_id.in_(
454+
[
455+
f"{collection_run_id}"
456+
for collection_run_id in collection_run_ids_or_none
457+
]
415458
)
416459
)
417460

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,18 +265,29 @@ async def test_rpc_list_computation_collection_runs_page_and_collection_run_task
265265
not_default_collection_run_id,
266266
]
267267

268-
for proj, collection_run_id in zip(projects, collection_run_id_project_list):
268+
running_state_project_list = [
269+
RunningState.SUCCESS,
270+
RunningState.PENDING,
271+
RunningState.SUCCESS,
272+
]
273+
274+
for proj, collection_run_id, running_state in zip(
275+
projects,
276+
collection_run_id_project_list,
277+
running_state_project_list,
278+
strict=True,
279+
):
269280
await create_pipeline(
270281
project_id=f"{proj.uuid}",
271282
dag_adjacency_list=fake_workbench_adjacency,
272283
)
273284
await create_tasks_from_project(
274-
user=user, project=proj, state=StateType.PUBLISHED, progress=None
285+
user=user, project=proj, state=running_state, progress=None
275286
)
276287
run = await create_comp_run(
277288
user=user,
278289
project=proj,
279-
result=RunningState.SUCCESS,
290+
result=running_state,
280291
started=datetime.now(tz=UTC) - timedelta(minutes=120),
281292
ended=datetime.now(tz=UTC) - timedelta(minutes=100),
282293
iteration=1,
@@ -317,3 +328,16 @@ async def test_rpc_list_computation_collection_runs_page_and_collection_run_task
317328
assert output.total == 4
318329
assert len(output.items) == 4
319330
isinstance(output, ComputationCollectionRunTaskRpcGetPage)
331+
332+
# Test filtering only running collection runs
333+
output = await rpc_computations.list_computation_collection_runs_page(
334+
rpc_client,
335+
product_name="osparc",
336+
user_id=user["id"],
337+
project_ids=None,
338+
filter_only_running=True, # <-- This is the tested filter
339+
)
340+
assert output.total == 1
341+
assert len(output.items) == 1
342+
assert isinstance(output, ComputationCollectionRunRpcGetPage)
343+
assert len(output.items[0].project_ids) == 2

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1019,7 +1019,7 @@ async def test_list_group_by_collection_run_id_with_project_filter(
10191019
total_count, items = await repo.list_group_by_collection_run_id(
10201020
product_name=run_metadata.get("product_name"),
10211021
user_id=published_project_1.user["id"],
1022-
project_ids=[
1022+
project_ids_or_none=[
10231023
published_project_1.project.uuid,
10241024
published_project_2.project.uuid,
10251025
],

services/static-webserver/client/source/class/osparc/data/Resources.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,9 +368,7 @@ qx.Class.define("osparc.data.Resources", {
368368
endpoints: {
369369
getPageLatest: {
370370
method: "GET",
371-
// uncomment this when the backend supports filtering by runningOnly #8055
372-
// url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running={runningOnly}"
373-
url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running=false"
371+
url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running={runningOnly}"
374372
},
375373
getPageHistory: {
376374
method: "GET",

services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_repository.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from pydantic import TypeAdapter
66
from simcore_postgres_database.models.comp_runs_collections import comp_runs_collections
77
from sqlalchemy import func
8+
from sqlalchemy.dialects.postgresql import insert as pg_insert
89

910
from ._comp_runs_collections_models import CompRunCollectionDBGet
1011

@@ -63,3 +64,30 @@ async def get_comp_run_collection_or_none_by_client_generated_id(
6364
if row is None:
6465
return None
6566
return CompRunCollectionDBGet.model_validate(row)
67+
68+
69+
async def upsert_comp_run_collection(
70+
conn,
71+
client_or_system_generated_id: str,
72+
client_or_system_generated_display_name: str,
73+
is_generated_by_system: bool,
74+
) -> CollectionRunID:
75+
"""Upsert a computational run collection. If it exists, only update the modified time."""
76+
insert_stmt = pg_insert(comp_runs_collections).values(
77+
client_or_system_generated_id=client_or_system_generated_id,
78+
client_or_system_generated_display_name=client_or_system_generated_display_name,
79+
is_generated_by_system=is_generated_by_system,
80+
created=func.now(),
81+
modified=func.now(),
82+
)
83+
on_update_stmt = insert_stmt.on_conflict_do_update(
84+
index_elements=[comp_runs_collections.c.client_or_system_generated_id],
85+
set_={
86+
"modified": func.now(),
87+
},
88+
)
89+
result = await conn.stream(
90+
on_update_stmt.returning(comp_runs_collections.c.collection_run_id)
91+
)
92+
collection_id_tuple: tuple[UUID] = await result.one()
93+
return TypeAdapter(CollectionRunID).validate_python(collection_id_tuple[0])

services/web/server/src/simcore_service_webserver/director_v2/_comp_runs_collections_service.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,22 @@ async def create_comp_run_collection(
2727
)
2828

2929

30+
async def upsert_comp_run_collection(
31+
app: web.Application,
32+
*,
33+
client_or_system_generated_id: str,
34+
client_or_system_generated_display_name: str,
35+
is_generated_by_system: bool,
36+
) -> CollectionRunID:
37+
async with transaction_context(get_asyncpg_engine(app)) as conn:
38+
return await _comp_runs_collections_repository.upsert_comp_run_collection(
39+
conn=conn,
40+
client_or_system_generated_id=client_or_system_generated_id,
41+
client_or_system_generated_display_name=client_or_system_generated_display_name,
42+
is_generated_by_system=is_generated_by_system,
43+
)
44+
45+
3046
async def get_comp_run_collection_or_none_by_id(
3147
app: web.Application, *, collection_run_id: CollectionRunID
3248
) -> CompRunCollectionDBGet | None:

0 commit comments

Comments
 (0)