Skip to content

Commit 1645e74

Browse files
committed
added endpoint for exporting function jobs logs
1 parent 4a098e1 commit 1645e74

File tree

4 files changed

+110
-8
lines changed

4 files changed

+110
-8
lines changed

services/api-server/src/simcore_service_api_server/_service_jobs.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import logging
22
from collections.abc import Callable
33
from dataclasses import dataclass
4+
from pathlib import Path
45

56
from common_library.exclude import as_dict_exclude_none
7+
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet
68
from models_library.api_schemas_webserver.projects import ProjectCreateNew, ProjectGet
79
from models_library.products import ProductName
810
from models_library.projects import ProjectID
@@ -15,9 +17,9 @@
1517
from models_library.users import UserID
1618
from pydantic import HttpUrl
1719
from servicelib.logging_utils import log_context
18-
from simcore_service_api_server.models.basic_types import NameValueTuple
1920

20-
from .models.schemas.jobs import Job, JobInputs
21+
from .models.basic_types import NameValueTuple
22+
from .models.schemas.jobs import Job, JobID, JobInputs
2123
from .models.schemas.programs import Program
2224
from .models.schemas.solvers import Solver
2325
from .services_http.solver_job_models_converters import (
@@ -26,6 +28,8 @@
2628
create_new_project_for_job,
2729
)
2830
from .services_http.webserver import AuthSession
31+
from .services_rpc.director_v2 import DirectorV2Service
32+
from .services_rpc.storage import StorageService
2933
from .services_rpc.wb_api_server import WbApiRpcClient
3034

3135
_logger = logging.getLogger(__name__)
@@ -35,6 +39,8 @@
3539
class JobService:
3640
_web_rest_client: AuthSession
3741
_web_rpc_client: WbApiRpcClient
42+
_storage_rpc_client: StorageService
43+
_directorv2_rpc_client: DirectorV2Service
3844
user_id: UserID
3945
product_name: ProductName
4046

@@ -147,3 +153,17 @@ async def create_job(
147153
job_id=job.id,
148154
)
149155
return job, new_project
156+
157+
async def start_log_export(
158+
self,
159+
job_id: JobID,
160+
) -> AsyncJobGet:
161+
file_ids = await self._directorv2_rpc_client.get_computation_task_log_file_ids(
162+
project_id=job_id
163+
)
164+
async_job_get = await self._storage_rpc_client.start_data_export(
165+
paths_to_export=[
166+
Path(elm.file_id) for elm in file_ids if elm.file_id is not None
167+
],
168+
)
169+
return async_job_get

services/api-server/src/simcore_service_api_server/api/dependencies/services.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
from ..._service_studies import StudyService
1515
from ...services_http.webserver import AuthSession
1616
from ...services_rpc.catalog import CatalogService
17+
from ...services_rpc.director_v2 import DirectorV2Service
18+
from ...services_rpc.storage import StorageService
1719
from ...services_rpc.wb_api_server import WbApiRpcClient
1820
from ...utils.client_base import BaseServiceClientApi
1921
from .authentication import get_current_user_id, get_product_name
@@ -61,9 +63,29 @@ def get_catalog_service(
6163
)
6264

6365

