Skip to content

Commit d653882

Browse files
committed
start introducing celery task manager into function job service layer
1 parent d443f36 commit d653882

File tree

3 files changed

+59
-58
lines changed

3 files changed

+59
-58
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass
2-
from typing import overload
2+
from typing import Final, overload
33

44
import jsonschema
55
from common_library.exclude import as_dict_exclude_none
@@ -38,21 +38,28 @@
3838
from models_library.rpc_pagination import PageLimitInt
3939
from models_library.users import UserID
4040
from pydantic import ValidationError
41+
from servicelib.celery.models import TaskUUID
42+
from servicelib.celery.task_manager import TaskManager
4143
from simcore_service_api_server._service_functions import FunctionService
4244
from simcore_service_api_server.services_rpc.storage import StorageService
4345
from sqlalchemy.ext.asyncio import AsyncEngine
4446

4547
from ._service_jobs import JobService
48+
from .api.routes.tasks import _get_task_filter
4649
from .exceptions.function_errors import (
4750
FunctionJobCacheNotFoundError,
48-
FunctionJobProjectMissingError,
4951
)
5052
from .models.api_resources import JobLinks
5153
from .models.domain.functions import PreRegisteredFunctionJobData
5254
from .models.schemas.jobs import JobInputs, JobPricingSpecification
5355
from .services_http.webserver import AuthSession
5456
from .services_rpc.wb_api_server import WbApiRpcClient
5557

58+
_JOB_CREATION_TASK_STATUS_PREFIX: Final[str] = "JOB_CREATION_TASK_STATUS_"
59+
_JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS: Final[str] = (
60+
f"{_JOB_CREATION_TASK_STATUS_PREFIX}NOT_YET_SCHEDULED"
61+
)
62+
5663

