Skip to content

Commit b8bda32

Browse files
authored
Store function outputs 🎨 ♻️ (#8142)
1 parent 3ef4975 commit b8bda32

File tree

15 files changed

+1116
-440
lines changed

15 files changed

+1116
-440
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
RegisteredFunctionJobCollection,
1818
)
1919
from models_library.functions import (
20+
FunctionJobStatus,
21+
FunctionOutputs,
2022
FunctionUserAccessRights,
2123
FunctionUserApiAccessRights,
2224
)
@@ -300,6 +302,82 @@ async def get_function_job(
300302
return TypeAdapter(RegisteredFunctionJob).validate_python(result)
301303

302304

305+
@log_decorator(_logger, level=logging.DEBUG)
306+
async def get_function_job_status(
307+
rabbitmq_rpc_client: RabbitMQRPCClient,
308+
*,
309+
user_id: UserID,
310+
function_job_id: FunctionJobID,
311+
product_name: ProductName,
312+
) -> FunctionJobStatus:
313+
result = await rabbitmq_rpc_client.request(
314+
WEBSERVER_RPC_NAMESPACE,
315+
TypeAdapter(RPCMethodName).validate_python("get_function_job_status"),
316+
function_job_id=function_job_id,
317+
user_id=user_id,
318+
product_name=product_name,
319+
)
320+
return TypeAdapter(FunctionJobStatus).validate_python(result)
321+
322+
323+
@log_decorator(_logger, level=logging.DEBUG)
324+
async def get_function_job_outputs(
325+
rabbitmq_rpc_client: RabbitMQRPCClient,
326+
*,
327+
user_id: UserID,
328+
function_job_id: FunctionJobID,
329+
product_name: ProductName,
330+
) -> FunctionOutputs:
331+
result = await rabbitmq_rpc_client.request(
332+
WEBSERVER_RPC_NAMESPACE,
333+
TypeAdapter(RPCMethodName).validate_python("get_function_job_outputs"),
334+
function_job_id=function_job_id,
335+
user_id=user_id,
336+
product_name=product_name,
337+
)
338+
return TypeAdapter(FunctionOutputs).validate_python(result)
339+
340+
341+
@log_decorator(_logger, level=logging.DEBUG)
342+
async def update_function_job_status(
343+
rabbitmq_rpc_client: RabbitMQRPCClient,
344+
*,
345+
user_id: UserID,
346+
product_name: ProductName,
347+
function_job_id: FunctionJobID,
348+
job_status: FunctionJobStatus,
349+
) -> FunctionJobStatus:
350+
result = await rabbitmq_rpc_client.request(
351+
WEBSERVER_RPC_NAMESPACE,
352+
TypeAdapter(RPCMethodName).validate_python("update_function_job_status"),
353+
function_job_id=function_job_id,
354+
job_status=job_status,
355+
user_id=user_id,
356+
product_name=product_name,
357+
)
358+
return TypeAdapter(FunctionJobStatus).validate_python(result)
359+
360+
361+
@log_decorator(_logger, level=logging.DEBUG)
362+
async def update_function_job_outputs(
363+
rabbitmq_rpc_client: RabbitMQRPCClient,
364+
*,
365+
user_id: UserID,
366+
product_name: ProductName,
367+
function_job_id: FunctionJobID,
368+
outputs: FunctionOutputs,
369+
) -> FunctionOutputs:
370+
result = await rabbitmq_rpc_client.request(
371+
WEBSERVER_RPC_NAMESPACE,
372+
TypeAdapter(RPCMethodName).validate_python("update_function_job_outputs"),
373+
function_job_id=function_job_id,
374+
outputs=outputs,
375+
user_id=user_id,
376+
product_name=product_name,
377+
)
378+
return TypeAdapter(FunctionOutputs).validate_python(result)
379+
380+
303381
@log_decorator(_logger, level=logging.DEBUG)
304382
async def delete_function_job(
305383
rabbitmq_rpc_client: RabbitMQRPCClient,
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
from typing import Annotated
2+
3+
from fastapi import Depends
4+
from models_library.functions import (
5+
FunctionJob,
6+
FunctionJobID,
7+
FunctionJobStatus,
8+
FunctionOutputs,
9+
RegisteredFunction,
10+
)
11+
from models_library.products import ProductName
12+
from models_library.users import UserID
13+
from simcore_service_api_server.api.dependencies.authentication import (
14+
get_current_user_id,
15+
get_product_name,
16+
)
17+
from simcore_service_api_server.api.dependencies.webserver_rpc import (
18+
get_wb_api_rpc_client,
19+
)
20+
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient
21+
22+
23+
async def get_stored_job_outputs(
24+
function_job_id: FunctionJobID,
25+
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
26+
user_id: Annotated[UserID, Depends(get_current_user_id)],
27+
product_name: Annotated[ProductName, Depends(get_product_name)],
28+
) -> FunctionOutputs:
29+
30+
return await wb_api_rpc.get_function_job_outputs(
31+
function_job_id=function_job_id, user_id=user_id, product_name=product_name
32+
)
33+
34+
35+
async def get_function_job_dependency(
36+
function_job_id: FunctionJobID,
37+
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
38+
user_id: Annotated[UserID, Depends(get_current_user_id)],
39+
product_name: Annotated[ProductName, Depends(get_product_name)],
40+
) -> FunctionJob:
41+
return await wb_api_rpc.get_function_job(
42+
function_job_id=function_job_id, user_id=user_id, product_name=product_name
43+
)
44+
45+
46+
async def get_function_from_functionjob(
47+
function_job: Annotated[FunctionJob, Depends(get_function_job_dependency)],
48+
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
49+
user_id: Annotated[UserID, Depends(get_current_user_id)],
50+
product_name: Annotated[ProductName, Depends(get_product_name)],
51+
) -> RegisteredFunction:
52+
return await wb_api_rpc.get_function(
53+
function_id=function_job.function_uid,
54+
user_id=user_id,
55+
product_name=product_name,
56+
)
57+
58+
59+
async def get_function_from_functionjobid(
60+
function_job_id: FunctionJobID,
61+
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
62+
user_id: Annotated[UserID, Depends(get_current_user_id)],
63+
product_name: Annotated[ProductName, Depends(get_product_name)],
64+
) -> RegisteredFunction:
65+
function_job = await get_function_job_dependency(
66+
function_job_id=function_job_id,
67+
wb_api_rpc=wb_api_rpc,
68+
user_id=user_id,
69+
product_name=product_name,
70+
)
71+
return await get_function_from_functionjob(
72+
function_job=function_job,
73+
wb_api_rpc=wb_api_rpc,
74+
user_id=user_id,
75+
product_name=product_name,
76+
)
77+
78+
79+
async def get_stored_job_status(
80+
function_job_id: FunctionJobID,
81+
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
82+
user_id: Annotated[UserID, Depends(get_current_user_id)],
83+
product_name: Annotated[ProductName, Depends(get_product_name)],
84+
) -> FunctionJobStatus:
85+
return await wb_api_rpc.get_function_job_status(
86+
function_job_id=function_job_id, user_id=user_id, product_name=product_name
87+
)

services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,13 @@
1212
RegisteredFunctionJobCollection,
1313
)
1414
from models_library.products import ProductName
15-
from models_library.users import UserID # Import UserID
15+
from models_library.users import UserID
16+
from simcore_service_api_server.api.dependencies.functions import (
17+
get_stored_job_status, # Import UserID
18+
)
19+
from simcore_service_api_server.api.dependencies.functions import (
20+
get_function_from_functionjobid,
21+
)
1622

1723
from ...models.pagination import Page, PaginationParams
1824
from ...models.schemas.errors import ErrorGet
@@ -221,13 +227,30 @@ async def function_job_collection_status(
221227
job_statuses = await asyncio.gather(
222228
*[
223229
function_job_status(
224-
job_id,
230+
function_job=await get_function_job(
231+
function_job_id=function_job_id,
232+
wb_api_rpc=wb_api_rpc,
233+
user_id=user_id,
234+
product_name=product_name,
235+
),
236+
function=await get_function_from_functionjobid(
237+
function_job_id=function_job_id,
238+
wb_api_rpc=wb_api_rpc,
239+
user_id=user_id,
240+
product_name=product_name,
241+
),
242+
stored_job_status=await get_stored_job_status(
243+
function_job_id=function_job_id,
244+
user_id=user_id,
245+
product_name=product_name,
246+
wb_api_rpc=wb_api_rpc,
247+
),
225248
wb_api_rpc=wb_api_rpc,
226249
director2_api=director2_api,
227250
user_id=user_id,
228251
product_name=product_name,
229252
)
230-
for job_id in function_job_collection.job_ids
253+
for function_job_id in function_job_collection.job_ids
231254
]
232255
)
233256
return FunctionJobCollectionStatus(

services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,28 @@
44
from fastapi_pagination.api import create_page
55
from models_library.api_schemas_long_running_tasks.tasks import TaskGet
66
from models_library.api_schemas_webserver.functions import (
7-
Function,
87
FunctionClass,
98
FunctionJob,
109
FunctionJobID,
1110
FunctionJobStatus,
1211
FunctionOutputs,
1312
RegisteredFunctionJob,
1413
)
14+
from models_library.functions import RegisteredFunction
1515
from models_library.functions_errors import (
1616
UnsupportedFunctionClassError,
1717
UnsupportedFunctionFunctionJobClassCombinationError,
1818
)
1919
from models_library.products import ProductName
20+
from models_library.projects_state import RunningState
2021
from models_library.users import UserID
2122
from servicelib.fastapi.dependencies import get_app
23+
from simcore_service_api_server.api.dependencies.functions import (
24+
get_function_from_functionjob,
25+
get_function_job_dependency,
26+
get_stored_job_outputs,
27+
get_stored_job_status,
28+
)
2229
from sqlalchemy.ext.asyncio import AsyncEngine
2330

2431
from ..._service_jobs import JobService
@@ -171,19 +178,19 @@ async def delete_function_job(
171178
),
172179
)
173180
async def function_job_status(
174-
function_job_id: FunctionJobID,
181+
function_job: Annotated[
182+
RegisteredFunctionJob, Depends(get_function_job_dependency)
183+
],
184+
function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)],
185+
stored_job_status: Annotated[FunctionJobStatus, Depends(get_stored_job_status)],
175186
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
176187
user_id: Annotated[UserID, Depends(get_current_user_id)],
177188
product_name: Annotated[ProductName, Depends(get_product_name)],
178189
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
179190
) -> FunctionJobStatus:
180191

