Skip to content

Commit b8325d8

Browse files
committed
first attempt at definining run function task
1 parent 22c6c2e commit b8325d8

File tree

3 files changed

+112
-8
lines changed

3 files changed

+112
-8
lines changed

services/api-server/src/simcore_service_api_server/api/dependencies/webserver_http.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from common_library.json_serialization import json_dumps
55
from cryptography.fernet import Fernet
66
from fastapi import Depends, FastAPI, HTTPException, status
7-
from fastapi.requests import Request
87

98
from ..._constants import MSG_BACKEND_SERVICE_UNAVAILABLE
109
from ...core.settings import ApplicationSettings, WebServerSettings
@@ -29,19 +28,16 @@ def _get_settings(
2928
return settings
3029

3130

32-
def _get_encrypt(request: Request) -> Fernet | None:
33-
e: Fernet | None = getattr(request.app.state, "webserver_fernet", None)
34-
return e
35-
36-
3731
def get_session_cookie(
3832
identity: Annotated[str, Depends(get_active_user_email)],
3933
settings: Annotated[WebServerSettings, Depends(_get_settings)],
40-
fernet: Annotated[Fernet | None, Depends(_get_encrypt)],
34+
app: Annotated[FastAPI, Depends(get_app)],
4135
) -> dict:
4236
# Based on aiohttp_session and aiohttp_security
4337
# SEE services/web/server/tests/unit/with_dbs/test_login.py
4438

39+
fernet: Fernet | None = getattr(app.state, "webserver_fernet", None)
40+
4541
if fernet is None:
4642
raise HTTPException(
4743
status.HTTP_503_SERVICE_UNAVAILABLE, detail=MSG_BACKEND_SERVICE_UNAVAILABLE
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
from celery import Task
2+
from celery_library.utils import get_app_server
3+
from fastapi import FastAPI # type: ignore[import-untyped]
4+
from models_library.functions import FunctionInputs, RegisteredFunction
5+
from models_library.projects_nodes_io import NodeID
6+
from servicelib.celery.models import TaskID
7+
8+
from ...api.dependencies.authentication import Identity
9+
from ...api.dependencies.rabbitmq import get_rabbitmq_rpc_client
10+
from ...api.dependencies.services import (
11+
get_api_client,
12+
get_catalog_service,
13+
get_directorv2_service,
14+
get_function_job_service,
15+
get_job_service,
16+
get_solver_service,
17+
get_storage_service,
18+
get_wb_api_rpc_client,
19+
get_webserver_session,
20+
)
21+
from ...api.dependencies.webserver_http import get_session_cookie
22+
from ...models.api_resources import JobLinks
23+
from ...models.schemas.jobs import JobPricingSpecification
24+
from ...services_http.director_v2 import DirectorV2Api
25+
from ...services_http.storage import StorageApi
26+
27+
28+
async def _assemble_function_job_service(app: FastAPI, identity: Identity):
29+
# to avoid this show we could introduce a dependency injection
30+
# system which is not linked to FastAPI (i.e. can be resolved manually).
31+
# See also https://github.com/fastapi/fastapi/issues/1105#issuecomment-609919850
32+
settings = app.state.settings
33+
assert settings.API_SERVER_WEBSERVER # nosec
34+
session_cookie = get_session_cookie(
35+
identity=identity.email, settings=settings.API_SERVER_WEBSERVER, app=app
36+
)
37+
38+
rpc_client = get_rabbitmq_rpc_client(app=app)
39+
web_server_rest_client = get_webserver_session(
40+
app=app, session_cookies=session_cookie, identity=identity
41+
)
42+
web_api_rpc_client = await get_wb_api_rpc_client(app=app)
43+
director2_api = get_api_client(DirectorV2Api)
44+
assert isinstance(director2_api, DirectorV2Api)
45+
storage_api = get_api_client(StorageApi)
46+
assert isinstance(storage_api, StorageApi)
47+
catalog_service = get_catalog_service(
48+
rpc_client=rpc_client,
49+
user_id=identity.user_id,
50+
product_name=identity.product_name,
51+
)
52+
53+
storage_service = get_storage_service(
54+
rpc_client=rpc_client,
55+
user_id=identity.user_id,
56+
product_name=identity.product_name,
57+
)
58+
directorv2_service = get_directorv2_service(rpc_client=rpc_client)
59+
60+
solver_service = get_solver_service(
61+
catalog_service=catalog_service,
62+
user_id=identity.user_id,
63+
product_name=identity.product_name,
64+
)
65+
66+
job_service = get_job_service(
67+
web_rest_api=web_server_rest_client,
68+
director2_api=director2_api,
69+
storage_api=storage_api,
70+
web_rpc_api=web_api_rpc_client,
71+
storage_service=storage_service,
72+
directorv2_service=directorv2_service,
73+
user_id=identity.user_id,
74+
product_name=identity.product_name,
75+
solver_service=solver_service,
76+
)
77+
78+
return get_function_job_service(
79+
web_rpc_api=web_api_rpc_client,
80+
job_service=job_service,
81+
user_id=identity.user_id,
82+
product_name=identity.product_name,
83+
)
84+
85+
86+
async def run_function(
87+
task: Task,
88+
task_id: TaskID,
89+
*,
90+
identity: Identity, # user identity
91+
function: RegisteredFunction,
92+
function_inputs: FunctionInputs,
93+
pricing_spec: JobPricingSpecification | None,
94+
job_links: JobLinks,
95+
x_simcore_parent_project_uuid: NodeID | None,
96+
x_simcore_parent_node_id: NodeID | None,
97+
):
98+
app = get_app_server(task.app).app
99+
function_job_service = await _assemble_function_job_service(app, identity)
100+
101+
return await function_job_service.run_function(
102+
function=function,
103+
function_inputs=function_inputs,
104+
pricing_spec=pricing_spec,
105+
job_links=job_links,
106+
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
107+
x_simcore_parent_node_id=x_simcore_parent_node_id,
108+
)

services/api-server/src/simcore_service_api_server/celery/worker_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from ..core.application import create_app
1515
from ..core.settings import ApplicationSettings
16-
from ._worker_tasks import setup_worker_tasks
16+
from ._worker_tasks.tasks import setup_worker_tasks
1717

1818
_settings = ApplicationSettings.create_from_envs()
1919

0 commit comments

Comments
 (0)