Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
44767c0
new feature
matusdrobuliak66 Jul 1, 2025
ee061d4
add first batch of unit tests
matusdrobuliak66 Jul 2, 2025
33ef9db
fix test
matusdrobuliak66 Jul 2, 2025
895ca6c
additional tests
matusdrobuliak66 Jul 2, 2025
641494d
fix test
matusdrobuliak66 Jul 2, 2025
a67b0bc
Merge branch 'master' into add-project-grouping-for-task-manager
matusdrobuliak66 Jul 2, 2025
d6bb65a
fix test
matusdrobuliak66 Jul 2, 2025
744e8c2
Merge branch 'add-project-grouping-for-task-manager' of github.com:ma…
matusdrobuliak66 Jul 2, 2025
e2e3da0
autogenerae alembic for funcapi read funcstions
matusdrobuliak66 Jul 2, 2025
c0b1e1b
Merge branch 'master' into add-project-grouping-for-task-manager
matusdrobuliak66 Jul 3, 2025
a5b1ecc
fixes
matusdrobuliak66 Jul 3, 2025
50527ee
fixes tests in postgres package
matusdrobuliak66 Jul 3, 2025
b0cbcc2
removing redundant comments
matusdrobuliak66 Jul 3, 2025
152c7e3
adding final tests
matusdrobuliak66 Jul 3, 2025
254ed05
Merge branch 'master' into add-project-grouping-for-task-manager
matusdrobuliak66 Jul 3, 2025
c177512
open api specs
matusdrobuliak66 Jul 3, 2025
6c6283f
removing redundant comments
matusdrobuliak66 Jul 3, 2025
397ee77
increase version
matusdrobuliak66 Jul 3, 2025
00c046a
merge master
matusdrobuliak66 Jul 3, 2025
7933bbb
review @sanderegg
matusdrobuliak66 Jul 3, 2025
d532646
fix
matusdrobuliak66 Jul 3, 2025
713e9ab
merge master - resolve conflicts
matusdrobuliak66 Jul 4, 2025
5675329
review @sanderegg
matusdrobuliak66 Jul 4, 2025
6e823e4
fix
matusdrobuliak66 Jul 4, 2025
d939878
fix
matusdrobuliak66 Jul 4, 2025
ee183b9
open api specs
matusdrobuliak66 Jul 4, 2025
fbb6cac
review @pcrespov
matusdrobuliak66 Jul 4, 2025
e22f580
fix
matusdrobuliak66 Jul 4, 2025
6c283aa
fix
matusdrobuliak66 Jul 4, 2025
6ccf7ef
fix
matusdrobuliak66 Jul 4, 2025
4a133f9
fix
matusdrobuliak66 Jul 4, 2025
4692f78
Merge branch 'master' into add-project-grouping-for-task-manager
matusdrobuliak66 Jul 4, 2025
3c5cbb8
fix
matusdrobuliak66 Jul 4, 2025
dedc4ff
fix
matusdrobuliak66 Jul 4, 2025
f9e27bf
fix tests
matusdrobuliak66 Jul 4, 2025
13859f4
Merge branch 'master' into add-project-grouping-for-task-manager
matusdrobuliak66 Jul 4, 2025
475be6a
fix migration
matusdrobuliak66 Jul 4, 2025
a85096b
Merge branch 'master' into add-project-grouping-for-task-manager
matusdrobuliak66 Jul 4, 2025
731bc9e
fix missue with 422
matusdrobuliak66 Jul 7, 2025
27e2485
Merge branch 'master' into add-project-grouping-for-task-manager
matusdrobuliak66 Jul 7, 2025
5bb60dd
fix comment
matusdrobuliak66 Jul 7, 2025
db2efc4
add filter filter_only_running - director-v2
matusdrobuliak66 Jul 7, 2025
b765962
fix tests
matusdrobuliak66 Jul 7, 2025
cb8ba1f
Merge branch 'master' into add-project-grouping-for-task-manager
matusdrobuliak66 Jul 7, 2025
b129044
fix
matusdrobuliak66 Jul 7, 2025
ca93f7a
Merge branches 'add-project-grouping-for-task-manager' and 'add-proje…
matusdrobuliak66 Jul 7, 2025
1dffca2
Merge branch 'add-project-grouping-for-task-manager' into add-filter-…
matusdrobuliak66 Jul 7, 2025
b898dc1
merge master
matusdrobuliak66 Jul 7, 2025
5f116fe
Merge branch 'master' into add-filter-on-active-jobs
matusdrobuliak66 Jul 7, 2025
1cf0045
merge master
matusdrobuliak66 Jul 7, 2025
0e5ffed
delete migration
matusdrobuliak66 Jul 7, 2025
7641846
fix after merge conflict
matusdrobuliak66 Jul 7, 2025
0260db4
fix after merge conflict
matusdrobuliak66 Jul 7, 2025
166f1c4
add unit tests to webserver
matusdrobuliak66 Jul 7, 2025
bf57620
fix
matusdrobuliak66 Jul 7, 2025
536ef25
Merge branch 'master' into add-filter-on-active-jobs
matusdrobuliak66 Jul 7, 2025
9d6a2e9
Merge branch 'master' into add-filter-on-active-jobs
matusdrobuliak66 Jul 7, 2025
4476cac
Merge branch 'master' into add-filter-on-active-jobs
matusdrobuliak66 Jul 8, 2025
1da768b
fix
matusdrobuliak66 Jul 8, 2025
b6369b4
migration
matusdrobuliak66 Jul 8, 2025
517ffe7
add test
matusdrobuliak66 Jul 8, 2025
cbe7831
Merge branch 'master' into add-filter-on-active-jobs
matusdrobuliak66 Jul 8, 2025
aba0be6
...
matusdrobuliak66 Jul 8, 2025
a826a44
Merge branch 'master' into add-filter-on-active-jobs
matusdrobuliak66 Jul 9, 2025
8e71386
Merge branch 'master' into add-filter-on-active-jobs
matusdrobuliak66 Jul 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""computational collection uniquencess