181-
function, function_job = await get_function_from_functionjobid(
182-
wb_api_rpc=wb_api_rpc,
183-
function_job_id=function_job_id,
184-
user_id=user_id,
185-
product_name=product_name,
186-
)
192+
if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
193+
return stored_job_status
187194

188195
if (
189196
function.function_class == FunctionClass.PROJECT
@@ -195,9 +202,7 @@ async def function_job_status(
195202
user_id=user_id,
196203
director2_api=director2_api,
197204
)
198-
return FunctionJobStatus(status=job_status.state)
199-
200-
if (function.function_class == FunctionClass.SOLVER) and (
205+
elif (function.function_class == FunctionClass.SOLVER) and (
201206
function_job.function_class == FunctionClass.SOLVER
202207
):
203208
job_status = await solvers_jobs.inspect_job(
@@ -207,11 +212,19 @@ async def function_job_status(
207212
user_id=user_id,
208213
director2_api=director2_api,
209214
)
210-
return FunctionJobStatus(status=job_status.state)
215+
else:
216+
raise UnsupportedFunctionFunctionJobClassCombinationError(
217+
function_class=function.function_class,
218+
function_job_class=function_job.function_class,
219+
)
220+
221+
new_job_status = FunctionJobStatus(status=job_status.state)
211222

212-
raise UnsupportedFunctionFunctionJobClassCombinationError(
213-
function_class=function.function_class,
214-
function_job_class=function_job.function_class,
223+
return await wb_api_rpc.update_function_job_status(
224+
function_job_id=function_job.uid,
225+
user_id=user_id,
226+
product_name=product_name,
227+
job_status=new_job_status,
215228
)
216229

217230

@@ -220,7 +233,7 @@ async def get_function_from_functionjobid(
220233
function_job_id: FunctionJobID,
221234
user_id: Annotated[UserID, Depends(get_current_user_id)],
222235
product_name: Annotated[ProductName, Depends(get_product_name)],
223-
) -> tuple[Function, FunctionJob]:
236+
) -> tuple[RegisteredFunction, RegisteredFunctionJob]:
224237
function_job = await get_function_job(
225238
wb_api_rpc=wb_api_rpc,
226239
function_job_id=function_job_id,
@@ -251,26 +264,26 @@ async def get_function_from_functionjobid(
251264
),
252265
)
253266
async def function_job_outputs(
254-
function_job_id: FunctionJobID,
267+
function_job: Annotated[
268+
RegisteredFunctionJob, Depends(get_function_job_dependency)
269+
],
270+
function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)],
255271
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
256272
user_id: Annotated[UserID, Depends(get_current_user_id)],
257273
product_name: Annotated[ProductName, Depends(get_product_name)],
258274
storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))],
259275
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
260276
async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
277+
stored_job_outputs: Annotated[FunctionOutputs, Depends(get_stored_job_outputs)],
261278
) -> FunctionOutputs:
262-
function, function_job = await get_function_from_functionjobid(
263-
wb_api_rpc=wb_api_rpc,
264-
function_job_id=function_job_id,
265-
user_id=user_id,
266-
product_name=product_name,
267-
)
279+
if stored_job_outputs is not None:
280+
return stored_job_outputs
268281