66+
def get_storage_service(
67+
rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)],
68+
user_id: Annotated[UserID, Depends(get_current_user_id)],
69+
product_name: Annotated[ProductName, Depends(get_product_name)],
70+
) -> StorageService:
71+
return StorageService(
72+
_rpc_client=rpc_client,
73+
_user_id=user_id,
74+
_product_name=product_name,
75+
)
76+
77+
78+
def get_directorv2_service(
79+
rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)],
80+
) -> DirectorV2Service:
81+
return DirectorV2Service(_rpc_client=rpc_client)
82+
83+
6484
def get_job_service(
6585
web_rest_api: Annotated[AuthSession, Depends(get_webserver_session)],
6686
web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
87+
storage_service: Annotated[StorageService, Depends(get_storage_service)],
88+
directorv2_service: Annotated[DirectorV2Service, Depends(get_directorv2_service)],
6789
user_id: Annotated[UserID, Depends(get_current_user_id)],
6890
product_name: Annotated[ProductName, Depends(get_product_name)],
6991
) -> JobService:
@@ -74,6 +96,8 @@ def get_job_service(
7496
return JobService(
7597
_web_rest_client=web_rest_api,
7698
_web_rpc_client=web_rpc_api,
99+
_storage_rpc_client=storage_service,
100+
_directorv2_rpc_client=directorv2_service,
77101
user_id=user_id,
78102
product_name=product_name,
79103
)

services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from typing import Annotated, Final
22

3-
from fastapi import APIRouter, Depends, status
3+
from fastapi import APIRouter, Depends, FastAPI, status
44
from fastapi_pagination.api import create_page
5+
from models_library.api_schemas_long_running_tasks.tasks import TaskGet
56
from models_library.api_schemas_webserver.functions import (
67
Function,
78
FunctionClass,
@@ -17,8 +18,10 @@
1718
)
1819
from models_library.products import ProductName
1920
from models_library.users import UserID
21+
from servicelib.fastapi.dependencies import get_app
2022
from sqlalchemy.ext.asyncio import AsyncEngine
2123

24+
from ..._service_jobs import JobService
2225
from ...models.pagination import Page, PaginationParams
2326
from ...models.schemas.errors import ErrorGet
2427
from ...services_http.director_v2 import DirectorV2Api
@@ -27,7 +30,7 @@
2730
from ...services_rpc.wb_api_server import WbApiRpcClient
2831
from ..dependencies.authentication import get_current_user_id, get_product_name
2932
from ..dependencies.database import get_db_asyncpg_engine
30-
from ..dependencies.services import get_api_client
33+
from ..dependencies.services import get_api_client, get_job_service
3134
from ..dependencies.webserver_http import get_webserver_session
3235
from ..dependencies.webserver_rpc import get_wb_api_rpc_client
3336
from . import solvers_jobs, solvers_jobs_read, studies_jobs
@@ -297,3 +300,58 @@ async def function_job_outputs(
297300
).results
298301
)
299302
raise UnsupportedFunctionClassError(function_class=function.function_class)
303+
304+
305+
@function_job_router.post(
306+
"/{function_job_id:uuid}/logs",
307+
response_model=TaskGet,
308+
responses={**_COMMON_FUNCTION_JOB_ERROR_RESPONSES},
309+
)
310+
async def start_function_job_logs(
311+
function_job_id: FunctionJobID,
312+
app: Annotated[FastAPI, Depends(get_app)],
313+
job_service: Annotated[JobService, Depends(get_job_service)],
314+
user_id: Annotated[UserID, Depends(get_current_user_id)],
315+
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
316+
product_name: Annotated[ProductName, Depends(get_product_name)],
317+
):
318+
function, function_job = await get_function_from_functionjobid(
319+
wb_api_rpc=wb_api_rpc,
320+
function_job_id=function_job_id,
321+
user_id=user_id,
322+
product_name=product_name,
323+
)
324+
app_router = app.router
325+
326+
if (
327+
function.function_class == FunctionClass.PROJECT
328+
and function_job.function_class == FunctionClass.PROJECT
329+
):
330+
async_job_get = await job_service.start_log_export(
331+
job_id=function_job.project_job_id,
332+
)
333+
_task_id = f"{async_job_get.job_id}"
334+
return TaskGet(
335+
task_id=_task_id,
336+
task_name=async_job_get.job_name,
337+
status_href=app_router.url_path_for("get_task_status", task_id=_task_id),
338+
abort_href=app_router.url_path_for("cancel_task", task_id=_task_id),
339+
result_href=app_router.url_path_for("get_task_result", task_id=_task_id),
340+
)
341+
342+
if (
343+
function.function_class == FunctionClass.SOLVER
344+
and function_job.function_class == FunctionClass.SOLVER
345+
):
346+
async_job_get = await job_service.start_log_export(
347+
job_id=function_job.solver_job_id,
348+
)
349+
_task_id = f"{async_job_get.job_id}"
350+
return TaskGet(
351+
task_id=_task_id,
352+
task_name=async_job_get.job_name,
353+
status_href=app_router.url_path_for("get_task_status", task_id=_task_id),
354+
abort_href=app_router.url_path_for("cancel_task", task_id=_task_id),
355+
result_href=app_router.url_path_for("get_task_result", task_id=_task_id),
356+
)
357+
raise UnsupportedFunctionClassError(function_class=function.function_class)

services/api-server/src/simcore_service_api_server/services_rpc/storage.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616
@dataclass(frozen=True, kw_only=True)
1717
class StorageService:
1818
_rpc_client: RabbitMQRPCClient
19+
_user_id: UserID
20+
_product_name: ProductName
1921

2022
@_exception_mapper(rpc_exception_map={})
2123
async def start_data_export(
2224
self,
23-
user_id: UserID,
24-
product_name: ProductName,
2525
paths_to_export: list[PathToExport],
2626
) -> AsyncJobGet:
2727
async_job_get, _ = await start_export_data(
2828
self._rpc_client,
29-
user_id=user_id,
30-
product_name=product_name,
29+
user_id=self._user_id,
30+
product_name=self._product_name,
3131
paths_to_export=paths_to_export,
3232
)
3333
return async_job_get

0 commit comments

Comments
 (0)