Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
RegisteredFunctionJobCollection,
)
from models_library.functions import (
FunctionJobStatus,
FunctionOutputs,
FunctionUserAccessRights,
FunctionUserApiAccessRights,
)
Expand Down Expand Up @@ -300,6 +302,82 @@ async def get_function_job(
return TypeAdapter(RegisteredFunctionJob).validate_python(result)


@log_decorator(_logger, level=logging.DEBUG)
async def get_function_job_status(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
user_id: UserID,
function_job_id: FunctionJobID,
product_name: ProductName,
) -> FunctionJobStatus:
result = await rabbitmq_rpc_client.request(
WEBSERVER_RPC_NAMESPACE,
TypeAdapter(RPCMethodName).validate_python("get_function_job_status"),
function_job_id=function_job_id,
user_id=user_id,
product_name=product_name,
)
return TypeAdapter(FunctionJobStatus).validate_python(result)


@log_decorator(_logger, level=logging.DEBUG)
async def get_function_job_outputs(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
user_id: UserID,
function_job_id: FunctionJobID,
product_name: ProductName,
) -> FunctionOutputs:
result = await rabbitmq_rpc_client.request(
WEBSERVER_RPC_NAMESPACE,
TypeAdapter(RPCMethodName).validate_python("get_function_job_outputs"),
function_job_id=function_job_id,
user_id=user_id,
product_name=product_name,
)
return TypeAdapter(FunctionOutputs).validate_python(result)


@log_decorator(_logger, level=logging.DEBUG)
async def update_function_job_status(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
user_id: UserID,
product_name: ProductName,
function_job_id: FunctionJobID,
job_status: FunctionJobStatus,
) -> FunctionJobStatus:
result = await rabbitmq_rpc_client.request(
WEBSERVER_RPC_NAMESPACE,
TypeAdapter(RPCMethodName).validate_python("update_function_job_status"),
function_job_id=function_job_id,
job_status=job_status,
user_id=user_id,
product_name=product_name,
)
return TypeAdapter(FunctionJobStatus).validate_python(result)


@log_decorator(_logger, level=logging.DEBUG)
async def update_function_job_outputs(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
user_id: UserID,
product_name: ProductName,
function_job_id: FunctionJobID,
outputs: FunctionOutputs,
) -> FunctionOutputs:
result = await rabbitmq_rpc_client.request(
WEBSERVER_RPC_NAMESPACE,
TypeAdapter(RPCMethodName).validate_python("update_function_job_outputs"),
function_job_id=function_job_id,
outputs=outputs,
user_id=user_id,
product_name=product_name,
)
return TypeAdapter(FunctionOutputs).validate_python(result)


@log_decorator(_logger, level=logging.DEBUG)
async def delete_function_job(
rabbitmq_rpc_client: RabbitMQRPCClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Annotated

from fastapi import Depends
from models_library.functions import (
FunctionJob,
FunctionJobID,
FunctionJobStatus,
FunctionOutputs,
RegisteredFunction,
)
from models_library.products import ProductName
from models_library.users import UserID
from simcore_service_api_server.api.dependencies.authentication import (
get_current_user_id,
get_product_name,
)
from simcore_service_api_server.api.dependencies.webserver_rpc import (
get_wb_api_rpc_client,
)
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient


async def get_stored_job_outputs(
function_job_id: FunctionJobID,
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
user_id: Annotated[UserID, Depends(get_current_user_id)],
product_name: Annotated[ProductName, Depends(get_product_name)],
) -> FunctionOutputs:

return await wb_api_rpc.get_function_job_outputs(
function_job_id=function_job_id, user_id=user_id, product_name=product_name
)


async def get_function_job_dependency(
function_job_id: FunctionJobID,
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
user_id: Annotated[UserID, Depends(get_current_user_id)],
product_name: Annotated[ProductName, Depends(get_product_name)],
) -> FunctionJob:
return await wb_api_rpc.get_function_job(
function_job_id=function_job_id, user_id=user_id, product_name=product_name
)


async def get_function_from_functionjob(
function_job: Annotated[FunctionJob, Depends(get_function_job_dependency)],
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
user_id: Annotated[UserID, Depends(get_current_user_id)],
product_name: Annotated[ProductName, Depends(get_product_name)],
) -> RegisteredFunction:
return await wb_api_rpc.get_function(
function_id=function_job.function_uid,
user_id=user_id,
product_name=product_name,
)


async def get_stored_job_status(
function_job_id: FunctionJobID,
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
user_id: Annotated[UserID, Depends(get_current_user_id)],
product_name: Annotated[ProductName, Depends(get_product_name)],
) -> FunctionJobStatus:
return await wb_api_rpc.get_function_job_status(
function_job_id=function_job_id, user_id=user_id, product_name=product_name
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,28 @@
from fastapi_pagination.api import create_page
from models_library.api_schemas_long_running_tasks.tasks import TaskGet
from models_library.api_schemas_webserver.functions import (
Function,
FunctionClass,
FunctionJob,
FunctionJobID,
FunctionJobStatus,
FunctionOutputs,
RegisteredFunctionJob,
)
from models_library.functions import RegisteredFunction
from models_library.functions_errors import (
UnsupportedFunctionClassError,
UnsupportedFunctionFunctionJobClassCombinationError,
)
from models_library.products import ProductName
from models_library.projects_state import RunningState
from models_library.users import UserID
from servicelib.fastapi.dependencies import get_app
from simcore_service_api_server.api.dependencies.functions import (
get_function_from_functionjob,
get_function_job_dependency,
get_stored_job_outputs,
get_stored_job_status,
)
from sqlalchemy.ext.asyncio import AsyncEngine

from ..._service_jobs import JobService
Expand Down Expand Up @@ -171,19 +178,19 @@ async def delete_function_job(
),
)
async def function_job_status(
function_job_id: FunctionJobID,
function_job: Annotated[
RegisteredFunctionJob, Depends(get_function_job_dependency)
],
function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)],
stored_job_status: Annotated[FunctionJobStatus, Depends(get_stored_job_status)],
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
user_id: Annotated[UserID, Depends(get_current_user_id)],
product_name: Annotated[ProductName, Depends(get_product_name)],
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
) -> FunctionJobStatus:

