Skip to content

Commit c4ac547

Browse files
Merge branch 'master' into fix-api-base-url-generation
2 parents e739fe9 + 2be6248 commit c4ac547

File tree

25 files changed

+893
-147
lines changed

25 files changed

+893
-147
lines changed

api/specs/web-server/_computations.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22

33
from _common import as_query
44
from fastapi import APIRouter, Depends, status
5+
from fastapi_pagination import Page
56
from models_library.api_schemas_webserver.computations import (
67
ComputationGet,
78
ComputationPathParams,
9+
ComputationRunPathParams,
810
ComputationRunRestGet,
11+
ComputationRunWithFiltersListQueryParams,
912
ComputationStart,
1013
ComputationStarted,
1114
ComputationTaskRestGet,
@@ -65,16 +68,26 @@ async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]):
6568

6669
@router.get(
6770
"/computations/-/iterations/latest",
68-
response_model=Envelope[list[ComputationRunRestGet]],
71+
response_model=Page[ComputationRunRestGet],
6972
)
7073
async def list_computations_latest_iteration(
74+
_query: Annotated[as_query(ComputationRunWithFiltersListQueryParams), Depends()],
75+
): ...
76+
77+
78+
@router.get(
79+
"/computations/{project_id}/iterations",
80+
response_model=Page[ComputationRunRestGet],
81+
)
82+
async def list_computation_iterations(
7183
_query: Annotated[as_query(ComputationRunListQueryParams), Depends()],
84+
_path: Annotated[ComputationRunPathParams, Depends()],
7285
): ...
7386

7487