269282
if (
270283
function.function_class == FunctionClass.PROJECT
271284
and function_job.function_class == FunctionClass.PROJECT
272285
):
273-
return dict(
286+
new_outputs = dict(
274287
(
275288
await studies_jobs.get_study_job_outputs(
276289
study_id=function.project_id,
@@ -281,12 +294,11 @@ async def function_job_outputs(
281294
)
282295
).results
283296
)
284-
285-
if (
297+
elif (
286298
function.function_class == FunctionClass.SOLVER
287299
and function_job.function_class == FunctionClass.SOLVER
288300
):
289-
return dict(
301+
new_outputs = dict(
290302
(
291303
await solvers_jobs_read.get_job_outputs(
292304
solver_key=function.solver_key,
@@ -299,7 +311,15 @@ async def function_job_outputs(
299311
)
300312
).results
301313
)
302-
raise UnsupportedFunctionClassError(function_class=function.function_class)
314+
else:
315+
raise UnsupportedFunctionClassError(function_class=function.function_class)
316+
317+
return await wb_api_rpc.update_function_job_outputs(
318+
function_job_id=function_job.uid,
319+
user_id=user_id,
320+
product_name=product_name,
321+
outputs=new_outputs,
322+
)
303323

304324

305325
@function_job_router.post(

0 commit comments

Comments
 (0)