Skip to content

Commit 7aec02e

Browse files
🚒 Director-v2 introduce get_latest_run_by_project (#8079)
1 parent b901a65 commit 7aec02e

File tree

3 files changed

+102
-11
lines changed

3 files changed

+102
-11
lines changed

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ async def _check_pipeline_not_running_or_raise_409(
9797
computation: ComputationCreate,
9898
) -> None:
9999
with contextlib.suppress(ComputationalRunNotFoundError):
100-
last_run = await comp_runs_repo.get(
101-
user_id=computation.user_id, project_id=computation.project_id
100+
last_run = await comp_runs_repo.get_latest_run_by_project(
101+
project_id=computation.project_id
102102
)
103103
pipeline_state = last_run.result
104104

@@ -367,8 +367,8 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa
367367
last_run: CompRunsAtDB | None = None
368368
pipeline_state = RunningState.NOT_STARTED
369369
with contextlib.suppress(ComputationalRunNotFoundError):
370-
last_run = await comp_runs_repo.get(
371-
user_id=computation.user_id, project_id=computation.project_id
370+
last_run = await comp_runs_repo.get_latest_run_by_project(
371+
project_id=computation.project_id
372372
)
373373
pipeline_state = last_run.result
374374

@@ -467,7 +467,7 @@ async def get_computation(
467467
last_run: CompRunsAtDB | None = None
468468
pipeline_state = RunningState.NOT_STARTED
469469
with contextlib.suppress(ComputationalRunNotFoundError):
470-
last_run = await comp_runs_repo.get(user_id=user_id, project_id=project_id)
470+
last_run = await comp_runs_repo.get_latest_run_by_project(project_id=project_id)
471471
pipeline_state = last_run.result
472472

473473
_logger.debug(
@@ -542,8 +542,8 @@ async def stop_computation(
542542
last_run: CompRunsAtDB | None = None
543543
pipeline_state = RunningState.UNKNOWN
544544
with contextlib.suppress(ComputationalRunNotFoundError):
545-
last_run = await comp_runs_repo.get(
546-
user_id=computation_stop.user_id, project_id=project_id
545+
last_run = await comp_runs_repo.get_latest_run_by_project(
546+
project_id=project_id
547547
)
548548
pipeline_state = last_run.result
549549
if utils.is_pipeline_running(last_run.result):
@@ -601,8 +601,8 @@ async def delete_computation(
601601
# check if current state allow to stop the computation
602602
pipeline_state = RunningState.UNKNOWN
603603
with contextlib.suppress(ComputationalRunNotFoundError):
604-
last_run = await comp_runs_repo.get(
605-
user_id=computation_stop.user_id, project_id=project_id
604+
last_run = await comp_runs_repo.get_latest_run_by_project(
605+
project_id=project_id
606606
)
607607
pipeline_state = last_run.result
608608
if utils.is_pipeline_running(pipeline_state):
@@ -636,8 +636,8 @@ def return_last_value(retry_state: Any) -> Any:
636636
before_sleep=before_sleep_log(_logger, logging.INFO),
637637
)
638638
async def check_pipeline_stopped() -> bool:
639-
last_run = await comp_runs_repo.get(
640-
user_id=computation_stop.user_id, project_id=project_id
639+
last_run = await comp_runs_repo.get_latest_run_by_project(
640+
project_id=project_id
641641
)
642642
pipeline_state = last_run.result
643643
return utils.is_pipeline_stopped(pipeline_state)

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,22 @@ async def get(
145145
raise ComputationalRunNotFoundError
146146
return CompRunsAtDB.model_validate(row)
147147

148+
async def get_latest_run_by_project(
149+
self,
150+
project_id: ProjectID,
151+
) -> CompRunsAtDB:
152+
async with pass_or_acquire_connection(self.db_engine) as conn:
153+
result = await conn.execute(
154+
sa.select(comp_runs)
155+
.where(comp_runs.c.project_uuid == f"{project_id}")
156+
.order_by(desc(comp_runs.c.run_id))
157+
.limit(1)
158+
)
159+
row = result.one_or_none()
160+
if not row:
161+
raise ComputationalRunNotFoundError
162+
return CompRunsAtDB.model_validate(row)
163+
148164
async def list_(
149165
self,
150166
*,

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1245,3 +1245,78 @@ async def test_list_group_by_collection_run_id_state_priority_precedence(
12451245
assert len(items) == 1
12461246
collection_item = items[0]
12471247
assert collection_item.state == RunningState.FAILED
1248+
1249+
1250+
async def test_get_latest_run_by_project(
1251+
sqlalchemy_async_engine: AsyncEngine,
1252+
run_metadata: RunMetadataDict,
1253+
faker: Faker,
1254+
publish_project: Callable[[], Awaitable[PublishedProject]],
1255+
create_registered_user: Callable[..., dict[str, Any]],
1256+
with_product: dict[str, Any],
1257+
):
1258+
"""Test that get() with user_id=None retrieves the latest run regardless of user"""
1259+
published_project = await publish_project()
1260+
1261+
# Create a second user
1262+
second_user = create_registered_user()
1263+
1264+
# Create comp runs for the original user
1265+
comp_run_user1_iter1 = await CompRunsRepository(sqlalchemy_async_engine).create(
1266+
user_id=published_project.user["id"],
1267+
project_id=published_project.project.uuid,
1268+
iteration=None,
1269+
metadata=run_metadata,
1270+
use_on_demand_clusters=faker.pybool(),
1271+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
1272+
collection_run_id=CollectionRunID(faker.uuid4()),
1273+
)
1274+
1275+
# Create comp runs for the second user (this should increment iteration)
1276+
comp_run_user2_iter2 = await CompRunsRepository(sqlalchemy_async_engine).create(
1277+
user_id=second_user["id"],
1278+
project_id=published_project.project.uuid,
1279+
iteration=None,
1280+
metadata=run_metadata,
1281+
use_on_demand_clusters=faker.pybool(),
1282+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
1283+
collection_run_id=CollectionRunID(faker.uuid4()),
1284+
)
1285+
1286+
# Create another run for the first user (should be iteration 3)
1287+
comp_run_user1_iter3 = await CompRunsRepository(sqlalchemy_async_engine).create(
1288+
user_id=published_project.user["id"],
1289+
project_id=published_project.project.uuid,
1290+
iteration=None,
1291+
metadata=run_metadata,
1292+
use_on_demand_clusters=faker.pybool(),
1293+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
1294+
collection_run_id=CollectionRunID(faker.uuid4()),
1295+
)
1296+
1297+
# Verify iterations are correct
1298+
assert comp_run_user1_iter1.iteration == 1
1299+
assert comp_run_user2_iter2.iteration == 1
1300+
assert comp_run_user1_iter3.iteration == 2
1301+
1302+
# Test get with user_id=None should return the latest run (highest iteration)
1303+
latest_run = await CompRunsRepository(
1304+
sqlalchemy_async_engine
1305+
).get_latest_run_by_project(
1306+
project_id=published_project.project.uuid,
1307+
)
1308+
assert latest_run == comp_run_user1_iter3
1309+
assert latest_run.iteration == 2
1310+
1311+
# Test get with specific user_id still works
1312+
user1_latest = await CompRunsRepository(sqlalchemy_async_engine).get(
1313+
user_id=published_project.user["id"],
1314+
project_id=published_project.project.uuid,
1315+
)
1316+
assert user1_latest == comp_run_user1_iter3
1317+
1318+
user2_latest = await CompRunsRepository(sqlalchemy_async_engine).get(
1319+
user_id=second_user["id"],
1320+
project_id=published_project.project.uuid,
1321+
)
1322+
assert user2_latest == comp_run_user2_iter2

0 commit comments

Comments
 (0)