Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d653882
start introducing celery task manager into function job service layer
bisgaard-itis Sep 9, 2025
718f987
cleanup inspection endpoint
bisgaard-itis Sep 10, 2025
575b130
create FunctionJobTaskClientService
bisgaard-itis Sep 10, 2025
1437b91
further refactor
bisgaard-itis Sep 10, 2025
32e3d80
fix tests
bisgaard-itis Sep 10, 2025
981a5d6
test fixes
bisgaard-itis Sep 10, 2025
9c725bc
Merge branch 'master' into bugfix-use-celery-task-manager-in-function…
bisgaard-itis Sep 10, 2025
a454ee7
further test fixes
bisgaard-itis Sep 10, 2025
f046169
cosmetic fix
bisgaard-itis Sep 10, 2025
5264b05
fix for job collection status
bisgaard-itis Sep 10, 2025
931e582
Merge branch 'master' into bugfix-use-celery-task-manager-in-function…
bisgaard-itis Sep 10, 2025
b3cbad6
start improving docs
bisgaard-itis Sep 10, 2025
10f3f24
finish docs
bisgaard-itis Sep 10, 2025
4545efe
Merge branch 'master' into bugfix-use-celery-task-manager-in-function…
bisgaard-itis Sep 10, 2025
9cc387f
fix pylint tests
bisgaard-itis Sep 10, 2025
3cfcd59
Merge branch 'master' into bugfix-use-celery-task-manager-in-function…
bisgaard-itis Sep 11, 2025
b513f6c
make pylint happy
bisgaard-itis Sep 11, 2025
97ff8f9
pylint
bisgaard-itis Sep 11, 2025
e5f8e40
Merge branch 'master' into bugfix-use-celery-task-manager-in-function…
wvangeit Sep 11, 2025
870e5af
Merge branch 'master' into bugfix-use-celery-task-manager-in-function…
bisgaard-itis Sep 16, 2025
ce8f32d
@sanderegg fix
bisgaard-itis Sep 16, 2025
2fc3494
@GitHK clarify if-statement
bisgaard-itis Sep 16, 2025
88c4596
@GitHK cleanup
bisgaard-itis Sep 16, 2025
38bffd2
fix typecheck
bisgaard-itis Sep 16, 2025
a25aabc
factor out models registered in celery
bisgaard-itis Sep 16, 2025
a494caf
create ApiWorkerTaskFilter
bisgaard-itis Sep 16, 2025
1007626
further cleanup
bisgaard-itis Sep 16, 2025
c06b71b
test fix
bisgaard-itis Sep 16, 2025
a31bcc3
cosmetic change
bisgaard-itis Sep 16, 2025
65f5332
typecheck fix
bisgaard-itis Sep 16, 2025
d81d40a
Merge branch 'master' into bugfix-use-celery-task-manager-in-function…
bisgaard-itis Sep 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
569 changes: 356 additions & 213 deletions services/api-server/docs/api-server.drawio.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,32 @@
FunctionInputs,
FunctionJobCollectionID,
FunctionJobID,
FunctionJobStatus,
FunctionOutputs,
FunctionSchemaClass,
ProjectFunctionJob,
RegisteredFunction,
RegisteredFunctionJob,
RegisteredFunctionJobPatch,
RegisteredFunctionJobWithStatus,
RegisteredProjectFunctionJobPatch,
RegisteredSolverFunctionJobPatch,
SolverFunctionJob,
SolverJobID,
TaskID,
)
from models_library.functions_errors import (
FunctionExecuteAccessDeniedError,
FunctionInputsValidationError,
FunctionsExecuteApiAccessDeniedError,
UnsupportedFunctionClassError,
UnsupportedFunctionFunctionJobClassCombinationError,
)
from models_library.products import ProductName
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.projects_state import RunningState
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
from models_library.rpc_pagination import PageLimitInt
from models_library.users import UserID
from pydantic import ValidationError
from simcore_service_api_server._service_functions import FunctionService
from simcore_service_api_server.services_rpc.storage import StorageService
from sqlalchemy.ext.asyncio import AsyncEngine

from ._service_jobs import JobService
from .exceptions.function_errors import (
FunctionJobCacheNotFoundError,
FunctionJobProjectMissingError,
)
from .models.api_resources import JobLinks
from .models.domain.functions import PreRegisteredFunctionJobData
from .models.schemas.jobs import JobInputs, JobPricingSpecification
Expand Down Expand Up @@ -102,33 +90,6 @@ async def list_function_jobs(
**pagination_kwargs,
)

async def list_function_jobs_with_status(
self,
*,
filter_by_function_id: FunctionID | None = None,
filter_by_function_job_ids: list[FunctionJobID] | None = None,
filter_by_function_job_collection_id: FunctionJobCollectionID | None = None,
pagination_offset: PageOffsetInt | None = None,
pagination_limit: PageLimitInt | None = None,
) -> tuple[
list[RegisteredFunctionJobWithStatus],
PageMetaInfoLimitOffset,
]:
"""Lists all function jobs for a user with pagination"""

pagination_kwargs = as_dict_exclude_none(
pagination_offset=pagination_offset, pagination_limit=pagination_limit
)

