Skip to content

Commit 6389fda

Browse files
committed
use app-name for ASYNC_JOB_CLIENT_NAME in api-server
1 parent 31fc291 commit 6389fda

File tree

6 files changed

+27
-30
lines changed

6 files changed

+27
-30
lines changed

services/api-server/src/simcore_service_api_server/api/dependencies/celery.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
from typing import Final
2-
31
from celery_library.task_manager import CeleryTaskManager
42
from fastapi import FastAPI
53

6-
ASYNC_JOB_CLIENT_NAME: Final[str] = "api-server"
7-
84

95
def get_task_manager(app: FastAPI) -> CeleryTaskManager:
106
assert hasattr(app.state, "task_manager") # nosec

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
RegisteredFunctionJob,
1919
RegisteredFunctionJobCollection,
2020
)
21-
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobFilter
2221
from models_library.functions import FunctionJobCollection, FunctionJobID
2322
from models_library.products import ProductName
2423
from models_library.projects import ProjectID
2524
from models_library.projects_nodes_io import NodeID
2625
from models_library.users import UserID
27-
from servicelib.celery.models import TaskFilter, TaskID, TaskMetadata, TasksQueue
26+
from servicelib.celery.models import TaskID, TaskMetadata, TasksQueue
2827
from servicelib.fastapi.dependencies import get_reverse_url_mapper
2928
from servicelib.utils import limited_gather
3029

