Merged (changes from 3 commits)
@@ -13,8 +13,10 @@
 from models_library.users import UserID
 from servicelib.rabbitmq import RPCRouter
 from servicelib.utils import limited_gather
-from simcore_service_director_v2.models.comp_tasks import ComputationTaskForRpcDBGet
 
+from ...core.errors import ComputationalRunNotFoundError
+from ...models.comp_runs import CompRunsAtDB
+from ...models.comp_tasks import ComputationTaskForRpcDBGet
 from ...modules.db.repositories.comp_runs import CompRunsRepository
 from ...modules.db.repositories.comp_tasks import CompTasksRepository
 from ...utils import dask as dask_utils
@@ -95,6 +97,19 @@ async def _fetch_task_log(
     return None
 
 
+async def _get_latest_run_or_none(
+    comp_runs_repo: CompRunsRepository,
+    user_id: UserID,
+    project_uuid: ProjectID,
+) -> CompRunsAtDB | None:
+    try:
+        return await comp_runs_repo.get(
+            user_id=user_id, project_id=project_uuid, iteration=None
+        )
+    except ComputationalRunNotFoundError:
+        return None
+
+
 @router.expose(reraise_if_error_type=())
 async def list_computations_latest_iteration_tasks_page(
     app: FastAPI,
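
The new `_get_latest_run_or_none` helper absorbs the expected `ComputationalRunNotFoundError` into a `None` result, so a single project without a computational run no longer fails the whole `limited_gather` batch used further down (assuming the gather's default behaviour of propagating the first exception). A minimal, self-contained sketch of the same error-to-`None` pattern; `NotFoundError`, `_get_or_none`, and `fetch` are illustrative stand-ins, not names from this PR:

```python
import asyncio


class NotFoundError(Exception):
    """Stand-in for ComputationalRunNotFoundError."""


async def _get_or_none(fetch, key):
    # Convert the expected "not found" failure into None so callers can
    # gather many lookups and simply skip the misses afterwards.
    try:
        return await fetch(key)
    except NotFoundError:
        return None


async def main() -> None:
    async def fetch(key: int) -> str:
        if key % 2:  # pretend odd keys have no latest run
            raise NotFoundError(key)
        return f"run-{key}"

    results = await asyncio.gather(*(_get_or_none(fetch, k) for k in range(4)))
    print([r for r in results if r is not None])  # -> ['run-0', 'run-2']


asyncio.run(main())
```
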
@@ -114,6 +129,8 @@ async def list_computations_latest_iteration_tasks_page(
     comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine)
     comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
 
+    # Get the latest tasks page
+
     total, comp_tasks = await comp_tasks_repo.list_computational_tasks_rpc_domain(
         project_ids=project_ids,
         offset=offset,
@@ -127,13 +144,15 @@
     # Fetch latest run for each project concurrently
     latest_runs = await limited_gather(
         *[
-            comp_runs_repo.get(user_id=user_id, project_id=project_uuid, iteration=None)
+            _get_latest_run_or_none(comp_runs_repo, user_id, project_uuid)
             for project_uuid in unique_project_uuids
         ],
         limit=20,
     )
     # Build a dict: project_uuid -> iteration
-    project_uuid_to_iteration = {run.project_uuid: run.iteration for run in latest_runs}
+    project_uuid_to_iteration = {
+        run.project_uuid: run.iteration for run in latest_runs if run is not None
+    }
 
     # Run all log fetches concurrently
     log_files = await limited_gather(
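
Because the gather can now yield `None` entries, the comprehension has to drop them before building the lookup table; only projects whose latest run was found contribute an iteration. A small sketch of that step (the `Run` dataclass is an illustrative stand-in for `CompRunsAtDB`):

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class Run:
    project_uuid: UUID
    iteration: int


# None marks a project whose latest run could not be found.
latest_runs = [Run(uuid4(), 3), None, Run(uuid4(), 1)]
project_uuid_to_iteration = {
    run.project_uuid: run.iteration for run in latest_runs if run is not None
}
assert len(project_uuid_to_iteration) == 2
```
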
@@ -221,9 +221,7 @@ async def list_for_user__only_latest_iterations(
         if order_by is None:
             order_by = OrderBy(field=IDStr("run_id"))  # default ordering
 
-        base_select_query = sa.select(
-            *self._COMPUTATION_RUNS_RPC_GET_COLUMNS
-        ).select_from(
+        _latest_runs = (
             sa.select(
                 comp_runs.c.project_uuid,
                 sa.func.max(comp_runs.c.iteration).label(
@@ -235,20 +233,23 @@
                 & (
                     comp_runs.c.metadata["product_name"].astext == product_name
                 )  # <-- NOTE: We might create a separate column for this for fast retrieval
-                & (
-                    comp_runs.c.result.in_(
-                        [
-                            RUNNING_STATE_TO_DB[item]
-                            for item in RunningState.list_running_states()
-                        ]
-                    )
-                )
-                if filter_only_running
-                else True
             )
             .group_by(comp_runs.c.project_uuid)
-            .subquery("latest_runs")
-            .join(
+        )
+        if filter_only_running:
+            _latest_runs = _latest_runs.where(
+                comp_runs.c.result.in_(
+                    [
+                        RUNNING_STATE_TO_DB[item]
+                        for item in RunningState.list_running_states()
+                    ]
+                )
+            )
+
+        base_select_query = sa.select(
+            *self._COMPUTATION_RUNS_RPC_GET_COLUMNS
+        ).select_from(
+            _latest_runs.subquery().join(
                 comp_runs,
                 sa.and_(
                     comp_runs.c.project_uuid
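
The repository change replaces the inline `<conditions> if filter_only_running else True` expression inside `.where()`. Given Python's operator precedence, the ternary applied to the whole visible conjunction, so the `WHERE` clause collapsed to plain `TRUE` and silently dropped the product-name (and any other conjoined) filter whenever `filter_only_running` was false. The rewrite always applies the base filters and only conditionally narrows on the run state, relying on `Select.where()` being generative: each call returns a new select with the clause ANDed into `WHERE`, even when invoked after `.group_by()`. A minimal sketch of the pattern, assuming SQLAlchemy 2.x and an illustrative table (column names, states, and the label are assumptions, not taken from this PR):

```python
import sqlalchemy as sa

metadata = sa.MetaData()
runs = sa.Table(
    "runs",
    metadata,
    sa.Column("project_uuid", sa.String, nullable=False),
    sa.Column("iteration", sa.Integer, nullable=False),
    sa.Column("result", sa.String, nullable=False),
)


def latest_runs_query(*, only_running: bool) -> sa.Select:
    query = sa.select(
        runs.c.project_uuid,
        sa.func.max(runs.c.iteration).label("latest_iteration"),
    ).group_by(runs.c.project_uuid)
    if only_running:
        # .where() is generative: it returns a new Select with the clause
        # ANDed into WHERE (not HAVING), even after .group_by().
        query = query.where(runs.c.result.in_(["PENDING", "STARTED"]))
    return query


print(latest_runs_query(only_running=True))
```

The conditional clause composes before `.subquery()` is taken, so the join in `base_select_query` only ever sees the (optionally filtered) latest iterations.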