7588
@router.get(
7689
"/computations/{project_id}/iterations/latest/tasks",
77-
response_model=Envelope[list[ComputationTaskRestGet]],
90+
response_model=Page[ComputationTaskRestGet],
7891
)
7992
async def list_computations_latest_iteration_tasks(
8093
_query: Annotated[as_query(ComputationTaskListQueryParams), Depends()],

packages/models-library/src/models_library/api_schemas_webserver/computations.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,10 @@ class ComputationStarted(OutputSchemaWithoutCamelCase):
8181
class ComputationRunListQueryParams(
8282
PageQueryParameters,
8383
ComputationRunListOrderParams, # type: ignore[misc, valid-type]
84-
):
84+
): ...
85+
86+
87+
class ComputationRunWithFiltersListQueryParams(ComputationRunListQueryParams):
8588
filter_only_running: bool = Field(
8689
default=False,
8790
description="If true, only running computations are returned",
@@ -100,6 +103,11 @@ class ComputationRunRestGet(OutputSchema):
100103
project_custom_metadata: dict[str, Any]
101104

102105

106+
class ComputationRunPathParams(BaseModel):
107+
project_id: ProjectID
108+
model_config = ConfigDict(populate_by_name=True, extra="forbid")
109+
110+
103111
### Computation Task
104112

105113

packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424

2525
@pytest.fixture()
26-
def registered_user(
26+
def create_registered_user(
2727
postgres_db: sa.engine.Engine, faker: Faker
2828
) -> Iterator[Callable[..., dict]]:
2929
created_user_ids = []

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,34 @@ async def list_computations_latest_iteration_page(
5858
return result
5959

6060

61+
@log_decorator(_logger, level=logging.DEBUG)
62+
async def list_computations_iterations_page(
63+
rabbitmq_rpc_client: RabbitMQRPCClient,
64+
*,
65+
product_name: ProductName,
66+
user_id: UserID,
67+
project_id: ProjectID,
68+
# pagination
69+
offset: int = 0,
70+
limit: int = 20,
71+
# ordering
72+
order_by: OrderBy | None = None,
73+
) -> ComputationRunRpcGetPage:
74+
result = await rabbitmq_rpc_client.request(
75+
DIRECTOR_V2_RPC_NAMESPACE,
76+
_RPC_METHOD_NAME_ADAPTER.validate_python("list_computations_iterations_page"),
77+
product_name=product_name,
78+
user_id=user_id,
79+
project_id=project_id,
80+
offset=offset,
81+
limit=limit,
82+
order_by=order_by,
83+
timeout_s=_DEFAULT_TIMEOUT_S,
84+
)
85+
assert isinstance(result, ComputationRunRpcGetPage) # nosec
86+
return result
87+
88+
6189
@log_decorator(_logger, level=logging.DEBUG)
6290
async def list_computations_latest_iteration_tasks_page(
6391
rabbitmq_rpc_client: RabbitMQRPCClient,

services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,36 @@ async def list_computations_latest_iteration_page(
5353
)
5454

5555

56+
@router.expose(reraise_if_error_type=())
57+
async def list_computations_iterations_page(
58+
app: FastAPI,
59+
*,
60+
product_name: ProductName,
61+
user_id: UserID,
62+
project_id: ProjectID,
63+
# pagination
64+
offset: int = 0,
65+
limit: int = 20,
66+
# ordering
67+
order_by: OrderBy | None = None,
68+
) -> ComputationRunRpcGetPage:
69+
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
70+
total, comp_runs_output = (
71+
await comp_runs_repo.list_for_user_and_project_all_iterations(
72+
product_name=product_name,
73+
user_id=user_id,
74+
project_id=project_id,
75+
offset=offset,
76+
limit=limit,
77+
order_by=order_by,
78+
)
79+
)
80+
return ComputationRunRpcGetPage(
81+
items=comp_runs_output,
82+
total=total,
83+
)
84+
85+
5686
async def _fetch_task_log(
5787
user_id: UserID, project_id: ProjectID, task: ComputationTaskForRpcDBGet
5888
) -> TaskLogFileGet | None:

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,16 @@ async def list_(
190190
)
191191
]
192192

193+
_COMPUTATION_RUNS_RPC_GET_COLUMNS = [ # noqa: RUF012
194+
comp_runs.c.project_uuid,
195+
comp_runs.c.iteration,
196+
comp_runs.c.result.label("state"),
197+
comp_runs.c.metadata.label("info"),
198+
comp_runs.c.created.label("submitted_at"),
199+
comp_runs.c.started.label("started_at"),
200+
comp_runs.c.ended.label("ended_at"),
201+
]
202+
193203
async def list_for_user__only_latest_iterations(
194204
self,
195205
*,
@@ -212,13 +222,7 @@ async def list_for_user__only_latest_iterations(
212222
order_by = OrderBy(field=IDStr("run_id")) # default ordering
213223

214224
base_select_query = sa.select(
215-
comp_runs.c.project_uuid,
216-
comp_runs.c.iteration,
217-
comp_runs.c.result.label("state"),
218-
comp_runs.c.metadata.label("info"),
219-
comp_runs.c.created.label("submitted_at"),
220-
comp_runs.c.started.label("started_at"),
221-
comp_runs.c.ended.label("ended_at"),
225+
*self._COMPUTATION_RUNS_RPC_GET_COLUMNS
222226
).select_from(
223227
sa.select(
224228
comp_runs.c.project_uuid,
@@ -286,6 +290,62 @@ async def list_for_user__only_latest_iterations(
286290

287291
return cast(int, total_count), items
288292

293+
async def list_for_user_and_project_all_iterations(
294+
self,
295+
*,
296+
product_name: str,
297+
user_id: UserID,
298+
project_id: ProjectID,
299+
# pagination
300+
offset: int,
301+
limit: int,
302+
# ordering
303+
order_by: OrderBy | None = None,
304+
) -> tuple[int, list[ComputationRunRpcGet]]:
305+
if order_by is None:
306+
order_by = OrderBy(field=IDStr("run_id")) # default ordering
307+
308+
base_select_query = sa.select(
309+
*self._COMPUTATION_RUNS_RPC_GET_COLUMNS,
310+
).where(
311+
(comp_runs.c.user_id == user_id)
312+
& (comp_runs.c.project_uuid == f"{project_id}")
313+
& (
314+
comp_runs.c.metadata["product_name"].astext == product_name
315+
) # <-- NOTE: We might create a separate column for this for fast retrieval
316+
)
317+
318+
# Select total count from base_query
319+
count_query = sa.select(sa.func.count()).select_from(
320+
base_select_query.subquery()
321+
)
322+
323+
# Ordering and pagination
324+
if order_by.direction == OrderDirection.ASC:
325+
list_query = base_select_query.order_by(
326+
sa.asc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id
327+
)
328+
else:
329+
list_query = base_select_query.order_by(
330+
desc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id
331+
)
332+
list_query = list_query.offset(offset).limit(limit)
333+
334+
async with pass_or_acquire_connection(self.db_engine) as conn:
335+
total_count = await conn.scalar(count_query)
336+
337+
items = [
338+
ComputationRunRpcGet.model_validate(
339+
{
340+
**row,
341+
"state": DB_TO_RUNNING_STATE[row["state"]],
342+
}
343+
)
344+
async for row in await conn.stream(list_query)
345+
]
346+
347+
return cast(int, total_count), items
348+
289349
async def create(
290350
self,
291351
*,

services/director-v2/tests/integration/01/test_computation_api.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,13 @@ def test_invalid_computation(
191191

192192
async def test_start_empty_computation_is_refused(
193193
async_client: httpx.AsyncClient,
194-
registered_user: Callable,
194+
create_registered_user: Callable,
195195
project: Callable[..., Awaitable[ProjectAtDB]],
196196
osparc_product_name: str,
197197
osparc_product_api_base_url: str,
198198
create_pipeline: Callable[..., Awaitable[ComputationGet]],
199199
):
200-
user = registered_user()
200+
user = create_registered_user()
201201
empty_project = await project(user)
202202
with pytest.raises(
203203
httpx.HTTPStatusError, match=f"{status.HTTP_422_UNPROCESSABLE_ENTITY}"
@@ -396,7 +396,7 @@ class PartialComputationParams:
396396
async def test_run_partial_computation(
397397
wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],
398398
async_client: httpx.AsyncClient,
399-
registered_user: Callable,
399+
create_registered_user: Callable,
400400
project: Callable[..., Awaitable[ProjectAtDB]],
401401
update_project_workbench_with_comp_tasks: Callable,
402402
fake_workbench_without_outputs: dict[str, Any],
@@ -405,7 +405,7 @@ async def test_run_partial_computation(
405405
osparc_product_api_base_url: str,
406406
create_pipeline: Callable[..., Awaitable[ComputationGet]],
407407
):
408-
user = registered_user()
408+
user = create_registered_user()
409409
await wait_for_catalog_service(user["id"], osparc_product_name)
410410
sleepers_project: ProjectAtDB = await project(
411411
user, workbench=fake_workbench_without_outputs
@@ -548,7 +548,7 @@ def _convert_to_pipeline_details(
548548
async def test_run_computation(
549549
wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],
550550
async_client: httpx.AsyncClient,
551-
registered_user: Callable,
551+
create_registered_user: Callable,
552552
project: Callable[..., Awaitable[ProjectAtDB]],
553553
fake_workbench_without_outputs: dict[str, Any],
554554
update_project_workbench_with_comp_tasks: Callable,
@@ -558,7 +558,7 @@ async def test_run_computation(
558558
osparc_product_api_base_url: str,
559559
create_pipeline: Callable[..., Awaitable[ComputationGet]],
560560
):
561-
user = registered_user()
561+
user = create_registered_user()
562562
await wait_for_catalog_service(user["id"], osparc_product_name)
563563
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
564564
# send a valid project with sleepers
@@ -666,15 +666,15 @@ async def test_run_computation(
666666

667667
async def test_abort_computation(
668668
async_client: httpx.AsyncClient,
669-
registered_user: Callable,
669+
create_registered_user: Callable,
670670
project: Callable[..., Awaitable[ProjectAtDB]],
671671
fake_workbench_without_outputs: dict[str, Any],
672672
fake_workbench_computational_pipeline_details: PipelineDetails,
673673
osparc_product_name: str,
674674
osparc_product_api_base_url: str,
675675
create_pipeline: Callable[..., Awaitable[ComputationGet]],
676676
):
677-
user = registered_user()
677+
user = create_registered_user()
678678
# we need long running tasks to ensure cancellation is done properly
679679
for node in fake_workbench_without_outputs.values():
680680
if "sleeper" in node["key"]:
@@ -745,7 +745,7 @@ async def test_abort_computation(
745745

746746
async def test_update_and_delete_computation(
747747
async_client: httpx.AsyncClient,
748-
registered_user: Callable,
748+
create_registered_user: Callable,
749749
project: Callable[..., Awaitable[ProjectAtDB]],
750750
fake_workbench_without_outputs: dict[str, Any],
751751
fake_workbench_computational_pipeline_details_not_started: PipelineDetails,
@@ -754,7 +754,7 @@ async def test_update_and_delete_computation(
754754
osparc_product_api_base_url: str,
755755
create_pipeline: Callable[..., Awaitable[ComputationGet]],
756756
):
757-
user = registered_user()
757+
user = create_registered_user()
758758
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
759759
# send a valid project with sleepers
760760
task_out = await create_pipeline(
@@ -873,14 +873,14 @@ async def test_update_and_delete_computation(
873873

874874
async def test_pipeline_with_no_computational_services_still_create_correct_comp_tasks_in_db(
875875
async_client: httpx.AsyncClient,
876-
registered_user: Callable,
876+
create_registered_user: Callable,
877877
project: Callable[..., Awaitable[ProjectAtDB]],
878878
jupyter_service: dict[str, Any],
879879
osparc_product_name: str,
880880
osparc_product_api_base_url: str,
881881
create_pipeline: Callable[..., Awaitable[ComputationGet]],
882882
):
883-
user = registered_user()
883+
user = create_registered_user()
884884
# create a workbench with just a dynamic service
885885
project_with_dynamic_node = await project(
886886
user,
@@ -919,13 +919,13 @@ async def test_pipeline_with_no_computational_services_still_create_correct_comp
919919

920920
async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed(
921921
client: TestClient,
922-
registered_user: Callable,
922+
create_registered_user: Callable,
923923
project: Callable[..., Awaitable[ProjectAtDB]],
924924
jupyter_service: dict[str, Any],
925925
osparc_product_name: str,
926926
osparc_product_api_base_url: str,
927927
):
928-
user = registered_user()
928+
user = create_registered_user()
929929
# create a workbench with just 2 dynamic service in a cycle
930930
project_with_dynamic_node = await project(
931931
user,
@@ -990,14 +990,14 @@ async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed(
990990

991991
async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidden(
992992
client: TestClient,
993-
registered_user: Callable,
993+
create_registered_user: Callable,
994994
project: Callable[..., Awaitable[ProjectAtDB]],
995995
sleeper_service: dict[str, Any],
996996
jupyter_service: dict[str, Any],
997997
osparc_product_name: str,
998998
osparc_product_api_base_url: str,
999999
):
1000-
user = registered_user()
1000+
user = create_registered_user()
10011001
# create a workbench with just 2 dynamic service in a cycle
10021002
project_with_cycly_and_comp_service = await project(
10031003
user,
@@ -1074,7 +1074,7 @@ async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidd
10741074

10751075
async def test_burst_create_computations(
10761076
async_client: httpx.AsyncClient,
1077-
registered_user: Callable,
1077+
create_registered_user: Callable,
10781078
project: Callable[..., Awaitable[ProjectAtDB]],
10791079
fake_workbench_without_outputs: dict[str, Any],
10801080
update_project_workbench_with_comp_tasks: Callable,
@@ -1084,7 +1084,7 @@ async def test_burst_create_computations(
10841084
osparc_product_api_base_url: str,
10851085
create_pipeline: Callable[..., Awaitable[ComputationGet]],
10861086
):
1087-
user = registered_user()
1087+
user = create_registered_user()
10881088
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
10891089
sleepers_project2 = await project(user, workbench=fake_workbench_without_outputs)
10901090

0 commit comments

Comments
 (0)