function, function_job = await get_function_from_functionjobid(
wb_api_rpc=wb_api_rpc,
function_job_id=function_job_id,
user_id=user_id,
product_name=product_name,
)
if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
return stored_job_status

if (
function.function_class == FunctionClass.PROJECT
Expand All @@ -195,9 +202,7 @@ async def function_job_status(
user_id=user_id,
director2_api=director2_api,
)
return FunctionJobStatus(status=job_status.state)

if (function.function_class == FunctionClass.SOLVER) and (
elif (function.function_class == FunctionClass.SOLVER) and (
function_job.function_class == FunctionClass.SOLVER
):
job_status = await solvers_jobs.inspect_job(
Expand All @@ -207,11 +212,19 @@ async def function_job_status(
user_id=user_id,
director2_api=director2_api,
)
return FunctionJobStatus(status=job_status.state)
else:
raise UnsupportedFunctionFunctionJobClassCombinationError(
function_class=function.function_class,
function_job_class=function_job.function_class,
)

new_job_status = FunctionJobStatus(status=job_status.state)

raise UnsupportedFunctionFunctionJobClassCombinationError(
function_class=function.function_class,
function_job_class=function_job.function_class,
return await wb_api_rpc.update_function_job_status(
function_job_id=function_job.uid,
user_id=user_id,
product_name=product_name,
job_status=new_job_status,
)


Expand All @@ -220,7 +233,7 @@ async def get_function_from_functionjobid(
function_job_id: FunctionJobID,
user_id: Annotated[UserID, Depends(get_current_user_id)],
product_name: Annotated[ProductName, Depends(get_product_name)],
) -> tuple[Function, FunctionJob]:
) -> tuple[RegisteredFunction, RegisteredFunctionJob]:
function_job = await get_function_job(
wb_api_rpc=wb_api_rpc,
function_job_id=function_job_id,
Expand Down Expand Up @@ -250,27 +263,27 @@ async def get_function_from_functionjobid(
changelog=CHANGE_LOGS["function_job_outputs"],
),
)
async def function_job_outputs(
function_job_id: FunctionJobID,
async def get_function_job_outputs(
function_job: Annotated[
RegisteredFunctionJob, Depends(get_function_job_dependency)
],
function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)],
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
user_id: Annotated[UserID, Depends(get_current_user_id)],
product_name: Annotated[ProductName, Depends(get_product_name)],
storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))],
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
stored_job_outputs: Annotated[FunctionOutputs, Depends(get_stored_job_outputs)],
) -> FunctionOutputs:
function, function_job = await get_function_from_functionjobid(
wb_api_rpc=wb_api_rpc,
function_job_id=function_job_id,
user_id=user_id,
product_name=product_name,
)
if stored_job_outputs is not None:
return stored_job_outputs

if (
function.function_class == FunctionClass.PROJECT
and function_job.function_class == FunctionClass.PROJECT
):
return dict(
new_outputs = dict(
(
await studies_jobs.get_study_job_outputs(
study_id=function.project_id,
Expand All @@ -281,12 +294,11 @@ async def function_job_outputs(
)
).results
)

if (
elif (
function.function_class == FunctionClass.SOLVER
and function_job.function_class == FunctionClass.SOLVER
):
return dict(
new_outputs = dict(
(
await solvers_jobs_read.get_job_outputs(
solver_key=function.solver_key,
Expand All @@ -299,7 +311,15 @@ async def function_job_outputs(
)
).results
)
raise UnsupportedFunctionClassError(function_class=function.function_class)
else:
raise UnsupportedFunctionClassError(function_class=function.function_class)

return await wb_api_rpc.update_function_job_outputs(
function_job_id=function_job.uid,
user_id=user_id,
product_name=product_name,
outputs=new_outputs,
)


@function_job_router.post(
Expand Down
Loading
Loading