Skip to content

Commit 6c1cc19

Browse files
authored
Merge branch 'master' into enh/publish-and-template
2 parents e857f87 + 2be6248 commit 6c1cc19

File tree

25 files changed

+896
-153
lines changed

25 files changed

+896
-153
lines changed

api/specs/web-server/_computations.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22

33
from _common import as_query
44
from fastapi import APIRouter, Depends, status
5+
from fastapi_pagination import Page
56
from models_library.api_schemas_webserver.computations import (
67
ComputationGet,
78
ComputationPathParams,
9+
ComputationRunPathParams,
810
ComputationRunRestGet,
11+
ComputationRunWithFiltersListQueryParams,
912
ComputationStart,
1013
ComputationStarted,
1114
ComputationTaskRestGet,
@@ -65,16 +68,26 @@ async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]):
6568

6669
@router.get(
6770
"/computations/-/iterations/latest",
68-
response_model=Envelope[list[ComputationRunRestGet]],
71+
response_model=Page[ComputationRunRestGet],
6972
)
7073
async def list_computations_latest_iteration(
74+
_query: Annotated[as_query(ComputationRunWithFiltersListQueryParams), Depends()],
75+
): ...
76+
77+
78+
@router.get(
79+
"/computations/{project_id}/iterations",
80+
response_model=Page[ComputationRunRestGet],
81+
)
82+
async def list_computation_iterations(
7183
_query: Annotated[as_query(ComputationRunListQueryParams), Depends()],
84+
_path: Annotated[ComputationRunPathParams, Depends()],
7285
): ...
7386

7487