Revision ID: 61b98a60e934
Revises: df61d1b2b967
Create Date: 2025-07-08 15:40:12.714684+00:00

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "61b98a60e934"
down_revision = "df61d1b2b967"
branch_labels = None
depends_on = None


def upgrade():
    """Add a unique constraint on client_or_system_generated_id so each
    client- or system-generated collection id maps to exactly one row."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_unique_constraint(
        constraint_name="client_or_system_generated_id_uniqueness",
        table_name="comp_runs_collections",
        columns=["client_or_system_generated_id"],
    )
    # ### end Alembic commands ###


def downgrade():
    """Drop the unique constraint added by upgrade(), restoring the
    previous (non-unique) schema."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        constraint_name="client_or_system_generated_id_uniqueness",
        table_name="comp_runs_collections",
        type_="unique",
    )
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@
"ix_comp_runs_collections_client_or_system_generated_id",
"client_or_system_generated_id",
),
sa.UniqueConstraint(
"client_or_system_generated_id", name="client_or_system_generated_id_uniqueness"
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ async def list_computation_collection_runs_page(
product_name: ProductName,
user_id: UserID,
project_ids: list[ProjectID] | None,
filter_only_running: bool = False,
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -138,6 +139,7 @@ async def list_computation_collection_runs_page(
product_name=product_name,
user_id=user_id,
project_ids=project_ids,
filter_only_running=filter_only_running,
offset=offset,
limit=limit,
timeout_s=_DEFAULT_TIMEOUT_S,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,24 @@ async def list_computation_collection_runs_page(
product_name: ProductName,
user_id: UserID,
project_ids: list[ProjectID] | None,
filter_only_running: bool = False,
# pagination
offset: int = 0,
limit: int = 20,
) -> ComputationCollectionRunRpcGetPage:
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)

collection_run_ids: list[CollectionRunID] | None = None
if filter_only_running is True:
collection_run_ids = await comp_runs_repo.list_all_collection_run_ids_for_user_currently_running_computations(
product_name=product_name, user_id=user_id
)

total, comp_runs_output = await comp_runs_repo.list_group_by_collection_run_id(
product_name=product_name,
user_id=user_id,
project_ids=project_ids,
project_ids_or_none=project_ids,
collection_run_ids_or_none=collection_run_ids,
offset=offset,
limit=limit,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,46 @@ async def list_for_user_and_project_all_iterations(

return cast(int, total_count), items

async def list_all_collection_run_ids_for_user_currently_running_computations(
    self,
    *,
    product_name: str,
    user_id: UserID,
) -> list[CollectionRunID]:
    """Return the distinct collection_run_id values of this user's runs
    whose result is currently one of the running states, scoped to the
    given product."""
    running_db_results = [
        RUNNING_STATE_TO_DB[state] for state in RunningState.list_running_states()
    ]

    # NOTE: product_name is read from the JSONB metadata column; a dedicated
    # column might be added later for faster retrieval.
    query = (
        sa.select(comp_runs.c.collection_run_id)
        .distinct()
        .where(
            (comp_runs.c.user_id == user_id)
            & (comp_runs.c.metadata["product_name"].astext == product_name)
            & comp_runs.c.result.in_(running_db_results)
        )
    )

    async with pass_or_acquire_connection(self.db_engine) as conn:
        result = await conn.stream(query)
        return [CollectionRunID(row[0]) async for row in result]

async def list_group_by_collection_run_id(
self,
*,
product_name: str,
user_id: UserID,
project_ids: list[ProjectID] | None = None,
project_ids_or_none: list[ProjectID] | None = None,
collection_run_ids_or_none: list[CollectionRunID] | None = None,
# pagination
offset: int,
limit: int,
Expand All @@ -408,10 +442,19 @@ async def list_group_by_collection_run_id(
& (comp_runs.c.metadata["product_name"].astext == product_name)
)

if project_ids:
if project_ids_or_none:
base_select_query = base_select_query.where(
comp_runs.c.project_uuid.in_(
[f"{project_id}" for project_id in project_ids]
[f"{project_id}" for project_id in project_ids_or_none]
)
)
if collection_run_ids_or_none:
base_select_query = base_select_query.where(
comp_runs.c.collection_run_id.in_(
[
f"{collection_run_id}"
for collection_run_id in collection_run_ids_or_none
]
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,18 +265,29 @@ async def test_rpc_list_computation_collection_runs_page_and_collection_run_task
not_default_collection_run_id,
]

for proj, collection_run_id in zip(projects, collection_run_id_project_list):
running_state_project_list = [
RunningState.SUCCESS,
RunningState.PENDING,
RunningState.SUCCESS,
]

for proj, collection_run_id, running_state in zip(
projects,
collection_run_id_project_list,
running_state_project_list,
strict=True,
):
await create_pipeline(
project_id=f"{proj.uuid}",
dag_adjacency_list=fake_workbench_adjacency,
)
await create_tasks_from_project(
user=user, project=proj, state=StateType.PUBLISHED, progress=None
user=user, project=proj, state=running_state, progress=None
)
run = await create_comp_run(
user=user,
project=proj,
result=RunningState.SUCCESS,
result=running_state,
started=datetime.now(tz=UTC) - timedelta(minutes=120),
ended=datetime.now(tz=UTC) - timedelta(minutes=100),
iteration=1,
Expand Down Expand Up @@ -317,3 +328,16 @@ async def test_rpc_list_computation_collection_runs_page_and_collection_run_task
assert output.total == 4
assert len(output.items) == 4
isinstance(output, ComputationCollectionRunTaskRpcGetPage)

# Test filtering only running collection runs
output = await rpc_computations.list_computation_collection_runs_page(
rpc_client,
product_name="osparc",
user_id=user["id"],
project_ids=None,
filter_only_running=True, # <-- This is the tested filter
)
assert output.total == 1
assert len(output.items) == 1
assert isinstance(output, ComputationCollectionRunRpcGetPage)
assert len(output.items[0].project_ids) == 2
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ async def test_list_group_by_collection_run_id_with_project_filter(
total_count, items = await repo.list_group_by_collection_run_id(
product_name=run_metadata.get("product_name"),
user_id=published_project_1.user["id"],
project_ids=[
project_ids_or_none=[
published_project_1.project.uuid,
published_project_2.project.uuid,
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,7 @@ qx.Class.define("osparc.data.Resources", {
endpoints: {
getPageLatest: {
method: "GET",
// uncomment this when the backend supports filtering by runningOnly #8055
// url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running={runningOnly}"
url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running=false"
url: statics.API + "/computation-collection-runs?offset={offset}&limit={limit}&order_by={orderBy}&filter_only_running={runningOnly}"
},
getPageHistory: {
method: "GET",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pydantic import TypeAdapter
from simcore_postgres_database.models.comp_runs_collections import comp_runs_collections
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert as pg_insert

from ._comp_runs_collections_models import CompRunCollectionDBGet

Expand Down Expand Up @@ -63,3 +64,30 @@ async def get_comp_run_collection_or_none_by_client_generated_id(
if row is None:
return None
return CompRunCollectionDBGet.model_validate(row)


async def upsert_comp_run_collection(
    conn,
    client_or_system_generated_id: str,
    client_or_system_generated_display_name: str,
    is_generated_by_system: bool,
) -> CollectionRunID:
    """Insert a computational run collection or, if a row with the same
    client_or_system_generated_id already exists, only bump its `modified`
    timestamp. Returns the (new or existing) collection_run_id."""
    now = func.now()
    stmt = (
        pg_insert(comp_runs_collections)
        .values(
            client_or_system_generated_id=client_or_system_generated_id,
            client_or_system_generated_display_name=client_or_system_generated_display_name,
            is_generated_by_system=is_generated_by_system,
            created=now,
            modified=now,
        )
        .on_conflict_do_update(
            index_elements=[comp_runs_collections.c.client_or_system_generated_id],
            # On conflict: leave all original columns untouched, refresh `modified`
            set_={"modified": func.now()},
        )
        .returning(comp_runs_collections.c.collection_run_id)
    )
    result = await conn.stream(stmt)
    row = await result.one()
    return TypeAdapter(CollectionRunID).validate_python(row[0])
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ async def create_comp_run_collection(
)


async def upsert_comp_run_collection(
    app: web.Application,
    *,
    client_or_system_generated_id: str,
    client_or_system_generated_display_name: str,
    is_generated_by_system: bool,
) -> CollectionRunID:
    """Create-or-update a computational run collection, delegating to the
    repository upsert inside a single transaction."""
    engine = get_asyncpg_engine(app)
    async with transaction_context(engine) as connection:
        return await _comp_runs_collections_repository.upsert_comp_run_collection(
            conn=connection,
            client_or_system_generated_id=client_or_system_generated_id,
            client_or_system_generated_display_name=client_or_system_generated_display_name,
            is_generated_by_system=is_generated_by_system,
        )


async def get_comp_run_collection_or_none_by_id(
app: web.Application, *, collection_run_id: CollectionRunID
) -> CompRunCollectionDBGet | None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ async def list_computation_collection_runs(
user_id: UserID,
# filters
filter_by_root_project_id: ProjectID | None = None,
filter_only_running: bool = False,
# pagination
offset: int,
limit: NonNegativeInt,
Expand All @@ -345,6 +346,7 @@ async def list_computation_collection_runs(
product_name=product_name,
user_id=user_id,
project_ids=child_projects_with_root,
filter_only_running=filter_only_running,
offset=offset,
limit=limit,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,13 @@ async def list_computation_collection_runs(request: web.Request) -> web.Response
)
)

if query_params.filter_only_running is True:
raise NotImplementedError

total, items = await _computations_service.list_computation_collection_runs(
request.app,
product_name=req_ctx.product_name,
user_id=req_ctx.user_id,
# filters
filter_by_root_project_id=query_params.filter_by_root_project_id,
filter_only_running=query_params.filter_only_running,
# pagination
offset=query_params.offset,
limit=query_params.limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def start_computation(request: web.Request) -> web.Response:
comp_run_collection: CompRunCollectionDBGet | None = None
if group_id_or_none:
comp_run_collection = await _comp_runs_collections_service.get_comp_run_collection_or_none_by_client_generated_id(
request.app, client_or_system_generated_id=group_id_or_none # type: ignore
request.app, client_or_system_generated_id=str(group_id_or_none)
)
if comp_run_collection is not None:
created_at: datetime = comp_run_collection.created
Expand All @@ -111,10 +111,10 @@ async def start_computation(request: web.Request) -> web.Response:
client_or_system_generated_id = f"{group_id_or_none}"
group_name = custom_metadata.get("group_name", "No Group Name")

collection_run_id = await _comp_runs_collections_service.create_comp_run_collection(
collection_run_id = await _comp_runs_collections_service.upsert_comp_run_collection(
request.app,
client_or_system_generated_id=client_or_system_generated_id,
client_or_system_generated_display_name=group_name, # type: ignore
client_or_system_generated_display_name=str(group_name),
is_generated_by_system=is_generated_by_system,
)

Expand Down
Loading
Loading