Skip to content
Merged
10 changes: 6 additions & 4 deletions api/specs/web-server/_computations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from models_library.api_schemas_webserver.computations import (
ComputationGet,
ComputationPathParams,
ComputationRunIterationsLatestListQueryParams,
ComputationRunIterationsListQueryParams,
ComputationRunPathParams,
ComputationRunRestGet,
ComputationRunWithFiltersListQueryParams,
ComputationStart,
ComputationStarted,
ComputationTaskRestGet,
)
from models_library.generics import Envelope
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.director_v2._controller.computations_rest import (
ComputationRunListQueryParams,
ComputationTaskListQueryParams,
ComputationTaskPathParams,
)
Expand Down Expand Up @@ -71,7 +71,9 @@ async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]):
response_model=Page[ComputationRunRestGet],
)
async def list_computations_latest_iteration(
_query: Annotated[as_query(ComputationRunWithFiltersListQueryParams), Depends()],
_query: Annotated[
as_query(ComputationRunIterationsLatestListQueryParams), Depends()
],
): ...


Expand All @@ -80,7 +82,7 @@ async def list_computations_latest_iteration(
response_model=Page[ComputationRunRestGet],
)
async def list_computation_iterations(
_query: Annotated[as_query(ComputationRunListQueryParams), Depends()],
_query: Annotated[as_query(ComputationRunIterationsListQueryParams), Depends()],
_path: Annotated[ComputationRunPathParams, Depends()],
): ...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,20 @@ class ComputationRunListQueryParams(
): ...


class ComputationRunWithFiltersListQueryParams(ComputationRunListQueryParams):
class ComputationRunIterationsLatestListQueryParams(ComputationRunListQueryParams):
filter_only_running: bool = Field(
default=False,
description="If true, only running computations are returned",
)


class ComputationRunIterationsListQueryParams(ComputationRunListQueryParams):
include_children: bool = Field(
default=False,
description="If true, all computational runs of the project and its children are returned (Currently supported only for root projects)",
)


class ComputationRunRestGet(OutputSchema):
project_uuid: ProjectID
iteration: int
Expand Down Expand Up @@ -128,7 +135,11 @@ class ComputationTaskPathParams(BaseModel):
class ComputationTaskListQueryParams(
PageQueryParameters,
ComputationTaskListOrderParams, # type: ignore[misc, valid-type]
): ...
):
include_children: bool = Field(
default=False,
description="If true, all tasks of the project and its children are returned (Currently supported only for root projects)",
)


class ComputationTaskRestGet(OutputSchema):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add index to projects_metadata

