Merged
17 changes: 15 additions & 2 deletions api/specs/web-server/_computations.py
@@ -2,10 +2,13 @@

from _common import as_query
from fastapi import APIRouter, Depends, status
from fastapi_pagination import Page
from models_library.api_schemas_webserver.computations import (
    ComputationGet,
    ComputationPathParams,
    ComputationRunPathParams,
    ComputationRunRestGet,
    ComputationRunWithFiltersListQueryParams,
    ComputationStart,
    ComputationStarted,
    ComputationTaskRestGet,
@@ -65,16 +68,26 @@ async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]):

@router.get(
    "/computations/-/iterations/latest",
-    response_model=Envelope[list[ComputationRunRestGet]],
+    response_model=Page[ComputationRunRestGet],
)
async def list_computations_latest_iteration(
    _query: Annotated[as_query(ComputationRunWithFiltersListQueryParams), Depends()],
): ...


@router.get(
    "/computations/{project_id}/iterations",
    response_model=Page[ComputationRunRestGet],
)
async def list_computation_iterations(
    _query: Annotated[as_query(ComputationRunListQueryParams), Depends()],
    _path: Annotated[ComputationRunPathParams, Depends()],
): ...
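
For illustration only (not something this PR adds): a hedged sketch of how a client could call the new paginated endpoint. The base URL, the /v0 prefix, the missing authentication, and the offset/limit query-parameter names are assumptions derived from PageQueryParameters.

import asyncio
import uuid

import httpx


async def list_project_iterations(project_id: uuid.UUID) -> None:
    # Assumes a reachable webserver and an offset/limit pagination contract.
    async with httpx.AsyncClient(base_url="http://localhost:8080/v0") as client:
        response = await client.get(
            f"/computations/{project_id}/iterations",
            params={"offset": 0, "limit": 20},
        )
        response.raise_for_status()
        page = response.json()  # Page[ComputationRunRestGet]: items plus pagination metadata
        for run in page["items"]:
            print(run)


asyncio.run(list_project_iterations(uuid.uuid4()))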


@router.get(
    "/computations/{project_id}/iterations/latest/tasks",
-    response_model=Envelope[list[ComputationTaskRestGet]],
+    response_model=Page[ComputationTaskRestGet],
)
async def list_computations_latest_iteration_tasks(
    _query: Annotated[as_query(ComputationTaskListQueryParams), Depends()],
@@ -81,7 +81,10 @@ class ComputationStarted(OutputSchemaWithoutCamelCase):
class ComputationRunListQueryParams(
    PageQueryParameters,
    ComputationRunListOrderParams,  # type: ignore[misc, valid-type]
-):
+): ...


class ComputationRunWithFiltersListQueryParams(ComputationRunListQueryParams):
    filter_only_running: bool = Field(
        default=False,
        description="If true, only running computations are returned",
@@ -100,6 +103,11 @@ class ComputationRunRestGet(OutputSchema):
    project_custom_metadata: dict[str, Any]


class ComputationRunPathParams(BaseModel):
    project_id: ProjectID
    model_config = ConfigDict(populate_by_name=True, extra="forbid")
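
A quick illustrative check, not taken from the PR, of how the new path-parameter model behaves; the UUID value is arbitrary and the import path is taken from the spec module above.

from uuid import uuid4

from models_library.api_schemas_webserver.computations import ComputationRunPathParams
from pydantic import ValidationError

# A valid ProjectID is accepted; unknown fields are rejected because extra="forbid".
ComputationRunPathParams(project_id=uuid4())
try:
    ComputationRunPathParams(project_id=uuid4(), unexpected="value")
except ValidationError:
    print("extra fields are forbidden")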


### Computation Task


@@ -23,7 +23,7 @@


@pytest.fixture()
-def registered_user(
+def create_registered_user(
    postgres_db: sa.engine.Engine, faker: Faker
) -> Iterator[Callable[..., dict]]:
    created_user_ids = []
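
The rename follows pytest's factory-fixture convention: a create_* fixture yields a callable so a test can create as many users as it needs. A minimal, hypothetical sketch of that pattern (the real body, which writes through postgres_db and faker, is truncated above):

from collections.abc import Callable, Iterator

import pytest


@pytest.fixture()
def create_registered_user() -> Iterator[Callable[..., dict]]:
    created_users: list[dict] = []

    def _create(**overrides) -> dict:
        # The real fixture inserts a row into the users table;
        # this only illustrates the factory shape.
        user = {"id": len(created_users) + 1, "name": "test-user", **overrides}
        created_users.append(user)
        return user

    yield _create

    # Teardown in the real fixture deletes the created users from the database.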
@@ -58,6 +58,34 @@ async def list_computations_latest_iteration_page(
    return result


@log_decorator(_logger, level=logging.DEBUG)
async def list_computations_iterations_page(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    product_name: ProductName,
    user_id: UserID,
    project_id: ProjectID,
    # pagination
    offset: int = 0,
    limit: int = 20,
    # ordering
    order_by: OrderBy | None = None,
) -> ComputationRunRpcGetPage:
    result = await rabbitmq_rpc_client.request(
        DIRECTOR_V2_RPC_NAMESPACE,
        _RPC_METHOD_NAME_ADAPTER.validate_python("list_computations_iterations_page"),
        product_name=product_name,
        user_id=user_id,
        project_id=project_id,
        offset=offset,
        limit=limit,
        order_by=order_by,
        timeout_s=_DEFAULT_TIMEOUT_S,
    )
    assert isinstance(result, ComputationRunRpcGetPage)  # nosec
    return result
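
For orientation, a hedged sketch of how a caller (for instance the webserver backend) might use this new helper. The connected RPC client, the product name, and the user id are placeholders, and imports are assumed to match the module above.

async def show_project_iterations(
    rabbitmq_rpc_client: RabbitMQRPCClient, project_id: ProjectID
) -> None:
    # Placeholder identifiers; in practice they come from the request context.
    page = await list_computations_iterations_page(
        rabbitmq_rpc_client,
        product_name="osparc",
        user_id=1,
        project_id=project_id,
        offset=0,
        limit=20,
    )
    print(f"{page.total} iterations in total")
    for run in page.items:
        print(run)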


@log_decorator(_logger, level=logging.DEBUG)
async def list_computations_latest_iteration_tasks_page(
    rabbitmq_rpc_client: RabbitMQRPCClient,
@@ -53,6 +53,36 @@ async def list_computations_latest_iteration_page(
    )


@router.expose(reraise_if_error_type=())
async def list_computations_iterations_page(
    app: FastAPI,
    *,
    product_name: ProductName,
    user_id: UserID,
    project_id: ProjectID,
    # pagination
    offset: int = 0,
    limit: int = 20,
    # ordering
    order_by: OrderBy | None = None,
) -> ComputationRunRpcGetPage:
    comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
    total, comp_runs_output = (
        await comp_runs_repo.list_for_user_and_project_all_iterations(
            product_name=product_name,
            user_id=user_id,
            project_id=project_id,
            offset=offset,
            limit=limit,
            order_by=order_by,
        )
    )
    return ComputationRunRpcGetPage(
        items=comp_runs_output,
        total=total,
    )
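
One coupling worth noting: the handler is dispatched by its function name, which must match the string the client-side helper passes to _RPC_METHOD_NAME_ADAPTER. A trivial guard a test could add (placement and framework are assumptions, not part of this PR):

def test_rpc_method_name_matches_exposed_handler() -> None:
    # A rename on either side without the other would only fail at runtime.
    assert list_computations_iterations_page.__name__ == "list_computations_iterations_page"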


async def _fetch_task_log(
    user_id: UserID, project_id: ProjectID, task: ComputationTaskForRpcDBGet
) -> TaskLogFileGet | None:
@@ -190,6 +190,16 @@ async def list_(
)
]

    _COMPUTATION_RUNS_RPC_GET_COLUMNS = [  # noqa: RUF012
        comp_runs.c.project_uuid,
        comp_runs.c.iteration,
        comp_runs.c.result.label("state"),
        comp_runs.c.metadata.label("info"),
        comp_runs.c.created.label("submitted_at"),
        comp_runs.c.started.label("started_at"),
        comp_runs.c.ended.label("ended_at"),
    ]
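
As an aside, not in the diff: the .label() calls are what map the comp_runs column names onto ComputationRunRpcGet's field names. A hedged way to inspect the resulting names, assuming CompRunsRepository is importable and accepting that this peeks at a private attribute:

import sqlalchemy as sa

query = sa.select(*CompRunsRepository._COMPUTATION_RUNS_RPC_GET_COLUMNS)
print(list(query.selected_columns.keys()))
# expected: ['project_uuid', 'iteration', 'state', 'info', 'submitted_at', 'started_at', 'ended_at']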

    async def list_for_user__only_latest_iterations(
        self,
        *,
@@ -212,13 +222,7 @@ async def list_for_user__only_latest_iterations(
            order_by = OrderBy(field=IDStr("run_id"))  # default ordering

        base_select_query = sa.select(
-            comp_runs.c.project_uuid,
-            comp_runs.c.iteration,
-            comp_runs.c.result.label("state"),
-            comp_runs.c.metadata.label("info"),
-            comp_runs.c.created.label("submitted_at"),
-            comp_runs.c.started.label("started_at"),
-            comp_runs.c.ended.label("ended_at"),
+            *self._COMPUTATION_RUNS_RPC_GET_COLUMNS
        ).select_from(
            sa.select(
                comp_runs.c.project_uuid,
@@ -286,6 +290,62 @@ async def list_for_user__only_latest_iterations(

            return cast(int, total_count), items

    async def list_for_user_and_project_all_iterations(
        self,
        *,
        product_name: str,
        user_id: UserID,
        project_id: ProjectID,
        # pagination
        offset: int,
        limit: int,
        # ordering
        order_by: OrderBy | None = None,
    ) -> tuple[int, list[ComputationRunRpcGet]]:
        if order_by is None:
            order_by = OrderBy(field=IDStr("run_id"))  # default ordering

        base_select_query = sa.select(
            *self._COMPUTATION_RUNS_RPC_GET_COLUMNS,
        ).where(
            (comp_runs.c.user_id == user_id)
            & (comp_runs.c.project_uuid == f"{project_id}")
            & (
                comp_runs.c.metadata["product_name"].astext == product_name
            )  # <-- NOTE: We might create a separate column for this for fast retrieval
        )

        # Select total count from base_query
        count_query = sa.select(sa.func.count()).select_from(
            base_select_query.subquery()
        )

        # Ordering and pagination
        if order_by.direction == OrderDirection.ASC:
            list_query = base_select_query.order_by(
                sa.asc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id
            )
        else:
            list_query = base_select_query.order_by(
                desc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id
            )
        list_query = list_query.offset(offset).limit(limit)

        async with pass_or_acquire_connection(self.db_engine) as conn:
            total_count = await conn.scalar(count_query)

            items = [
                ComputationRunRpcGet.model_validate(
                    {
                        **row,
                        "state": DB_TO_RUNNING_STATE[row["state"]],
                    }
                )
                async for row in await conn.stream(list_query)
            ]

            return cast(int, total_count), items

    async def create(
        self,
        *,
36 changes: 18 additions & 18 deletions services/director-v2/tests/integration/01/test_computation_api.py
@@ -188,12 +188,12 @@ def test_invalid_computation(

async def test_start_empty_computation_is_refused(
async_client: httpx.AsyncClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
osparc_product_name: str,
create_pipeline: Callable[..., Awaitable[ComputationGet]],
):
-user = registered_user()
+user = create_registered_user()
empty_project = await project(user)
with pytest.raises(
httpx.HTTPStatusError, match=f"{status.HTTP_422_UNPROCESSABLE_ENTITY}"
@@ -391,15 +391,15 @@ class PartialComputationParams:
async def test_run_partial_computation(
wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],
async_client: httpx.AsyncClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
update_project_workbench_with_comp_tasks: Callable,
fake_workbench_without_outputs: dict[str, Any],
params: PartialComputationParams,
osparc_product_name: str,
create_pipeline: Callable[..., Awaitable[ComputationGet]],
):
-user = registered_user()
+user = create_registered_user()
await wait_for_catalog_service(user["id"], osparc_product_name)
sleepers_project: ProjectAtDB = await project(
user, workbench=fake_workbench_without_outputs
@@ -539,7 +539,7 @@ def _convert_to_pipeline_details(
async def test_run_computation(
wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],
async_client: httpx.AsyncClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
fake_workbench_without_outputs: dict[str, Any],
update_project_workbench_with_comp_tasks: Callable,
@@ -548,7 +548,7 @@ async def test_run_computation(
osparc_product_name: str,
create_pipeline: Callable[..., Awaitable[ComputationGet]],
):
-user = registered_user()
+user = create_registered_user()
await wait_for_catalog_service(user["id"], osparc_product_name)
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
# send a valid project with sleepers
@@ -653,14 +653,14 @@ async def test_run_computation(

async def test_abort_computation(
async_client: httpx.AsyncClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
fake_workbench_without_outputs: dict[str, Any],
fake_workbench_computational_pipeline_details: PipelineDetails,
osparc_product_name: str,
create_pipeline: Callable[..., Awaitable[ComputationGet]],
):
-user = registered_user()
+user = create_registered_user()
# we need long running tasks to ensure cancellation is done properly
for node in fake_workbench_without_outputs.values():
if "sleeper" in node["key"]:
@@ -730,15 +730,15 @@ async def test_abort_computation(

async def test_update_and_delete_computation(
async_client: httpx.AsyncClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
fake_workbench_without_outputs: dict[str, Any],
fake_workbench_computational_pipeline_details_not_started: PipelineDetails,
fake_workbench_computational_pipeline_details: PipelineDetails,
osparc_product_name: str,
create_pipeline: Callable[..., Awaitable[ComputationGet]],
):
-user = registered_user()
+user = create_registered_user()
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
# send a valid project with sleepers
task_out = await create_pipeline(
@@ -852,13 +852,13 @@ async def test_update_and_delete_computation(

async def test_pipeline_with_no_computational_services_still_create_correct_comp_tasks_in_db(
async_client: httpx.AsyncClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
jupyter_service: dict[str, Any],
osparc_product_name: str,
create_pipeline: Callable[..., Awaitable[ComputationGet]],
):
-user = registered_user()
+user = create_registered_user()
# create a workbench with just a dynamic service
project_with_dynamic_node = await project(
user,
@@ -895,12 +895,12 @@ async def test_pipeline_with_no_computational_services_still_create_correct_comp

async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed(
client: TestClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
jupyter_service: dict[str, Any],
osparc_product_name: str,
):
-user = registered_user()
+user = create_registered_user()
# create a workbench with just 2 dynamic service in a cycle
project_with_dynamic_node = await project(
user,
@@ -963,13 +963,13 @@ async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed(

async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidden(
client: TestClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
sleeper_service: dict[str, Any],
jupyter_service: dict[str, Any],
osparc_product_name: str,
):
-user = registered_user()
+user = create_registered_user()
# create a workbench with just 2 dynamic service in a cycle
project_with_cycly_and_comp_service = await project(
user,
@@ -1044,7 +1044,7 @@ async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidd

async def test_burst_create_computations(
async_client: httpx.AsyncClient,
-registered_user: Callable,
+create_registered_user: Callable,
project: Callable[..., Awaitable[ProjectAtDB]],
fake_workbench_without_outputs: dict[str, Any],
update_project_workbench_with_comp_tasks: Callable,
@@ -1053,7 +1053,7 @@ async def test_burst_create_computations(
osparc_product_name: str,
create_pipeline: Callable[..., Awaitable[ComputationGet]],
):
-user = registered_user()
+user = create_registered_user()
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
sleepers_project2 = await project(user, workbench=fake_workbench_without_outputs)
