Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ async def _check_pipeline_not_running_or_raise_409(
computation: ComputationCreate,
) -> None:
with contextlib.suppress(ComputationalRunNotFoundError):
last_run = await comp_runs_repo.get(
user_id=computation.user_id, project_id=computation.project_id
last_run = await comp_runs_repo.get_latest_run_by_project(
project_id=computation.project_id
)
pipeline_state = last_run.result

Expand Down Expand Up @@ -367,8 +367,8 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa
last_run: CompRunsAtDB | None = None
pipeline_state = RunningState.NOT_STARTED
with contextlib.suppress(ComputationalRunNotFoundError):
last_run = await comp_runs_repo.get(
user_id=computation.user_id, project_id=computation.project_id
last_run = await comp_runs_repo.get_latest_run_by_project(
project_id=computation.project_id
)
pipeline_state = last_run.result

Expand Down Expand Up @@ -467,7 +467,7 @@ async def get_computation(
last_run: CompRunsAtDB | None = None
pipeline_state = RunningState.NOT_STARTED
with contextlib.suppress(ComputationalRunNotFoundError):
last_run = await comp_runs_repo.get(user_id=user_id, project_id=project_id)
last_run = await comp_runs_repo.get_latest_run_by_project(project_id=project_id)
pipeline_state = last_run.result

_logger.debug(
Expand Down Expand Up @@ -542,8 +542,8 @@ async def stop_computation(
last_run: CompRunsAtDB | None = None
pipeline_state = RunningState.UNKNOWN
with contextlib.suppress(ComputationalRunNotFoundError):
last_run = await comp_runs_repo.get(
user_id=computation_stop.user_id, project_id=project_id
last_run = await comp_runs_repo.get_latest_run_by_project(
project_id=project_id
)
pipeline_state = last_run.result
if utils.is_pipeline_running(last_run.result):
Expand Down Expand Up @@ -601,8 +601,8 @@ async def delete_computation(
# check if current state allow to stop the computation
pipeline_state = RunningState.UNKNOWN
with contextlib.suppress(ComputationalRunNotFoundError):
last_run = await comp_runs_repo.get(
user_id=computation_stop.user_id, project_id=project_id
last_run = await comp_runs_repo.get_latest_run_by_project(
project_id=project_id
)
pipeline_state = last_run.result
if utils.is_pipeline_running(pipeline_state):
Expand Down Expand Up @@ -636,8 +636,8 @@ def return_last_value(retry_state: Any) -> Any:
before_sleep=before_sleep_log(_logger, logging.INFO),
)
async def check_pipeline_stopped() -> bool:
last_run = await comp_runs_repo.get(
user_id=computation_stop.user_id, project_id=project_id
last_run = await comp_runs_repo.get_latest_run_by_project(
project_id=project_id
)
pipeline_state = last_run.result
return utils.is_pipeline_stopped(pipeline_state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,22 @@ async def get(
raise ComputationalRunNotFoundError
return CompRunsAtDB.model_validate(row)

async def get_latest_run_by_project(
self,
project_id: ProjectID,
) -> CompRunsAtDB:
async with pass_or_acquire_connection(self.db_engine) as conn:
result = await conn.execute(
sa.select(comp_runs)
.where(comp_runs.c.project_uuid == f"{project_id}")
.order_by(desc(comp_runs.c.run_id))
.limit(1)
)
row = result.one_or_none()
if not row:
raise ComputationalRunNotFoundError
return CompRunsAtDB.model_validate(row)

async def list_(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1245,3 +1245,77 @@ async def test_list_group_by_collection_run_id_state_priority_precedence(
assert len(items) == 1
collection_item = items[0]
assert collection_item.state == RunningState.FAILED


async def test_get_latest_run_by_project(
sqlalchemy_async_engine: AsyncEngine,
run_metadata: RunMetadataDict,
faker: Faker,
publish_project: Callable[[], Awaitable[PublishedProject]],
create_registered_user: Callable[..., dict[str, Any]],
):
"""Test that get() with user_id=None retrieves the latest run regardless of user"""
published_project = await publish_project()

# Create a second user
second_user = create_registered_user()

# Create comp runs for the original user
comp_run_user1_iter1 = await CompRunsRepository(sqlalchemy_async_engine).create(
user_id=published_project.user["id"],
project_id=published_project.project.uuid,
iteration=None,
metadata=run_metadata,
use_on_demand_clusters=faker.pybool(),
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
collection_run_id=CollectionRunID(faker.uuid4()),
)

# Create comp runs for the second user (this should increment iteration)
comp_run_user2_iter2 = await CompRunsRepository(sqlalchemy_async_engine).create(
user_id=second_user["id"],
project_id=published_project.project.uuid,
iteration=None,
metadata=run_metadata,
use_on_demand_clusters=faker.pybool(),
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
collection_run_id=CollectionRunID(faker.uuid4()),
)

# Create another run for the first user (should be iteration 3)
comp_run_user1_iter3 = await CompRunsRepository(sqlalchemy_async_engine).create(
user_id=published_project.user["id"],
project_id=published_project.project.uuid,
iteration=None,
metadata=run_metadata,
use_on_demand_clusters=faker.pybool(),
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
collection_run_id=CollectionRunID(faker.uuid4()),
)

# Verify iterations are correct
assert comp_run_user1_iter1.iteration == 1
assert comp_run_user2_iter2.iteration == 1
assert comp_run_user1_iter3.iteration == 2

# Test get with user_id=None should return the latest run (highest iteration)
latest_run = await CompRunsRepository(
sqlalchemy_async_engine
).get_latest_run_by_project(
project_id=published_project.project.uuid,
)
assert latest_run == comp_run_user1_iter3
assert latest_run.iteration == 2

# Test get with specific user_id still works
user1_latest = await CompRunsRepository(sqlalchemy_async_engine).get(
user_id=published_project.user["id"],
project_id=published_project.project.uuid,
)
assert user1_latest == comp_run_user1_iter3

user2_latest = await CompRunsRepository(sqlalchemy_async_engine).get(
user_id=second_user["id"],
project_id=published_project.project.uuid,
)
assert user2_latest == comp_run_user2_iter2
Loading