Revision ID: 4e7d8719855b
Revises: ba9c4816a31b
Create Date: 2025-05-21 11:48:34.062860+00:00

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "4e7d8719855b"
down_revision = "ba9c4816a31b"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(
"idx_projects_metadata_root_parent_project_uuid",
"projects_metadata",
["root_parent_project_uuid"],
unique=False,
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"idx_projects_metadata_root_parent_project_uuid", table_name="projects_metadata"
)
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
These tables were designed to be controled by projects-plugin in
the webserver's service
These tables were designed to be controled by projects-plugin in
the webserver's service
"""

import sqlalchemy as sa
Expand Down Expand Up @@ -100,6 +100,10 @@
ondelete=RefActions.SET_NULL,
name="fk_projects_metadata_root_parent_node_id",
),
#######
sa.Index(
"idx_projects_metadata_root_parent_project_uuid", "root_parent_project_uuid"
),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def list_computations_iterations_page(
*,
product_name: ProductName,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -76,7 +76,7 @@ async def list_computations_iterations_page(
_RPC_METHOD_NAME_ADAPTER.validate_python("list_computations_iterations_page"),
product_name=product_name,
user_id=user_id,
project_id=project_id,
project_ids=project_ids,
offset=offset,
limit=limit,
order_by=order_by,
Expand All @@ -92,7 +92,7 @@ async def list_computations_latest_iteration_tasks_page(
*,
product_name: ProductName,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -106,7 +106,7 @@ async def list_computations_latest_iteration_tasks_page(
),
product_name=product_name,
user_id=user_id,
project_id=project_id,
project_ids=project_ids,
offset=offset,
limit=limit,
order_by=order_by,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class WalletTransactionError(OsparcErrorMixin, Exception):
msg_template = "{msg}"


class CreditTransactionNotFoundError(OsparcErrorMixin, Exception): ...
class CreditTransactionNotFoundError(OsparcErrorMixin, Exception):
msg_template = "Credit transaction for service run id {service_run_id} not found."


### Pricing Plans Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def list_computations_iterations_page(
*,
product_name: ProductName,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -71,7 +71,7 @@ async def list_computations_iterations_page(
await comp_runs_repo.list_for_user_and_project_all_iterations(
product_name=product_name,
user_id=user_id,
project_id=project_id,
project_ids=project_ids,
offset=offset,
limit=limit,
order_by=order_by,
Expand All @@ -84,12 +84,12 @@ async def list_computations_iterations_page(


async def _fetch_task_log(
user_id: UserID, project_id: ProjectID, task: ComputationTaskForRpcDBGet
user_id: UserID, task: ComputationTaskForRpcDBGet
) -> TaskLogFileGet | None:
if not task.state.is_running():
return await dask_utils.get_task_log_file(
user_id=user_id,
project_id=project_id,
project_id=task.project_uuid,
node_id=task.node_id,
)
return None
Expand All @@ -101,7 +101,7 @@ async def list_computations_latest_iteration_tasks_page(
*,
product_name: ProductName,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -114,20 +114,30 @@ async def list_computations_latest_iteration_tasks_page(
comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine)
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)

comp_latest_run = await comp_runs_repo.get(
user_id=user_id, project_id=project_id, iteration=None # Returns last iteration
)

total, comp_tasks = await comp_tasks_repo.list_computational_tasks_rpc_domain(
project_id=project_id,
project_ids=project_ids,
offset=offset,
limit=limit,
order_by=order_by,
)

# Get unique set of all project_uuids from comp_tasks
unique_project_uuids = {task.project_uuid for task in comp_tasks}

# Fetch latest run for each project concurrently
latest_runs = await limited_gather(
*[
comp_runs_repo.get(user_id=user_id, project_id=project_uuid, iteration=None)
for project_uuid in unique_project_uuids
],
limit=20,
)
# Build a dict: project_uuid -> iteration
project_uuid_to_iteration = {run.project_uuid: run.iteration for run in latest_runs}

# Run all log fetches concurrently
log_files = await limited_gather(
*[_fetch_task_log(user_id, project_id, task) for task in comp_tasks],
*[_fetch_task_log(user_id, task) for task in comp_tasks],
limit=20,
)

Expand All @@ -142,7 +152,10 @@ async def list_computations_latest_iteration_tasks_page(
ended_at=task.ended_at,
log_download_link=log_file.download_link if log_file else None,
service_run_id=ServiceRunID.get_resource_tracking_run_id_for_computational(
user_id, project_id, task.node_id, comp_latest_run.iteration
user_id,
task.project_uuid,
task.node_id,
project_uuid_to_iteration[task.project_uuid],
),
)
for task, log_file in zip(comp_tasks, log_files, strict=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ async def list_for_user_and_project_all_iterations(
*,
product_name: str,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int,
limit: int,
Expand All @@ -309,7 +309,11 @@ async def list_for_user_and_project_all_iterations(
*self._COMPUTATION_RUNS_RPC_GET_COLUMNS,
).where(
(comp_runs.c.user_id == user_id)
& (comp_runs.c.project_uuid == f"{project_id}")
& (
comp_runs.c.project_uuid.in_(
[f"{project_id}" for project_id in project_ids]
)
)
& (
comp_runs.c.metadata["product_name"].astext == product_name
) # <-- NOTE: We might create a separate column for this for fast retrieval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def list_computational_tasks(
async def list_computational_tasks_rpc_domain(
self,
*,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -100,7 +100,11 @@ async def list_computational_tasks_rpc_domain(
)
.select_from(comp_tasks)
.where(
(comp_tasks.c.project_id == f"{project_id}")
(
comp_tasks.c.project_id.in_(
[f"{project_id}" for project_id in project_ids]
)
)
& (comp_tasks.c.node_class == NodeClass.COMPUTATIONAL)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def test_rpc_list_computation_runs_and_tasks(
# Tasks

output = await rpc_computations.list_computations_latest_iteration_tasks_page(
rpc_client, product_name="osparc", user_id=user["id"], project_id=proj.uuid
rpc_client, product_name="osparc", user_id=user["id"], project_ids=[proj.uuid]
)
assert output
assert output.total == 4
Expand Down Expand Up @@ -201,7 +201,7 @@ async def test_rpc_list_computation_runs_history(
)

output = await rpc_computations.list_computations_iterations_page(
rpc_client, product_name="osparc", user_id=user["id"], project_id=proj.uuid
rpc_client, product_name="osparc", user_id=user["id"], project_ids=[proj.uuid]
)
assert output.total == 3
assert isinstance(output, ComputationRunRpcGetPage)
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def get_transaction_current_credits_by_service_run_id(
resource_tracker_credit_transactions.c.service_run_id == f"{service_run_id}"
)
result = await conn.execute(select_stmt)
row = result.first()
row = result.one_or_none()
if row is None:
raise CreditTransactionNotFoundError
raise CreditTransactionNotFoundError(service_run_id=service_run_id)
return Decimal(row[0])
Original file line number Diff line number Diff line change
Expand Up @@ -2619,6 +2619,13 @@ paths:
type: integer
default: 0
title: Offset
- name: include_children
in: query
required: false
schema:
type: boolean
default: false
title: Include Children
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -2664,6 +2671,13 @@ paths:
type: integer
default: 0
title: Offset
- name: include_children
in: query
required: false
schema:
type: boolean
default: false
title: Include Children
responses:
'200':
description: Successful Response
Expand Down
Loading
Loading