7588
@router.get(
7689
"/computations/{project_id}/iterations/latest/tasks",
77-
response_model=Envelope[list[ComputationTaskRestGet]],
90+
response_model=Page[ComputationTaskRestGet],
7891
)
7992
async def list_computations_latest_iteration_tasks(
8093
_query: Annotated[as_query(ComputationTaskListQueryParams), Depends()],

packages/models-library/src/models_library/api_schemas_webserver/computations.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,10 @@ class ComputationStarted(OutputSchemaWithoutCamelCase):
8181
class ComputationRunListQueryParams(
8282
PageQueryParameters,
8383
ComputationRunListOrderParams, # type: ignore[misc, valid-type]
84-
):
84+
): ...
85+
86+
87+
class ComputationRunWithFiltersListQueryParams(ComputationRunListQueryParams):
8588
filter_only_running: bool = Field(
8689
default=False,
8790
description="If true, only running computations are returned",
@@ -100,6 +103,11 @@ class ComputationRunRestGet(OutputSchema):
100103
project_custom_metadata: dict[str, Any]
101104

102105

106+
class ComputationRunPathParams(BaseModel):
107+
project_id: ProjectID
108+
model_config = ConfigDict(populate_by_name=True, extra="forbid")
109+
110+
103111
### Computation Task
104112

105113

packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424

2525
@pytest.fixture()
26-
def registered_user(
26+
def create_registered_user(
2727
postgres_db: sa.engine.Engine, faker: Faker
2828
) -> Iterator[Callable[..., dict]]:
2929
created_user_ids = []

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,34 @@ async def list_computations_latest_iteration_page(
5858
return result
5959

6060

61+
@log_decorator(_logger, level=logging.DEBUG)
62+
async def list_computations_iterations_page(
63+
rabbitmq_rpc_client: RabbitMQRPCClient,
64+
*,
65+
product_name: ProductName,
66+
user_id: UserID,
67+
project_id: ProjectID,
68+
# pagination
69+
offset: int = 0,
70+
limit: int = 20,
71+
# ordering
72+
order_by: OrderBy | None = None,
73+
) -> ComputationRunRpcGetPage:
74+
result = await rabbitmq_rpc_client.request(
75+
DIRECTOR_V2_RPC_NAMESPACE,
76+
_RPC_METHOD_NAME_ADAPTER.validate_python("list_computations_iterations_page"),
77+
product_name=product_name,
78+
user_id=user_id,
79+
project_id=project_id,
80+
offset=offset,
81+
limit=limit,
82+
order_by=order_by,
83+
timeout_s=_DEFAULT_TIMEOUT_S,
84+
)
85+
assert isinstance(result, ComputationRunRpcGetPage) # nosec
86+
return result
87+
88+
6189
@log_decorator(_logger, level=logging.DEBUG)
6290
async def list_computations_latest_iteration_tasks_page(
6391
rabbitmq_rpc_client: RabbitMQRPCClient,

services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,36 @@ async def list_computations_latest_iteration_page(
5353
)
5454

5555

56+
@router.expose(reraise_if_error_type=())
57+
async def list_computations_iterations_page(
58+
app: FastAPI,
59+
*,
60+
product_name: ProductName,
61+
user_id: UserID,
62+
project_id: ProjectID,
63+
# pagination
64+
offset: int = 0,
65+
limit: int = 20,
66+
# ordering
67+
order_by: OrderBy | None = None,
68+
) -> ComputationRunRpcGetPage:
69+
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
70+
total, comp_runs_output = (
71+
await comp_runs_repo.list_for_user_and_project_all_iterations(
72+
product_name=product_name,
73+
user_id=user_id,
74+
project_id=project_id,
75+
offset=offset,
76+
limit=limit,
77+
order_by=order_by,
78+
)
79+
)
80+
return ComputationRunRpcGetPage(
81+
items=comp_runs_output,
82+
total=total,
83+
)
84+
85+
5686
async def _fetch_task_log(
5787
user_id: UserID, project_id: ProjectID, task: ComputationTaskForRpcDBGet
5888
) -> TaskLogFileGet | None:

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,16 @@ async def list_(
190190
)
191191
]
192192

193+
_COMPUTATION_RUNS_RPC_GET_COLUMNS = [ # noqa: RUF012
194+
comp_runs.c.project_uuid,
195+
comp_runs.c.iteration,
196+
comp_runs.c.result.label("state"),
197+
comp_runs.c.metadata.label("info"),
198+
comp_runs.c.created.label("submitted_at"),
199+
comp_runs.c.started.label("started_at"),
200+
comp_runs.c.ended.label("ended_at"),
201+
]
202+
193203
async def list_for_user__only_latest_iterations(
194204
self,
195205
*,
@@ -212,13 +222,7 @@ async def list_for_user__only_latest_iterations(
212222
order_by = OrderBy(field=IDStr("run_id")) # default ordering
213223

214224
base_select_query = sa.select(
215-
comp_runs.c.project_uuid,
216-
comp_runs.c.iteration,
217-
comp_runs.c.result.label("state"),
218-
comp_runs.c.metadata.label("info"),
219-
comp_runs.c.created.label("submitted_at"),
220-
comp_runs.c.started.label("started_at"),
221-
comp_runs.c.ended.label("ended_at"),
225+
*self._COMPUTATION_RUNS_RPC_GET_COLUMNS
222226
).select_from(
223227
sa.select(
224228
comp_runs.c.project_uuid,
@@ -286,6 +290,62 @@ async def list_for_user__only_latest_iterations(
286290

287291
return cast(int, total_count), items
288292

293+
async def list_for_user_and_project_all_iterations(
294+
self,
295+
*,
296+
product_name: str,
297+
user_id: UserID,
298+
project_id: ProjectID,
299+
# pagination
300+
offset: int,
301+
limit: int,
302+
# ordering
303+
order_by: OrderBy | None = None,
304+
) -> tuple[int, list[ComputationRunRpcGet]]:
305+
if order_by is None:
306+
order_by = OrderBy(field=IDStr("run_id")) # default ordering
307+
308+
base_select_query = sa.select(
309+
*self._COMPUTATION_RUNS_RPC_GET_COLUMNS,
310+
).where(
311+
(comp_runs.c.user_id == user_id)
312+
& (comp_runs.c.project_uuid == f"{project_id}")
313+
& (
314+
comp_runs.c.metadata["product_name"].astext == product_name
315+
) # <-- NOTE: We might create a separate column for this for fast retrieval
316+
)
317+
318+
# Select total count from base_query
319+
count_query = sa.select(sa.func.count()).select_from(
320+
base_select_query.subquery()
321+
)
322+
323+
# Ordering and pagination
324+
if order_by.direction == OrderDirection.ASC:
325+
list_query = base_select_query.order_by(
326+
sa.asc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id
327+
)
328+
else:
329+
list_query = base_select_query.order_by(
330+
desc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id
331+
)
332+
list_query = list_query.offset(offset).limit(limit)
333+
334+
async with pass_or_acquire_connection(self.db_engine) as conn:
335+
total_count = await conn.scalar(count_query)
336+
337+
items = [
338+
ComputationRunRpcGet.model_validate(
339+
{
340+
**row,
341+
"state": DB_TO_RUNNING_STATE[row["state"]],
342+
}
343+
)
344+
async for row in await conn.stream(list_query)
345+
]
346+
347+
return cast(int, total_count), items
348+
289349
async def create(
290350
self,
291351
*,

services/director-v2/tests/integration/01/test_computation_api.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,12 @@ def test_invalid_computation(
188188

189189
async def test_start_empty_computation_is_refused(
190190
async_client: httpx.AsyncClient,
191-
registered_user: Callable,
191+
create_registered_user: Callable,
192192
project: Callable[..., Awaitable[ProjectAtDB]],
193193
osparc_product_name: str,
194194
create_pipeline: Callable[..., Awaitable[ComputationGet]],
195195
):
196-
user = registered_user()
196+
user = create_registered_user()
197197
empty_project = await project(user)
198198
with pytest.raises(
199199
httpx.HTTPStatusError, match=f"{status.HTTP_422_UNPROCESSABLE_ENTITY}"
@@ -391,15 +391,15 @@ class PartialComputationParams:
391391
async def test_run_partial_computation(
392392
wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],
393393
async_client: httpx.AsyncClient,
394-
registered_user: Callable,
394+
create_registered_user: Callable,
395395
project: Callable[..., Awaitable[ProjectAtDB]],
396396
update_project_workbench_with_comp_tasks: Callable,
397397
fake_workbench_without_outputs: dict[str, Any],
398398
params: PartialComputationParams,
399399
osparc_product_name: str,
400400
create_pipeline: Callable[..., Awaitable[ComputationGet]],
401401
):
402-
user = registered_user()
402+
user = create_registered_user()
403403
await wait_for_catalog_service(user["id"], osparc_product_name)
404404
sleepers_project: ProjectAtDB = await project(
405405
user, workbench=fake_workbench_without_outputs
@@ -539,7 +539,7 @@ def _convert_to_pipeline_details(
539539
async def test_run_computation(
540540
wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],
541541
async_client: httpx.AsyncClient,
542-
registered_user: Callable,
542+
create_registered_user: Callable,
543543
project: Callable[..., Awaitable[ProjectAtDB]],
544544
fake_workbench_without_outputs: dict[str, Any],
545545
update_project_workbench_with_comp_tasks: Callable,
@@ -548,7 +548,7 @@ async def test_run_computation(
548548
osparc_product_name: str,
549549
create_pipeline: Callable[..., Awaitable[ComputationGet]],
550550
):
551-
user = registered_user()
551+
user = create_registered_user()
552552
await wait_for_catalog_service(user["id"], osparc_product_name)
553553
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
554554
# send a valid project with sleepers
@@ -653,14 +653,14 @@ async def test_run_computation(
653653

654654
async def test_abort_computation(
655655
async_client: httpx.AsyncClient,
656-
registered_user: Callable,
656+
create_registered_user: Callable,
657657
project: Callable[..., Awaitable[ProjectAtDB]],
658658
fake_workbench_without_outputs: dict[str, Any],
659659
fake_workbench_computational_pipeline_details: PipelineDetails,
660660
osparc_product_name: str,
661661
create_pipeline: Callable[..., Awaitable[ComputationGet]],
662662
):
663-
user = registered_user()
663+
user = create_registered_user()
664664
# we need long running tasks to ensure cancellation is done properly
665665
for node in fake_workbench_without_outputs.values():
666666
if "sleeper" in node["key"]:
@@ -730,15 +730,15 @@ async def test_abort_computation(
730730

731731
async def test_update_and_delete_computation(
732732
async_client: httpx.AsyncClient,
733-
registered_user: Callable,
733+
create_registered_user: Callable,
734734
project: Callable[..., Awaitable[ProjectAtDB]],
735735
fake_workbench_without_outputs: dict[str, Any],
736736
fake_workbench_computational_pipeline_details_not_started: PipelineDetails,
737737
fake_workbench_computational_pipeline_details: PipelineDetails,
738738
osparc_product_name: str,
739739
create_pipeline: Callable[..., Awaitable[ComputationGet]],
740740
):
741-
user = registered_user()
741+
user = create_registered_user()
742742
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
743743
# send a valid project with sleepers
744744
task_out = await create_pipeline(
@@ -852,13 +852,13 @@ async def test_update_and_delete_computation(
852852

853853
async def test_pipeline_with_no_computational_services_still_create_correct_comp_tasks_in_db(
854854
async_client: httpx.AsyncClient,
855-
registered_user: Callable,
855+
create_registered_user: Callable,
856856
project: Callable[..., Awaitable[ProjectAtDB]],
857857
jupyter_service: dict[str, Any],
858858
osparc_product_name: str,
859859
create_pipeline: Callable[..., Awaitable[ComputationGet]],
860860
):
861-
user = registered_user()
861+
user = create_registered_user()
862862
# create a workbench with just a dynamic service
863863
project_with_dynamic_node = await project(
864864
user,
@@ -895,12 +895,12 @@ async def test_pipeline_with_no_computational_services_still_create_correct_comp
895895

896896
async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed(
897897
client: TestClient,
898-
registered_user: Callable,
898+
create_registered_user: Callable,
899899
project: Callable[..., Awaitable[ProjectAtDB]],
900900
jupyter_service: dict[str, Any],
901901
osparc_product_name: str,
902902
):
903-
user = registered_user()
903+
user = create_registered_user()
904904
# create a workbench with just 2 dynamic service in a cycle
905905
project_with_dynamic_node = await project(
906906
user,
@@ -963,13 +963,13 @@ async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed(
963963

964964
async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidden(
965965
client: TestClient,
966-
registered_user: Callable,
966+
create_registered_user: Callable,
967967
project: Callable[..., Awaitable[ProjectAtDB]],
968968
sleeper_service: dict[str, Any],
969969
jupyter_service: dict[str, Any],
970970
osparc_product_name: str,
971971
):
972-
user = registered_user()
972+
user = create_registered_user()
973973
# create a workbench with just 2 dynamic service in a cycle
974974
project_with_cycly_and_comp_service = await project(
975975
user,
@@ -1044,7 +1044,7 @@ async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidd
10441044

10451045
async def test_burst_create_computations(
10461046
async_client: httpx.AsyncClient,
1047-
registered_user: Callable,
1047+
create_registered_user: Callable,
10481048
project: Callable[..., Awaitable[ProjectAtDB]],
10491049
fake_workbench_without_outputs: dict[str, Any],
10501050
update_project_workbench_with_comp_tasks: Callable,
@@ -1053,7 +1053,7 @@ async def test_burst_create_computations(
10531053
osparc_product_name: str,
10541054
create_pipeline: Callable[..., Awaitable[ComputationGet]],
10551055
):
1056-
user = registered_user()
1056+
user = create_registered_user()
10571057
sleepers_project = await project(user, workbench=fake_workbench_without_outputs)
10581058
sleepers_project2 = await project(user, workbench=fake_workbench_without_outputs)
10591059

0 commit comments

Comments
 (0)