Skip to content

Commit a40a138

Browse files
wvangeitbisgaard-itis
authored andcommitted
🎨 Check study and solver job status before returning output (ITISFoundation#8511)
1 parent 43b9082 commit a40a138

File tree

9 files changed

+258
-115
lines changed

9 files changed

+258
-115
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs_task_client.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@
4848
from .models.domain.functions import FunctionJobPatch
4949
from .models.schemas.functions import FunctionJobCreationTaskStatus
5050
from .models.schemas.jobs import JobPricingSpecification
51+
from .exceptions.backend_errors import (
52+
SolverJobOutputRequestButNotSucceededError,
53+
StudyJobOutputRequestButNotSucceededError,
54+
)
55+
from .exceptions.function_errors import FunctionJobCacheNotFoundError
56+
from .models.api_resources import JobLinks
57+
from .models.domain.celery_models import ApiServerOwnerMetadata
58+
from .models.schemas.jobs import JobInputs, JobPricingSpecification
5159
from .services_http.webserver import AuthSession
5260
from .services_rpc.storage import StorageService
5361
from .services_rpc.wb_api_server import WbApiRpcClient
@@ -260,30 +268,36 @@ async def function_job_outputs(
260268
):
261269
if function_job.project_job_id is None:
262270
return None
263-
new_outputs = dict(
264-
(
265-
await self._job_service.get_study_job_outputs(
266-
study_id=function.project_id,
267-
job_id=function_job.project_job_id,
268-
)
269-
).results
270-
)
271+
try:
272+
new_outputs = dict(
273+
(
274+
await self._job_service.get_study_job_outputs(
275+
study_id=function.project_id,
276+
job_id=function_job.project_job_id,
277+
)
278+
).results
279+
)
280+
except StudyJobOutputRequestButNotSucceededError:
281+
return None
271282
elif (
272283
function.function_class == FunctionClass.SOLVER
273284
and function_job.function_class == FunctionClass.SOLVER
274285
):
275286
if function_job.solver_job_id is None:
276287
return None
277-
new_outputs = dict(
278-
(
279-
await self._job_service.get_solver_job_outputs(
280-
solver_key=function.solver_key,
281-
version=function.solver_version,
282-
job_id=function_job.solver_job_id,
283-
async_pg_engine=self._async_pg_engine,
284-
)
285-
).results
286-
)
288+
try:
289+
new_outputs = dict(
290+
(
291+
await self._job_service.get_solver_job_outputs(
292+
solver_key=function.solver_key,
293+
version=function.solver_version,
294+
job_id=function_job.solver_job_id,
295+
async_pg_engine=self._async_pg_engine,
296+
)
297+
).results
298+
)
299+
except SolverJobOutputRequestButNotSucceededError:
300+
return None
287301
else:
288302
raise UnsupportedFunctionClassError(function_class=function.function_class)
289303

services/api-server/src/simcore_service_api_server/_service_jobs.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from models_library.projects import ProjectID
2121
from models_library.projects_nodes import InputID, InputTypes
2222
from models_library.projects_nodes_io import BaseFileLink, NodeID
23+
from models_library.projects_state import RunningState
2324
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
2425
from models_library.rpc.webserver.projects import ProjectJobRpcGet
2526
from models_library.rpc_pagination import PageLimitInt
@@ -29,7 +30,11 @@
2930
from sqlalchemy.ext.asyncio import AsyncEngine
3031

3132
from ._service_solvers import SolverService
32-
from .exceptions.backend_errors import JobAssetsMissingError
33+
from .exceptions.backend_errors import (
34+
JobAssetsMissingError,
35+
SolverJobOutputRequestButNotSucceededError,
36+
StudyJobOutputRequestButNotSucceededError,
37+
)
3338
from .exceptions.custom_errors import (
3439
InsufficientCreditsError,
3540
MissingWalletError,
@@ -308,6 +313,15 @@ async def get_solver_job_outputs(
308313
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
309314
_logger.debug("Get Job '%s' outputs", job_name)
310315

316+
job_status = await self.inspect_solver_job(
317+
solver_key=solver_key, version=version, job_id=job_id
318+
)
319+
320+
if job_status.state != RunningState.SUCCESS:
321+
raise SolverJobOutputRequestButNotSucceededError(
322+
job_id=job_id, state=job_status.state
323+
)
324+
311325
project_marked_as_job = await self.get_job(
312326
job_id=job_id,
313327
job_parent_resource_name=Solver.compose_resource_name(
@@ -379,9 +393,16 @@ async def get_study_job_outputs(
379393
job_name = compose_study_job_resource_name(study_id, job_id)
380394
_logger.debug("Getting Job Outputs for '%s'", job_name)
381395

396+
job_status = await self.inspect_study_job(job_id=job_id)
397+
398+
if job_status.state != RunningState.SUCCESS:
399+
raise StudyJobOutputRequestButNotSucceededError(
400+
job_id=job_id, state=job_status.state
401+
)
382402
project_outputs = await self._web_rest_client.get_project_outputs(
383403
project_id=job_id
384404
)
405+
385406
return await create_job_outputs_from_project_outputs(
386407
job_id, project_outputs, self.user_id, self._storage_rest_client
387408
)

services/api-server/src/simcore_service_api_server/exceptions/backend_errors.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,13 @@ class JobAssetsMissingError(BaseBackEndError):
151151
class CeleryTaskNotFoundError(BaseBackEndError):
152152
msg_template = "Task {task_uuid} not found"
153153
status_code = status.HTTP_404_NOT_FOUND
154+
155+
156+
class SolverJobOutputRequestButNotSucceededError(BaseBackEndError):
157+
msg_template = "Solver job '{job_id}' not succeeded, when output is requested. Current state: {state}"
158+
status_code = status.HTTP_409_CONFLICT
159+
160+
161+
class StudyJobOutputRequestButNotSucceededError(BaseBackEndError):
162+
msg_template = "Study job '{job_id}' not succeeded, when output is requested. Current state: {state}"
163+
status_code = status.HTTP_409_CONFLICT

services/api-server/tests/unit/api_functions/conftest.py

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
3939
from pytest_simcore.helpers.typing_env import EnvVarsDict
4040
from pytest_simcore.helpers.typing_mock import HandlerMockFactory
41+
from servicelib.rabbitmq.rpc_interfaces.webserver.v1.functions import FunctionsRpcApi
42+
from servicelib.rabbitmq.rpc_interfaces.webserver.v1.projects import ProjectsRpcApi
43+
from simcore_service_api_server.api.dependencies import services
4144

4245

4346
@pytest.fixture
@@ -62,8 +65,6 @@ async def mock_dependency_get_celery_task_manager(
6265
def _new(app: FastAPI):
6366
return None
6467

65-
from simcore_service_api_server.api.dependencies import services
66-
6768
return mocker.patch.object(services, services.get_task_manager.__name__, _new)
6869

6970

@@ -243,9 +244,6 @@ def _create(
243244
exception: Exception | None = None,
244245
side_effect: Callable | None = None,
245246
) -> MockType:
246-
from servicelib.rabbitmq.rpc_interfaces.webserver.v1.functions import (
247-
FunctionsRpcApi,
248-
)
249247

250248
assert exception is None or side_effect is None
251249

@@ -272,9 +270,6 @@ def _create(
272270
exception: Exception | None = None,
273271
side_effect: Callable | None = None,
274272
) -> MockType:
275-
from servicelib.rabbitmq.rpc_interfaces.webserver.v1.projects import (
276-
ProjectsRpcApi,
277-
)
278273

279274
assert exception is None or side_effect is None
280275

@@ -286,25 +281,3 @@ def _create(
286281
)
287282

288283
return _create
289-
290-
291-
@pytest.fixture()
292-
def mock_method_in_jobs_service(
293-
mocked_app_rpc_dependencies: None,
294-
mocker: MockerFixture,
295-
) -> Callable[[str, Any, Exception | None], MockType]:
296-
def _create(
297-
method_name: str = "",
298-
return_value: Any = None,
299-
exception: Exception | None = None,
300-
) -> MockType:
301-
from simcore_service_api_server._service_jobs import JobService
302-
303-
return mocker.patch.object(
304-
JobService,
305-
method_name,
306-
return_value=return_value,
307-
side_effect=exception,
308-
)
309-
310-
return _create

services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs.py

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
# pylint: disable=unused-variable
44
# pylint: disable=too-many-arguments
55

6+
import datetime
7+
import uuid
8+
from collections.abc import Callable
69
from pathlib import Path
710
from pprint import pprint
811
from typing import Any
@@ -14,7 +17,8 @@
1417
import pytest
1518
from faker import Faker
1619
from fastapi import FastAPI
17-
from models_library.services import ServiceMetaDataPublished
20+
from models_library.projects import ProjectID
21+
from models_library.projects_state import RunningState
1822
from models_library.utils.fastapi_encoders import jsonable_encoder
1923
from pydantic import AnyUrl, HttpUrl, TypeAdapter
2024
from pytest_mock import MockType
@@ -129,23 +133,8 @@ def mocked_directorv2_rest_api(
129133
def test_download_presigned_link(
130134
presigned_download_link: AnyUrl, tmp_path: Path, project_id: str, node_id: str
131135
):
132-
"""Cheks that the generation of presigned_download_link works as expected"""
136+
"""Checks that the generation of presigned_download_link works as expected"""
133137
r = httpx.get(f"{presigned_download_link}")
134-
## pprint(dict(r.headers))
135-
# r.headers looks like:
136-
# {
137-
# 'access-control-allow-origin': '*',
138-
# 'connection': 'close',
139-
# 'content-length': '491',
140-
# 'content-md5': 'HoY5Kfgqb9VSdS44CYBxnA==',
141-
# 'content-type': 'binary/octet-stream',
142-
# 'date': 'Thu, 19 May 2022 22:16:48 GMT',
143-
# 'etag': '"1e863929f82a6fd552752e380980719c"',
144-
# 'last-modified': 'Thu, 19 May 2022 22:16:48 GMT',
145-
# 'server': 'Werkzeug/2.1.2 Python/3.9.12',
146-
# 'x-amz-version-id': 'null',
147-
# 'x-amzn-requestid': 'WMAPXWFR2G4EJRVYBNJDRHTCXJ7NBRMDN7QQNHTQ5RYAQ34ZZNAL'
148-
# }
149138
assert r.status_code == status.HTTP_200_OK
150139

151140
expected_fname = f"{project_id}-{node_id}.log"
@@ -196,6 +185,55 @@ async def test_solver_logs(
196185
pprint(dict(resp.headers)) # noqa: T203
197186

198187

188+
@pytest.mark.parametrize(
189+
"job_outputs, project_id, job_state, expected_output, expected_status_code, expected_error_message",
190+
[
191+
(
192+
None,
193+
uuid.uuid4(),
194+
RunningState.STARTED,
195+
None,
196+
status.HTTP_409_CONFLICT,
197+
"not succeeded, when output is requested",
198+
),
199+
],
200+
)
201+
async def test_solver_job_outputs(
202+
client: httpx.AsyncClient,
203+
auth: httpx.BasicAuth,
204+
job_outputs: dict[str, Any] | None,
205+
project_id: ProjectID,
206+
job_state: RunningState,
207+
expected_output: dict[str, Any] | None,
208+
mock_method_in_jobs_service: Callable[[str, Any], MockType],
209+
expected_status_code: int,
210+
expected_error_message: str | None,
211+
solver_key: str,
212+
solver_version: str,
213+
) -> None:
214+
215+
job_status = JobStatus(
216+
state=job_state,
217+
job_id=project_id,
218+
submitted_at=datetime.datetime.now(tz=datetime.UTC),
219+
started_at=datetime.datetime.now(tz=datetime.UTC),
220+
stopped_at=datetime.datetime.now(tz=datetime.UTC),
221+
progress=0,
222+
)
223+
mock_method_in_jobs_service("inspect_solver_job", job_status)
224+
225+
response = await client.get(
226+
f"{API_VTAG}/solvers/{solver_key}/releases/{solver_version}/jobs/{project_id}/outputs",
227+
auth=auth,
228+
)
229+
assert response.status_code == expected_status_code
230+
data = response.json()
231+
if expected_error_message:
232+
assert "not succeeded, when output is requested" in data["errors"][0]
233+
if expected_output:
234+
assert data == expected_output
235+
236+
199237
@pytest.mark.acceptance_test(
200238
"New feature https://github.com/ITISFoundation/osparc-simcore/issues/3940"
201239
)
@@ -311,12 +349,6 @@ async def test_run_solver_job(
311349
"owner",
312350
} == set(oas["components"]["schemas"]["ServiceGet"]["required"])
313351

314-
example = next(
315-
e
316-
for e in ServiceMetaDataPublished.model_json_schema()["examples"]
317-
if "boot-options" in e
318-
)
319-
320352
# ---------------------------------------------------------------------------------------------------------
321353

322354
resp = await client.get(f"/{API_VTAG}/meta")

0 commit comments

Comments
 (0)