Skip to content

Commit 9c9adf0

Browse files
refactor
1 parent 754c065 commit 9c9adf0

File tree

2 files changed

+25
-19
lines changed

2 files changed

+25
-19
lines changed

services/storage/src/simcore_service_storage/api/rpc/_data_export.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from uuid import uuid4
2-
31
from fastapi import FastAPI
42
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet, AsyncJobId
53
from models_library.api_schemas_storage.data_export_async_jobs import (
@@ -10,6 +8,9 @@
108
)
119
from servicelib.rabbitmq import RPCRouter
1210

11+
from ...modules.celery.client import CeleryTaskQueueClient, TaskIDParts
12+
from ...modules.celery.utils import get_celery_client
13+
1314
router = RPCRouter()
1415

1516

@@ -24,7 +25,18 @@ async def start_data_export(
2425
app: FastAPI, paths: DataExportTaskStartInput
2526
) -> AsyncJobGet:
2627
assert app # nosec
28+
29+
client: CeleryTaskQueueClient = get_celery_client(app)
30+
31+
task_id = await client.send_task(
32+
task_name="sync_archive",
33+
task_id_parts=TaskIDParts(
34+
user_id=paths.user_id, product_name=paths.product_name
35+
),
36+
files=paths.paths,
37+
)
38+
2739
return AsyncJobGet(
28-
job_id=AsyncJobId(f"{uuid4()}"),
40+
job_id=AsyncJobId(task_id),
2941
job_name=", ".join(str(p) for p in paths.paths),
3042
)

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,22 @@
1010

1111
from .models import TaskID, TaskIDParts, TaskStatus
1212

13-
_PREFIX: Final = "ct"
14-
1513
_logger = logging.getLogger(__name__)
1614

17-
18-
def _get_task_id_components(task_id_parts: TaskIDParts) -> list[str]:
19-
return sorted(map(str, task_id_parts.values()))
20-
21-
22-
def _get_components_prefix(name: str, task_id_parts: TaskIDParts) -> list[str]:
23-
return [_PREFIX, name, *_get_task_id_components(task_id_parts)]
15+
_CELERY_TASK_META_PREFIX = "celery-task-meta-"
16+
_PREFIX: Final[str] = "ct"
2417

2518

26-
def _get_task_id_prefix(name: str, task_id_parts: TaskIDParts) -> TaskID:
27-
return "::".join(_get_components_prefix(name, task_id_parts))
19+
def _build_parts_prefix(name: str, task_id_parts: TaskIDParts) -> list[str]:
20+
return [_PREFIX, name, *[f"{task_id_parts[key]}" for key in sorted(task_id_parts)]]
2821

2922

30-
def _get_task_id(name: str, task_id_parts: TaskIDParts) -> TaskID:
31-
return "::".join([*_get_components_prefix(name, task_id_parts), f"{uuid4()}"])
23+
def build_task_id_prefix(name: str, task_id_parts: TaskIDParts) -> TaskID:
24+
return "::".join(_build_parts_prefix(name, task_id_parts))
3225

3326

34-
_CELERY_TASK_META_PREFIX = "celery-task-meta-"
27+
def build_task_id(name: str, task_id_parts: TaskIDParts) -> TaskID:
28+
return "::".join([*_build_parts_prefix(name, task_id_parts), f"{uuid4()}"])
3529

3630

3731
class CeleryTaskQueueClient:
@@ -42,7 +36,7 @@ def __init__(self, celery_app: Celery):
4236
def send_task(
4337
self, task_name: str, *, task_id_parts: TaskIDParts, **task_params
4438
) -> TaskID:
45-
task_id = _get_task_id(task_name, task_id_parts)
39+
task_id = build_task_id(task_name, task_id_parts)
4640
_logger.debug("Submitting task %s: %s", task_name, task_id)
4741
task = self._celery_app.send_task(
4842
task_name, task_id=task_id, kwargs=task_params
@@ -84,7 +78,7 @@ def _get_completed_task_ids(
8478
) -> list[TaskID]:
8579
search_key = (
8680
_CELERY_TASK_META_PREFIX
87-
+ _get_task_id_prefix(task_name, task_id_parts)
81+
+ build_task_id_prefix(task_name, task_id_parts)
8882
+ "*"
8983
)
9084
redis = self._celery_app.backend.client

0 commit comments

Comments
 (0)