Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@
from ._service_functions import FunctionService
from ._service_jobs import JobService
from .api.dependencies.authentication import Identity
from .exceptions.function_errors import (
FunctionJobCacheNotFoundError,
from .exceptions.backend_errors import (
SolverJobOutputRequestButNotSucceededError,
StudyJobOutputRequestButNotSucceededError,
)
from .exceptions.function_errors import FunctionJobCacheNotFoundError
from .models.api_resources import JobLinks
from .models.domain.celery_models import ApiServerOwnerMetadata
from .models.schemas.functions import FunctionJobCreationTaskStatus
from .models.schemas.jobs import JobInputs, JobPricingSpecification
from .services_http.webserver import AuthSession
from .services_rpc.storage import StorageService
Expand Down Expand Up @@ -260,7 +261,7 @@ async def get_cached_function_job(

raise FunctionJobCacheNotFoundError

async def function_job_outputs(
async def function_job_outputs( # noqa: PLR0911 # too-many-return-statements
self,
*,
function: RegisteredFunction,
Expand All @@ -283,30 +284,36 @@ async def function_job_outputs(
):
if function_job.project_job_id is None:
return None
new_outputs = dict(
(
await self._job_service.get_study_job_outputs(
study_id=function.project_id,
job_id=function_job.project_job_id,
)
).results
)
try:
new_outputs = dict(
(
await self._job_service.get_study_job_outputs(
study_id=function.project_id,
job_id=function_job.project_job_id,
)
).results
)
except StudyJobOutputRequestButNotSucceededError:
return None
elif (
function.function_class == FunctionClass.SOLVER
and function_job.function_class == FunctionClass.SOLVER
):
if function_job.solver_job_id is None:
return None
new_outputs = dict(
(
await self._job_service.get_solver_job_outputs(
solver_key=function.solver_key,
version=function.solver_version,
job_id=function_job.solver_job_id,
async_pg_engine=self._async_pg_engine,
)
).results
)
try:
new_outputs = dict(
(
await self._job_service.get_solver_job_outputs(
solver_key=function.solver_key,
version=function.solver_version,
job_id=function_job.solver_job_id,
async_pg_engine=self._async_pg_engine,
)
).results
)
except SolverJobOutputRequestButNotSucceededError:
return None
else:
raise UnsupportedFunctionClassError(function_class=function.function_class)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes import InputID, InputTypes
from models_library.projects_nodes_io import BaseFileLink, NodeID
from models_library.projects_state import RunningState
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
from models_library.rpc.webserver.projects import ProjectJobRpcGet
from models_library.rpc_pagination import PageLimitInt
Expand All @@ -29,7 +30,11 @@
from sqlalchemy.ext.asyncio import AsyncEngine

from ._service_solvers import SolverService
from .exceptions.backend_errors import JobAssetsMissingError
from .exceptions.backend_errors import (
JobAssetsMissingError,
SolverJobOutputRequestButNotSucceededError,
StudyJobOutputRequestButNotSucceededError,
)
from .exceptions.custom_errors import (
InsufficientCreditsError,
MissingWalletError,
Expand Down Expand Up @@ -308,6 +313,15 @@ async def get_solver_job_outputs(
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
_logger.debug("Get Job '%s' outputs", job_name)

job_status = await self.inspect_solver_job(
solver_key=solver_key, version=version, job_id=job_id
)

if job_status.state != RunningState.SUCCESS:
raise SolverJobOutputRequestButNotSucceededError(
job_id=job_id, state=job_status.state
)

project_marked_as_job = await self.get_job(
job_id=job_id,
job_parent_resource_name=Solver.compose_resource_name(
Expand Down Expand Up @@ -379,9 +393,16 @@ async def get_study_job_outputs(
job_name = compose_study_job_resource_name(study_id, job_id)
_logger.debug("Getting Job Outputs for '%s'", job_name)

job_status = await self.inspect_study_job(job_id=job_id)

if job_status.state != RunningState.SUCCESS:
raise StudyJobOutputRequestButNotSucceededError(
job_id=job_id, state=job_status.state
)
project_outputs = await self._web_rest_client.get_project_outputs(
project_id=job_id
)

return await create_job_outputs_from_project_outputs(
job_id, project_outputs, self.user_id, self._storage_rest_client
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,13 @@ class JobAssetsMissingError(BaseBackEndError):
class CeleryTaskNotFoundError(BaseBackEndError):
msg_template = "Task {task_uuid} not found"
status_code = status.HTTP_404_NOT_FOUND


class SolverJobOutputRequestButNotSucceededError(BaseBackEndError):
msg_template = "Solver job '{job_id}' not succeeded, when output is requested. Current state: {state}"
status_code = status.HTTP_409_CONFLICT


class StudyJobOutputRequestButNotSucceededError(BaseBackEndError):
msg_template = "Study job '{job_id}' not succeeded, when output is requested. Current state: {state}"
status_code = status.HTTP_409_CONFLICT
Loading