return await self._web_rpc_client.list_function_jobs_with_status(
user_id=self.user_id,
product_name=self.product_name,
filter_by_function_id=filter_by_function_id,
filter_by_function_job_ids=filter_by_function_job_ids,
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
**pagination_kwargs,
)

async def validate_function_inputs(
self, *, function_id: FunctionID, inputs: FunctionInputs
) -> tuple[bool, str]:
Expand Down Expand Up @@ -158,54 +119,6 @@ async def validate_function_inputs(
f"Unsupported function schema class {function.input_schema.schema_class}",
)

async def inspect_function_job(
self, function: RegisteredFunction, function_job: RegisteredFunctionJob
) -> FunctionJobStatus:
"""Raises FunctionJobProjectNotRegisteredError if no project is associated with job"""
stored_job_status = await self._web_rpc_client.get_function_job_status(
function_job_id=function_job.uid,
user_id=self.user_id,
product_name=self.product_name,
)

if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
return stored_job_status

if (
function.function_class == FunctionClass.PROJECT
and function_job.function_class == FunctionClass.PROJECT
):
if function_job.project_job_id is None:
raise FunctionJobProjectMissingError
job_status = await self._job_service.inspect_study_job(
job_id=function_job.project_job_id,
)
elif (function.function_class == FunctionClass.SOLVER) and (
function_job.function_class == FunctionClass.SOLVER
):
if function_job.solver_job_id is None:
raise FunctionJobProjectMissingError
job_status = await self._job_service.inspect_solver_job(
solver_key=function.solver_key,
version=function.solver_version,
job_id=function_job.solver_job_id,
)
else:
raise UnsupportedFunctionFunctionJobClassCombinationError(
function_class=function.function_class,
function_job_class=function_job.function_class,
)

new_job_status = FunctionJobStatus(status=job_status.state)

return await self._web_rpc_client.update_function_job_status(
function_job_id=function_job.uid,
user_id=self.user_id,
product_name=self.product_name,
job_status=new_job_status,
check_write_permissions=False,
)

async def create_function_job_inputs( # pylint: disable=no-self-use
self,
*,
Expand All @@ -220,58 +133,6 @@ async def create_function_job_inputs( # pylint: disable=no-self-use
values=joined_inputs or {},
)

async def get_cached_function_job(
self,
*,
function: RegisteredFunction,
job_inputs: JobInputs,
) -> RegisteredFunctionJob:
"""
N.B. this function checks access rights
raises FunctionsExecuteApiAccessDeniedError if user cannot execute functions
raises FunctionJobCacheNotFoundError if no cached job is found
"""

user_api_access_rights = (
await self._web_rpc_client.get_functions_user_api_access_rights(
user_id=self.user_id, product_name=self.product_name
)
)
if not user_api_access_rights.execute_functions:
raise FunctionsExecuteApiAccessDeniedError(
user_id=self.user_id,
function_id=function.uid,
)

user_permissions = await self._web_rpc_client.get_function_user_permissions(
function_id=function.uid,
user_id=self.user_id,
product_name=self.product_name,
)
if not user_permissions.execute:
raise FunctionExecuteAccessDeniedError(
user_id=self.user_id,
function_id=function.uid,
)

if cached_function_jobs := await self._web_rpc_client.find_cached_function_jobs(
function_id=function.uid,
inputs=job_inputs.values,
user_id=self.user_id,
product_name=self.product_name,
):
for cached_function_job in cached_function_jobs:
job_status = await self.inspect_function_job(
function=function,
function_job=cached_function_job,
)
if job_status.status == RunningState.SUCCESS:
return cached_function_job

raise FunctionJobCacheNotFoundError

async def pre_register_function_job(
self,
*,
Expand Down Expand Up @@ -466,69 +327,3 @@ async def run_function(
raise UnsupportedFunctionClassError(
function_class=function.function_class,
)

async def function_job_outputs(
self,
*,
function: RegisteredFunction,
function_job: RegisteredFunctionJob,
user_id: UserID,
product_name: ProductName,
stored_job_outputs: FunctionOutputs | None,
async_pg_engine: AsyncEngine,
) -> FunctionOutputs:

if stored_job_outputs is not None:
return stored_job_outputs

try:
job_status = await self.inspect_function_job(
function=function,
function_job=function_job,
)
except FunctionJobProjectMissingError:
return None

if job_status.status != RunningState.SUCCESS:
return None

if (
function.function_class == FunctionClass.PROJECT
and function_job.function_class == FunctionClass.PROJECT
):
if function_job.project_job_id is None:
return None
new_outputs = dict(
(
await self._job_service.get_study_job_outputs(
study_id=function.project_id,
job_id=function_job.project_job_id,
)
).results
)
elif (
function.function_class == FunctionClass.SOLVER
and function_job.function_class == FunctionClass.SOLVER
):
if function_job.solver_job_id is None:
return None
new_outputs = dict(
(
await self._job_service.get_solver_job_outputs(
solver_key=function.solver_key,
version=function.solver_version,
job_id=function_job.solver_job_id,
async_pg_engine=async_pg_engine,
)
).results
)
else:
raise UnsupportedFunctionClassError(function_class=function.function_class)

return await self._web_rpc_client.update_function_job_outputs(
function_job_id=function_job.uid,
user_id=user_id,
product_name=product_name,
outputs=new_outputs,
check_write_permissions=False,
)
Loading
Loading