@@ -33,6 +32,7 @@
3332
from ...celery_worker.worker_tasks.functions_tasks import (
3433
run_function as run_function_task,
3534
)
35+
from ...clients.celery_task_manager import get_task_filter
3636
from ...exceptions.function_errors import FunctionJobCacheNotFoundError
3737
from ...models.pagination import Page, PaginationParams
3838
from ...models.schemas.errors import ErrorGet
@@ -44,7 +44,7 @@
4444
get_current_user_id,
4545
get_product_name,
4646
)
47-
from ..dependencies.celery import ASYNC_JOB_CLIENT_NAME, get_task_manager
47+
from ..dependencies.celery import get_task_manager
4848
from ..dependencies.services import get_function_job_service, get_function_service
4949
from ..dependencies.webserver_rpc import get_wb_api_rpc_client
5050
from ._constants import (
@@ -368,12 +368,9 @@ async def run_function(
368368
)
369369

370370
# run function in celery task
371-
job_filter = AsyncJobFilter(
372-
user_id=user_identity.user_id,
373-
product_name=user_identity.product_name,
374-
client_name=ASYNC_JOB_CLIENT_NAME,
371+
task_filter = get_task_filter(
372+
user_id=user_identity.user_id, product_name=user_identity.product_name
375373
)
376-
task_filter = TaskFilter.model_validate(job_filter.model_dump())
377374
task_name = run_function_task.__name__
378375

379376
task_uuid = await task_manager.submit_task(

services/api-server/src/simcore_service_api_server/api/routes/tasks.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,18 @@
1010
TaskStatus,
1111
)
1212
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
13-
AsyncJobFilter,
1413
AsyncJobId,
1514
)
1615
from models_library.products import ProductName
1716
from models_library.users import UserID
18-
from servicelib.celery.models import TaskFilter, TaskState, TaskUUID
17+
from servicelib.celery.models import TaskState, TaskUUID
1918
from servicelib.fastapi.dependencies import get_app
2019
from servicelib.logging_errors import create_troubleshootting_log_kwargs
2120

2221
from ...models.schemas.base import ApiServerEnvelope
2322
from ...models.schemas.errors import ErrorGet
2423
from ..dependencies.authentication import get_current_user_id, get_product_name
25-
from ..dependencies.celery import ASYNC_JOB_CLIENT_NAME, get_task_manager
24+
from ..dependencies.celery import get_task_manager
2625
from ._constants import (
2726
FMSG_CHANGELOG_NEW_IN_VERSION,
2827
create_route_description,
@@ -32,13 +31,6 @@
3231
_logger = logging.getLogger(__name__)
3332

3433

35-
def _get_task_filter(user_id: UserID, product_name: ProductName) -> TaskFilter:
36-
job_filter = AsyncJobFilter(
37-
user_id=user_id, product_name=product_name, client_name=ASYNC_JOB_CLIENT_NAME
38-
)
39-
return TaskFilter.model_validate(job_filter.model_dump())
40-
41-
4234
_DEFAULT_TASK_STATUS_CODES: dict[int | str, dict[str, Any]] = {
4335
status.HTTP_500_INTERNAL_SERVER_ERROR: {
4436
"description": "Internal server error",

services/api-server/src/simcore_service_api_server/clients/celery_task_manager.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,27 @@
11
from celery_library.common import create_app, create_task_manager
22
from celery_library.types import register_celery_types, register_pydantic_types
33
from fastapi import FastAPI
4+
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobFilter
5+
from models_library.products import ProductName
6+
from models_library.users import UserID
7+
from servicelib.celery.models import TaskFilter
48
from settings_library.celery import CelerySettings
59

10+
from .._meta import APP_NAME
611
from ..celery_worker.worker_tasks.tasks import pydantic_types_to_register
712

813

14+
def get_job_filter(user_id: UserID, product_name: ProductName) -> AsyncJobFilter:
15+
return AsyncJobFilter(
16+
user_id=user_id, product_name=product_name, client_name=APP_NAME
17+
)
18+
19+
20+
def get_task_filter(user_id: UserID, product_name: ProductName) -> TaskFilter:
21+
job_filter = get_job_filter(user_id=user_id, product_name=product_name)
22+
return TaskFilter.model_validate(job_filter.model_dump())
23+
24+
925
def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None:
1026
async def on_startup() -> None:
1127
app.state.task_manager = await create_task_manager(

services/api-server/src/simcore_service_api_server/services_rpc/storage.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from functools import partial
33

44
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
5-
AsyncJobFilter,
65
AsyncJobGet,
76
)
87
from models_library.api_schemas_webserver.storage import PathToExport
@@ -11,7 +10,7 @@
1110
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
1211
from servicelib.rabbitmq.rpc_interfaces.storage import simcore_s3 as storage_rpc
1312

14-
from ..api.dependencies.celery import ASYNC_JOB_CLIENT_NAME
13+
from ..clients.celery_task_manager import get_job_filter
1514
from ..exceptions.service_errors_utils import service_exception_mapper
1615

1716
_exception_mapper = partial(service_exception_mapper, service_name="Storage")
@@ -32,10 +31,9 @@ async def start_data_export(
3231
self._rpc_client,
3332
paths_to_export=paths_to_export,
3433
export_as="download_link",
35-
job_filter=AsyncJobFilter(
34+
job_filter=get_job_filter(
3635
user_id=self._user_id,
3736
product_name=self._product_name,
38-
client_name=ASYNC_JOB_CLIENT_NAME,
3937
),
4038
)
4139
return async_job_get

services/api-server/tests/unit/api_functions/celery/test_functions_celery.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from fastapi import FastAPI, status
2424
from httpx import AsyncClient, BasicAuth, HTTPStatusError
2525
from models_library.api_schemas_long_running_tasks.tasks import TaskResult, TaskStatus
26-
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobFilter
2726
from models_library.functions import (
2827
FunctionClass,
2928
FunctionID,
@@ -50,12 +49,12 @@
5049
from simcore_service_api_server._meta import API_VTAG
5150
from simcore_service_api_server.api.dependencies.authentication import Identity
5251
from simcore_service_api_server.api.dependencies.celery import (
53-
ASYNC_JOB_CLIENT_NAME,
5452
get_task_manager,
5553
)
5654
from simcore_service_api_server.celery_worker.worker_tasks.functions_tasks import (
5755
run_function as run_function_task,
5856
)
57+
from simcore_service_api_server.clients.celery_task_manager import get_job_filter
5958
from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError
6059
from simcore_service_api_server.models.api_resources import JobLinks
6160
from simcore_service_api_server.models.domain.functions import (
@@ -281,10 +280,9 @@ async def test_celery_error_propagation(
281280
with_api_server_celery_worker: TestWorkController,
282281
):
283282

284-
job_filter = AsyncJobFilter(
283+
job_filter = get_job_filter(
285284
user_id=user_identity.user_id,
286285
product_name=user_identity.product_name,
287-
client_name=ASYNC_JOB_CLIENT_NAME,
288286
)
289287
task_manager = get_task_manager(app=app)
290288
task_uuid = await task_manager.submit_task(

0 commit comments

Comments
 (0)