5764
def join_inputs(
5865
default_inputs: FunctionInputs | None,
@@ -68,6 +75,23 @@ def join_inputs(
6875
return {**default_inputs, **function_inputs}
6976

7077

78+
async def _celery_task_status(
79+
job_creation_task_id: TaskID | None,
80+
task_manager: TaskManager,
81+
user_id: UserID,
82+
product_name: ProductName,
83+
) -> FunctionJobStatus:
84+
if job_creation_task_id is None:
85+
return FunctionJobStatus(status=_JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS)
86+
task_filter = _get_task_filter(user_id, product_name)
87+
task_status = await task_manager.get_task_status(
88+
task_uuid=TaskUUID(job_creation_task_id), task_filter=task_filter
89+
)
90+
return FunctionJobStatus(
91+
status=f"{_JOB_CREATION_TASK_STATUS_PREFIX}{task_status.task_state}"
92+
)
93+
94+
7195
@dataclass(frozen=True, kw_only=True)
7296
class FunctionJobService:
7397
user_id: UserID
@@ -159,7 +183,10 @@ async def validate_function_inputs(
159183
)
160184

161185
async def inspect_function_job(
162-
self, function: RegisteredFunction, function_job: RegisteredFunctionJob
186+
self,
187+
function: RegisteredFunction,
188+
function_job: RegisteredFunctionJob,
189+
task_manager: TaskManager,
163190
) -> FunctionJobStatus:
164191
"""Raises FunctionJobProjectNotRegisteredError if no project is associated with job"""
165192
stored_job_status = await self._web_rpc_client.get_function_job_status(
@@ -176,15 +203,25 @@ async def inspect_function_job(
176203
and function_job.function_class == FunctionClass.PROJECT
177204
):
178205
if function_job.project_job_id is None:
179-
raise FunctionJobProjectMissingError
206+
return await _celery_task_status(
207+
job_creation_task_id=function_job.job_creation_task_id,
208+
task_manager=task_manager,
209+
user_id=self.user_id,
210+
product_name=self.product_name,
211+
)
180212
job_status = await self._job_service.inspect_study_job(
181213
job_id=function_job.project_job_id,
182214
)
183215
elif (function.function_class == FunctionClass.SOLVER) and (
184216
function_job.function_class == FunctionClass.SOLVER
185217
):
186218
if function_job.solver_job_id is None:
187-
raise FunctionJobProjectMissingError
219+
return await _celery_task_status(
220+
job_creation_task_id=function_job.job_creation_task_id,
221+
task_manager=task_manager,
222+
user_id=self.user_id,
223+
product_name=self.product_name,
224+
)
188225
job_status = await self._job_service.inspect_solver_job(
189226
solver_key=function.solver_key,
190227
version=function.solver_version,
@@ -225,6 +262,7 @@ async def get_cached_function_job(
225262
*,
226263
function: RegisteredFunction,
227264
job_inputs: JobInputs,
265+
task_manager: TaskManager,
228266
) -> RegisteredFunctionJob:
229267
"""
230268
N.B. this function checks access rights
@@ -266,6 +304,7 @@ async def get_cached_function_job(
266304
job_status = await self.inspect_function_job(
267305
function=function,
268306
function_job=cached_function_job,
307+
task_manager=task_manager,
269308
)
270309
if job_status.status == RunningState.SUCCESS:
271310
return cached_function_job
@@ -476,18 +515,15 @@ async def function_job_outputs(
476515
product_name: ProductName,
477516
stored_job_outputs: FunctionOutputs | None,
478517
async_pg_engine: AsyncEngine,
518+
task_manager: TaskManager,
479519
) -> FunctionOutputs:
480520

481521
if stored_job_outputs is not None:
482522
return stored_job_outputs
483523

484-
try:
485-
job_status = await self.inspect_function_job(
486-
function=function,
487-
function_job=function_job,
488-
)
489-
except FunctionJobProjectMissingError:
490-
return None
524+
job_status = await self.inspect_function_job(
525+
function=function, function_job=function_job, task_manager=task_manager
526+
)
491527

492528
if job_status.status != RunningState.SUCCESS:
493529
return None

services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Annotated, Final
22

3-
from fastapi import APIRouter, Depends, status
3+
from fastapi import APIRouter, Depends, FastAPI, status
44
from fastapi_pagination.api import create_page
55
from fastapi_pagination.bases import AbstractPage
66
from models_library.api_schemas_webserver.functions import (
@@ -14,12 +14,14 @@
1414
from models_library.products import ProductName
1515
from models_library.users import UserID
1616
from servicelib.utils import limited_gather
17-
from simcore_service_api_server._service_function_jobs import FunctionJobService
1817

18+
from ..._service_function_jobs import FunctionJobService
1919
from ...models.pagination import Page, PaginationParams
2020
from ...models.schemas.errors import ErrorGet
2121
from ...services_rpc.wb_api_server import WbApiRpcClient
22+
from ..dependencies.application import get_app
2223
from ..dependencies.authentication import get_current_user_id, get_product_name
24+
from ..dependencies.celery import get_task_manager
2325
from ..dependencies.functions import (
2426
get_function_from_functionjobid,
2527
)
@@ -254,6 +256,7 @@ async def function_job_collection_list_function_jobs_list(
254256
),
255257
)
256258
async def function_job_collection_status(
259+
app: Annotated[FastAPI, Depends(get_app)],
257260
function_job_collection_id: FunctionJobCollectionID,
258261
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
259262
user_id: Annotated[UserID, Depends(get_current_user_id)], # Updated type
@@ -262,6 +265,7 @@ async def function_job_collection_status(
262265
FunctionJobService, Depends(get_function_job_service)
263266
],
264267
) -> FunctionJobCollectionStatus:
268+
task_manager = get_task_manager(app)
265269
function_job_collection = await get_function_job_collection(
266270
function_job_collection_id=function_job_collection_id,
267271
wb_api_rpc=wb_api_rpc,
@@ -284,6 +288,7 @@ async def function_job_collection_status(
284288
user_id=user_id,
285289
product_name=product_name,
286290
),
291+
task_manager=task_manager,
287292
)
288293
for function_job_id in function_job_collection.job_ids
289294
]

services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from logging import getLogger
22
from typing import Annotated, Final
33

4-
from common_library.error_codes import create_error_code
54
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Query, status
65
from fastapi_pagination.api import create_page
76
from models_library.api_schemas_long_running_tasks.tasks import TaskGet
@@ -16,20 +15,16 @@
1615
from models_library.functions import RegisteredFunction
1716
from models_library.functions_errors import (
1817
UnsupportedFunctionClassError,
19-
UnsupportedFunctionFunctionJobClassCombinationError,
2018
)
2119
from models_library.products import ProductName
2220
from models_library.projects_state import RunningState
2321
from models_library.users import UserID
24-
from servicelib.celery.models import TaskUUID
2522
from servicelib.fastapi.dependencies import get_app
26-
from servicelib.logging_errors import create_troubleshootting_log_kwargs
2723
from sqlalchemy.ext.asyncio import AsyncEngine
2824

2925
from ..._service_function_jobs import FunctionJobService
3026
from ..._service_functions import FunctionService
3127
from ..._service_jobs import JobService
32-
from ...exceptions.function_errors import FunctionJobProjectMissingError
3328
from ...models.domain.functions import PageRegisteredFunctionJobWithorWithoutStatus
3429
from ...models.pagination import PaginationParams
3530
from ...models.schemas.errors import ErrorGet
@@ -55,7 +50,6 @@
5550
FMSG_CHANGELOG_NEW_IN_VERSION,
5651
create_route_description,
5752
)
58-
from .tasks import _get_task_filter
5953

6054
_logger = getLogger(__name__)
6155

@@ -66,7 +60,6 @@
6660
JOB_LIST_FILTER_PAGE_RELEASE_VERSION = "0.11.0"
6761
JOB_LOG_RELEASE_VERSION = "0.11.0"
6862
WITH_STATUS_RELEASE_VERSION = "0.13.0"
69-
_JOB_CREATION_TASK_STATUS_PREFIX: Final[str] = "JOB_CREATION_TASK_STATUS_"
7063

7164
function_job_router = APIRouter()
7265

@@ -280,43 +273,10 @@ async def function_job_status(
280273
FunctionJobService, Depends(get_function_job_service)
281274
],
282275
) -> FunctionJobStatus:
283-
try:
284-
return await function_job_service.inspect_function_job(
285-
function=function, function_job=function_job
286-
)
287-
except FunctionJobProjectMissingError as exc:
288-
if (
289-
function.function_class == FunctionClass.PROJECT
290-
and function_job.function_class == FunctionClass.PROJECT
291-
) or (
292-
function.function_class == FunctionClass.SOLVER
293-
and function_job.function_class == FunctionClass.SOLVER
294-
):
295-
if task_id := function_job.job_creation_task_id:
296-
task_manager = get_task_manager(app)
297-
task_filter = _get_task_filter(user_id, product_name)
298-
task_status = await task_manager.get_task_status(
299-
task_uuid=TaskUUID(task_id), task_filter=task_filter
300-
)
301-
return FunctionJobStatus(
302-
status=f"{_JOB_CREATION_TASK_STATUS_PREFIX}{task_status.task_state}"
303-
)
304-
user_error_msg = f"The creation of job {function_job.uid} failed"
305-
support_id = create_error_code(Exception())
306-
_logger.exception(
307-
**create_troubleshootting_log_kwargs(
308-
user_error_msg,
309-
error=Exception(),
310-
error_code=support_id,
311-
tip="Initial call to run metamodeling function must have failed",
312-
)
313-
)
314-
raise
315-
316-
raise UnsupportedFunctionFunctionJobClassCombinationError(
317-
function_class=function.function_class,
318-
function_job_class=function_job.function_class,
319-
) from exc
276+
task_manager = get_task_manager(app)
277+
return await function_job_service.inspect_function_job(
278+
function=function, function_job=function_job, task_manager=task_manager
279+
)
320280

321281

322282
async def get_function_from_functionjobid(

0 commit comments

Comments
 (0)