Merged
@@ -41,12 +41,13 @@
from ._service_functions import FunctionService
from ._service_jobs import JobService
from .api.dependencies.authentication import Identity
-from .exceptions.function_errors import (
-    FunctionJobCacheNotFoundError,
+from .exceptions.backend_errors import (
+    SolverJobOutputRequestButNotSucceededError,
+    StudyJobOutputRequestButNotSucceededError,
)
+from .exceptions.function_errors import FunctionJobCacheNotFoundError
from .models.api_resources import JobLinks
from .models.domain.celery_models import ApiServerOwnerMetadata
from .models.schemas.functions import FunctionJobCreationTaskStatus
from .models.schemas.jobs import JobInputs, JobPricingSpecification
from .services_http.webserver import AuthSession
from .services_rpc.storage import StorageService
@@ -260,7 +261,7 @@ async def get_cached_function_job(

raise FunctionJobCacheNotFoundError

-async def function_job_outputs(
+async def function_job_outputs(  # noqa: PLR0911 # pylint: disable=too-many-return-statements
self,
*,
function: RegisteredFunction,
@@ -283,30 +284,36 @@ async def function_job_outputs(
):
if function_job.project_job_id is None:
return None
-new_outputs = dict(
-    (
-        await self._job_service.get_study_job_outputs(
-            study_id=function.project_id,
-            job_id=function_job.project_job_id,
-        )
-    ).results
-)
+try:
+    new_outputs = dict(
+        (
+            await self._job_service.get_study_job_outputs(
+                study_id=function.project_id,
+                job_id=function_job.project_job_id,
+            )
+        ).results
+    )
+except StudyJobOutputRequestButNotSucceededError:
+    return None
elif (
function.function_class == FunctionClass.SOLVER
and function_job.function_class == FunctionClass.SOLVER
):
if function_job.solver_job_id is None:
return None
-new_outputs = dict(
-    (
-        await self._job_service.get_solver_job_outputs(
-            solver_key=function.solver_key,
-            version=function.solver_version,
-            job_id=function_job.solver_job_id,
-            async_pg_engine=self._async_pg_engine,
-        )
-    ).results
-)
+try:
+    new_outputs = dict(
+        (
+            await self._job_service.get_solver_job_outputs(
+                solver_key=function.solver_key,
+                version=function.solver_version,
+                job_id=function_job.solver_job_id,
+                async_pg_engine=self._async_pg_engine,
+            )
+        ).results
+    )
+except SolverJobOutputRequestButNotSucceededError:
+    return None
else:
raise UnsupportedFunctionClassError(function_class=function.function_class)
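
With the try/except added above, function_job_outputs now turns the new "job not succeeded" backend errors into a None result instead of letting them propagate. A minimal caller-side sketch of what that enables; the helper name, keyword arguments, and polling policy are illustrative and not part of this PR:

import asyncio

async def wait_for_function_job_outputs(
    service, *, poll_interval_s: float = 2.0, max_attempts: int = 30, **call_kwargs
):
    # Poll FunctionJobService.function_job_outputs (assumed to accept these
    # keyword arguments) until the underlying project/solver job succeeds.
    # NOTE: a failed job also keeps returning None, hence the bounded loop.
    for _ in range(max_attempts):
        outputs = await service.function_job_outputs(**call_kwargs)
        if outputs is not None:
            return outputs
        await asyncio.sleep(poll_interval_s)
    return None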

@@ -20,6 +20,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes import InputID, InputTypes
from models_library.projects_nodes_io import BaseFileLink, NodeID
from models_library.projects_state import RunningState
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
from models_library.rpc.webserver.projects import ProjectJobRpcGet
from models_library.rpc_pagination import PageLimitInt
@@ -29,7 +30,11 @@
from sqlalchemy.ext.asyncio import AsyncEngine

from ._service_solvers import SolverService
-from .exceptions.backend_errors import JobAssetsMissingError
+from .exceptions.backend_errors import (
+    JobAssetsMissingError,
+    SolverJobOutputRequestButNotSucceededError,
+    StudyJobOutputRequestButNotSucceededError,
+)
from .exceptions.custom_errors import (
InsufficientCreditsError,
MissingWalletError,
@@ -308,6 +313,15 @@ async def get_solver_job_outputs(
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
_logger.debug("Get Job '%s' outputs", job_name)

job_status = await self.inspect_solver_job(
solver_key=solver_key, version=version, job_id=job_id
)

if job_status.state != RunningState.SUCCESS:
raise SolverJobOutputRequestButNotSucceededError(
job_id=job_id, state=job_status.state
)

project_marked_as_job = await self.get_job(
job_id=job_id,
job_parent_resource_name=Solver.compose_resource_name(
@@ -379,9 +393,16 @@ async def get_study_job_outputs(
job_name = compose_study_job_resource_name(study_id, job_id)
_logger.debug("Getting Job Outputs for '%s'", job_name)

job_status = await self.inspect_study_job(job_id=job_id)

if job_status.state != RunningState.SUCCESS:
raise StudyJobOutputRequestButNotSucceededError(
job_id=job_id, state=job_status.state
)
project_outputs = await self._web_rest_client.get_project_outputs(
project_id=job_id
)

return await create_job_outputs_from_project_outputs(
job_id, project_outputs, self.user_id, self._storage_rest_client
)
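
The inspect-before-fetch guards added above make the job service refuse output requests for jobs that have not reached RunningState.SUCCESS. A hedged, test-style sketch of that contract; the fixtures (job_service, started_job_status, async_pg_engine, ...) are assumed to exist and are not taken from this PR:

import pytest
from simcore_service_api_server._service_jobs import JobService
from simcore_service_api_server.exceptions.backend_errors import (
    SolverJobOutputRequestButNotSucceededError,
)


async def test_outputs_refused_until_success(
    job_service,  # assumed fixture: a wired JobService instance
    started_job_status,  # assumed fixture: a JobStatus whose state is not SUCCESS
    solver_key,
    solver_version,
    job_id,
    async_pg_engine,
    mocker,
):
    # Make the service believe the computation is still running
    mocker.patch.object(JobService, "inspect_solver_job", return_value=started_job_status)

    # The guard must raise before any outputs are fetched from the backend
    with pytest.raises(SolverJobOutputRequestButNotSucceededError):
        await job_service.get_solver_job_outputs(
            solver_key=solver_key,
            version=solver_version,
            job_id=job_id,
            async_pg_engine=async_pg_engine,
        )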
@@ -151,3 +151,13 @@ class JobAssetsMissingError(BaseBackEndError):
class CeleryTaskNotFoundError(BaseBackEndError):
msg_template = "Task {task_uuid} not found"
status_code = status.HTTP_404_NOT_FOUND


class SolverJobOutputRequestButNotSucceededError(BaseBackEndError):
msg_template = "Solver job '{job_id}' not succeeded, when output is requested. Current state: {state}"
status_code = status.HTTP_409_CONFLICT


class StudyJobOutputRequestButNotSucceededError(BaseBackEndError):
msg_template = "Study job '{job_id}' not succeeded, when output is requested. Current state: {state}"
status_code = status.HTTP_409_CONFLICT
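
Both new errors map to HTTP 409 Conflict, so API clients can treat that status as "outputs not ready yet". A rough client-side sketch; the version-tag default and the retry decision are assumptions, while the endpoint path and the errors payload shape follow the tests further below:

import httpx


async def get_solver_job_outputs_or_none(
    client: httpx.AsyncClient,
    solver_key: str,
    solver_version: str,
    job_id: str,
    api_vtag: str = "v0",  # assumed API version tag; adjust to the deployment
) -> dict | None:
    # Returns the outputs payload, or None while the job has not yet succeeded.
    response = await client.get(
        f"/{api_vtag}/solvers/{solver_key}/releases/{solver_version}/jobs/{job_id}/outputs"
    )
    if response.status_code == httpx.codes.CONFLICT:
        # body looks like {"errors": ["Solver job '...' not succeeded, when output is requested. ..."]}
        return None
    response.raise_for_status()
    return response.json()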
33 changes: 3 additions & 30 deletions services/api-server/tests/unit/api_functions/conftest.py
@@ -38,6 +38,9 @@
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from pytest_simcore.helpers.typing_mock import HandlerMockFactory
+from servicelib.rabbitmq.rpc_interfaces.webserver.v1.functions import FunctionsRpcApi
+from servicelib.rabbitmq.rpc_interfaces.webserver.v1.projects import ProjectsRpcApi
+from simcore_service_api_server.api.dependencies import services


@pytest.fixture
@@ -62,8 +65,6 @@ async def mock_dependency_get_celery_task_manager(
def _new(app: FastAPI):
return None

-from simcore_service_api_server.api.dependencies import services

return mocker.patch.object(services, services.get_task_manager.__name__, _new)


@@ -243,9 +244,6 @@ def _create(
exception: Exception | None = None,
side_effect: Callable | None = None,
) -> MockType:
-from servicelib.rabbitmq.rpc_interfaces.webserver.v1.functions import (
-    FunctionsRpcApi,
-)

assert exception is None or side_effect is None

@@ -272,9 +270,6 @@ def _create(
exception: Exception | None = None,
side_effect: Callable | None = None,
) -> MockType:
-from servicelib.rabbitmq.rpc_interfaces.webserver.v1.projects import (
-    ProjectsRpcApi,
-)

assert exception is None or side_effect is None

@@ -286,25 +281,3 @@ def _create(
)

return _create


-@pytest.fixture()
-def mock_method_in_jobs_service(
-    mocked_app_rpc_dependencies: None,
-    mocker: MockerFixture,
-) -> Callable[[str, Any, Exception | None], MockType]:
-    def _create(
-        method_name: str = "",
-        return_value: Any = None,
-        exception: Exception | None = None,
-    ) -> MockType:
-        from simcore_service_api_server._service_jobs import JobService
-
-        return mocker.patch.object(
-            JobService,
-            method_name,
-            return_value=return_value,
-            side_effect=exception,
-        )
-
-    return _create
@@ -3,6 +3,9 @@
# pylint: disable=unused-variable
# pylint: disable=too-many-arguments

import datetime
import uuid
from collections.abc import Callable
from pathlib import Path
from pprint import pprint
from typing import Any
@@ -14,7 +17,8 @@
import pytest
from faker import Faker
from fastapi import FastAPI
-from models_library.services import ServiceMetaDataPublished
+from models_library.projects import ProjectID
+from models_library.projects_state import RunningState
from models_library.utils.fastapi_encoders import jsonable_encoder
from pydantic import AnyUrl, HttpUrl, TypeAdapter
from pytest_mock import MockType
@@ -129,23 +133,8 @@ def mocked_directorv2_rest_api(
def test_download_presigned_link(
presigned_download_link: AnyUrl, tmp_path: Path, project_id: str, node_id: str
):
"""Cheks that the generation of presigned_download_link works as expected"""
"""Checks that the generation of presigned_download_link works as expected"""
r = httpx.get(f"{presigned_download_link}")
-## pprint(dict(r.headers))
-# r.headers looks like:
-# {
-# 'access-control-allow-origin': '*',
-# 'connection': 'close',
-# 'content-length': '491',
-# 'content-md5': 'HoY5Kfgqb9VSdS44CYBxnA==',
-# 'content-type': 'binary/octet-stream',
-# 'date': 'Thu, 19 May 2022 22:16:48 GMT',
-# 'etag': '"1e863929f82a6fd552752e380980719c"',
-# 'last-modified': 'Thu, 19 May 2022 22:16:48 GMT',
-# 'server': 'Werkzeug/2.1.2 Python/3.9.12',
-# 'x-amz-version-id': 'null',
-# 'x-amzn-requestid': 'WMAPXWFR2G4EJRVYBNJDRHTCXJ7NBRMDN7QQNHTQ5RYAQ34ZZNAL'
-# }
assert r.status_code == status.HTTP_200_OK

expected_fname = f"{project_id}-{node_id}.log"
@@ -196,6 +185,55 @@ async def test_solver_logs(
pprint(dict(resp.headers)) # noqa: T203


@pytest.mark.parametrize(
"job_outputs, project_id, job_state, expected_output, expected_status_code, expected_error_message",
[
(
None,
uuid.uuid4(),
RunningState.STARTED,
None,
status.HTTP_409_CONFLICT,
"not succeeded, when output is requested",
),
],
)
async def test_solver_job_outputs(
client: httpx.AsyncClient,
auth: httpx.BasicAuth,
job_outputs: dict[str, Any] | None,
project_id: ProjectID,
job_state: RunningState,
expected_output: dict[str, Any] | None,
mock_method_in_jobs_service: Callable[[str, Any], MockType],
expected_status_code: int,
expected_error_message: str | None,
solver_key: str,
solver_version: str,
) -> None:

job_status = JobStatus(
state=job_state,
job_id=project_id,
submitted_at=datetime.datetime.now(tz=datetime.UTC),
started_at=datetime.datetime.now(tz=datetime.UTC),
stopped_at=datetime.datetime.now(tz=datetime.UTC),
progress=0,
)
mock_method_in_jobs_service("inspect_solver_job", job_status)

response = await client.get(
f"{API_VTAG}/solvers/{solver_key}/releases/{solver_version}/jobs/{project_id}/outputs",
auth=auth,
)
assert response.status_code == expected_status_code
data = response.json()
if expected_error_message:
assert "not succeeded, when output is requested" in data["errors"][0]
if expected_output:
assert data == expected_output


@pytest.mark.acceptance_test(
"New feature https://github.com/ITISFoundation/osparc-simcore/issues/3940"
)
@@ -311,12 +349,6 @@ async def test_run_solver_job(
"owner",
} == set(oas["components"]["schemas"]["ServiceGet"]["required"])

-example = next(
-    e
-    for e in ServiceMetaDataPublished.model_json_schema()["examples"]
-    if "boot-options" in e
-)

# ---------------------------------------------------------------------------------------------------------

resp = await client.get(f"/{API_VTAG}/meta")