Skip to content

Commit 91b7593

Browse files
refactor: storage rpc routes
1 parent b109758 commit 91b7593

File tree

7 files changed

+44
-38
lines changed

7 files changed

+44
-38
lines changed

packages/celery-library/src/celery_library/rpc/__init__.py

Whitespace-only changes.

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py renamed to packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
TransferrableCeleryError,
88
decode_celery_transferrable_error,
99
)
10-
from fastapi import FastAPI
1110
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
1211
AsyncJobGet,
1312
AsyncJobId,
@@ -22,21 +21,22 @@
2221
JobSchedulerError,
2322
)
2423
from servicelib.celery.models import TaskState
24+
from servicelib.celery.task_manager import TaskManager
2525
from servicelib.logging_utils import log_catch
2626
from servicelib.rabbitmq import RPCRouter
2727

28-
from ...modules.celery import get_task_manager_from_app
29-
3028
_logger = logging.getLogger(__name__)
3129
router = RPCRouter()
3230

3331

3432
@router.expose(reraise_if_error_type=(JobSchedulerError,))
35-
async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
36-
assert app # nosec
33+
async def cancel(
34+
task_manager: TaskManager, job_id: AsyncJobId, job_id_data: AsyncJobNameData
35+
):
36+
assert task_manager # nosec
3737
assert job_id_data # nosec
3838
try:
39-
await get_task_manager_from_app(app).cancel_task(
39+
await task_manager.cancel_task(
4040
task_context=job_id_data.model_dump(),
4141
task_uuid=job_id,
4242
)
@@ -46,13 +46,13 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
4646

4747
@router.expose(reraise_if_error_type=(JobSchedulerError,))
4848
async def status(
49-
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
49+
task_manager: TaskManager, job_id: AsyncJobId, job_id_data: AsyncJobNameData
5050
) -> AsyncJobStatus:
51-
assert app # nosec
51+
assert task_manager # nosec
5252
assert job_id_data # nosec
5353

5454
try:
55-
task_status = await get_task_manager_from_app(app).get_task_status(
55+
task_status = await task_manager.get_task_status(
5656
task_context=job_id_data.model_dump(),
5757
task_uuid=job_id,
5858
)
@@ -75,20 +75,20 @@ async def status(
7575
)
7676
)
7777
async def result(
78-
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
78+
task_manager: TaskManager, job_id: AsyncJobId, job_id_data: AsyncJobNameData
7979
) -> AsyncJobResult:
80-
assert app # nosec
80+
assert task_manager # nosec
8181
assert job_id # nosec
8282
assert job_id_data # nosec
8383

8484
try:
85-
_status = await get_task_manager_from_app(app).get_task_status(
85+
_status = await task_manager.get_task_status(
8686
task_context=job_id_data.model_dump(),
8787
task_uuid=job_id,
8888
)
8989
if not _status.is_done:
9090
raise JobNotDoneError(job_id=job_id)
91-
_result = await get_task_manager_from_app(app).get_task_result(
91+
_result = await task_manager.get_task_result(
9292
task_context=job_id_data.model_dump(),
9393
task_uuid=job_id,
9494
)
@@ -122,12 +122,12 @@ async def result(
122122

123123
@router.expose(reraise_if_error_type=(JobSchedulerError,))
124124
async def list_jobs(
125-
app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
125+
task_manager: TaskManager, filter_: str, job_id_data: AsyncJobNameData
126126
) -> list[AsyncJobGet]:
127127
_ = filter_
128-
assert app # nosec
128+
assert task_manager # nosec
129129
try:
130-
tasks = await get_task_manager_from_app(app).list_tasks(
130+
tasks = await task_manager.list_tasks(
131131
task_context=job_id_data.model_dump(),
132132
)
133133
except CeleryError as exc:

services/storage/src/simcore_service_storage/api/rpc/_paths.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
import logging
22
from pathlib import Path
33

4-
from fastapi import FastAPI
54
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
65
AsyncJobGet,
76
AsyncJobNameData,
87
)
98
from models_library.projects_nodes_io import LocationID
109
from servicelib.celery.models import TaskMetadata
10+
from servicelib.celery.task_manager import TaskManager
1111
from servicelib.rabbitmq import RPCRouter
1212

13-
from ...modules.celery import get_task_manager_from_app
1413
from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
1514
from .._worker_tasks._paths import delete_paths as remote_delete_paths
1615

@@ -20,13 +19,13 @@
2019

2120
@router.expose(reraise_if_error_type=None)
2221
async def compute_path_size(
23-
app: FastAPI,
22+
task_manager: TaskManager,
2423
job_id_data: AsyncJobNameData,
2524
location_id: LocationID,
2625
path: Path,
2726
) -> AsyncJobGet:
2827
task_name = remote_compute_path_size.__name__
29-
task_uuid = await get_task_manager_from_app(app).submit_task(
28+
task_uuid = await task_manager.submit_task(
3029
task_metadata=TaskMetadata(
3130
name=task_name,
3231
),
@@ -41,13 +40,13 @@ async def compute_path_size(
4140

4241
@router.expose(reraise_if_error_type=None)
4342
async def delete_paths(
44-
app: FastAPI,
43+
task_manager: TaskManager,
4544
job_id_data: AsyncJobNameData,
4645
location_id: LocationID,
4746
paths: set[Path],
4847
) -> AsyncJobGet:
4948
task_name = remote_delete_paths.__name__
50-
task_uuid = await get_task_manager_from_app(app).submit_task(
49+
task_uuid = await task_manager.submit_task(
5150
task_metadata=TaskMetadata(
5251
name=task_name,
5352
),

services/storage/src/simcore_service_storage/api/rpc/_simcore_s3.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,26 @@
1-
from fastapi import FastAPI
21
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
32
AsyncJobGet,
43
AsyncJobNameData,
54
)
65
from models_library.api_schemas_storage.storage_schemas import FoldersBody
76
from models_library.api_schemas_webserver.storage import PathToExport
87
from servicelib.celery.models import TaskMetadata, TasksQueue
8+
from servicelib.celery.task_manager import TaskManager
99
from servicelib.rabbitmq import RPCRouter
1010

11-
from ...modules.celery import get_task_manager_from_app
1211
from .._worker_tasks._simcore_s3 import deep_copy_files_from_project, export_data
1312

1413
router = RPCRouter()
1514

1615

1716
@router.expose(reraise_if_error_type=None)
1817
async def copy_folders_from_project(
19-
app: FastAPI,
18+
task_manager: TaskManager,
2019
job_id_data: AsyncJobNameData,
2120
body: FoldersBody,
2221
) -> AsyncJobGet:
2322
task_name = deep_copy_files_from_project.__name__
24-
task_uuid = await get_task_manager_from_app(app).submit_task(
23+
task_uuid = await task_manager.submit_task(
2524
task_metadata=TaskMetadata(
2625
name=task_name,
2726
),
@@ -35,10 +34,12 @@ async def copy_folders_from_project(
3534

3635
@router.expose()
3736
async def start_export_data(
38-
app: FastAPI, job_id_data: AsyncJobNameData, paths_to_export: list[PathToExport]
37+
task_manager: TaskManager,
38+
job_id_data: AsyncJobNameData,
39+
paths_to_export: list[PathToExport],
3940
) -> AsyncJobGet:
4041
task_name = export_data.__name__
41-
task_uuid = await get_task_manager_from_app(app).submit_task(
42+
task_uuid = await task_manager.submit_task(
4243
task_metadata=TaskMetadata(
4344
name=task_name,
4445
ephemeral=False,

services/storage/src/simcore_service_storage/api/rpc/routes.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import logging
22

3+
from celery_library.rpc import _async_jobs
34
from fastapi import FastAPI
45
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
56
from servicelib.logging_utils import log_context
67
from servicelib.rabbitmq import RPCRouter
8+
from simcore_service_storage.modules.celery import get_task_manager_from_app
79

810
from ...modules.rabbitmq import get_rabbitmq_rpc_server
9-
from . import _async_jobs, _paths, _simcore_s3
11+
from . import _paths, _simcore_s3
1012

1113
_logger = logging.getLogger(__name__)
1214

@@ -18,15 +20,18 @@
1820
]
1921

2022

21-
def setup_rpc_api_routes(app: FastAPI) -> None:
23+
def setup_rpc_routes(app: FastAPI) -> None:
2224
async def startup() -> None:
2325
with log_context(
2426
_logger,
2527
logging.INFO,
2628
msg="Storage startup RPC API Routes",
2729
):
2830
rpc_server = get_rabbitmq_rpc_server(app)
31+
task_manager = get_task_manager_from_app(app)
2932
for router in ROUTERS:
30-
await rpc_server.register_router(router, STORAGE_RPC_NAMESPACE, app)
33+
await rpc_server.register_router(
34+
router, STORAGE_RPC_NAMESPACE, task_manager=task_manager
35+
)
3136

3237
app.add_event_handler("startup", startup)

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
APP_WORKER_STARTED_BANNER_MSG,
3333
)
3434
from ..api.rest.routes import setup_rest_api_routes
35-
from ..api.rpc.routes import setup_rpc_api_routes
35+
from ..api.rpc.routes import setup_rpc_routes
3636
from ..dsm import setup_dsm
3737
from ..dsm_cleaner import setup_dsm_cleaner
3838
from ..exceptions.handlers import set_exception_handlers
@@ -92,10 +92,11 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901
9292

9393
if not settings.STORAGE_WORKER_MODE:
9494
setup_rabbitmq(app)
95-
setup_rpc_api_routes(app)
9695

9796
assert settings.STORAGE_CELERY # nosec
9897
setup_task_manager(app, celery_settings=settings.STORAGE_CELERY)
98+
99+
setup_rpc_routes(app)
99100
setup_rest_api_long_running_tasks_for_uploads(app)
100101
setup_rest_api_routes(app, API_VTAG)
101102
set_exception_handlers(app)

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None:
1515
async def on_startup() -> None:
16-
app.state.celery_client = await create_task_manager(
16+
app.state.task_manager = await create_task_manager(
1717
create_app(celery_settings), celery_settings
1818
)
1919

@@ -24,7 +24,7 @@ async def on_startup() -> None:
2424

2525

2626
def get_task_manager_from_app(app: FastAPI) -> CeleryTaskManager:
27-
assert hasattr(app.state, "celery_client") # nosec
28-
celery_client = app.state.celery_client
29-
assert isinstance(celery_client, CeleryTaskManager) # nosec
30-
return celery_client
27+
assert hasattr(app.state, "task_manager") # nosec
28+
task_manager = app.state.task_manager
29+
assert isinstance(task_manager, CeleryTaskManager) # nosec
30+
return task_manager

0 commit comments

Comments
 (0)