Skip to content

Commit d0a97fb

Browse files
adapt code
1 parent 87a457f commit d0a97fb

File tree

4 files changed

+33
-15
lines changed

4 files changed

+33
-15
lines changed

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
class AsyncJobStatus(BaseModel):
1515
job_id: AsyncJobId
16-
progress: ProgressReport
16+
progress: ProgressReport | None
1717
done: bool
1818
started: datetime
1919
stopped: datetime | None

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# pylint: disable=unused-argument
22
from datetime import datetime
3-
from uuid import uuid4
43

54
from fastapi import FastAPI
65
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
@@ -15,9 +14,11 @@
1514
ResultError,
1615
StatusError,
1716
)
18-
from models_library.progress_bar import ProgressReport
1917
from servicelib.rabbitmq import RPCRouter
2018

19+
from ...modules.celery.models import TaskStatus, TaskUUID
20+
from ...modules.celery.utils import get_celery_client
21+
2122
router = RPCRouter()
2223

2324

@@ -36,13 +37,17 @@ async def get_status(
3637
) -> AsyncJobStatus:
3738
assert app # nosec
3839
assert job_id_data # nosec
39-
progress_report = ProgressReport(actual_value=0.5, total=1.0, attempt=1)
40+
41+
task_status: TaskStatus = await get_celery_client(app).get_task_status(
42+
task_context=job_id_data.model_dump(),
43+
task_uuid=job_id,
44+
)
4045
return AsyncJobStatus(
4146
job_id=job_id,
42-
progress=progress_report,
47+
progress=task_status.progress_report,
4348
done=False,
44-
started=datetime.now(),
45-
stopped=None,
49+
started=datetime.now(), # TODO: retrieve that
50+
stopped=None, # TODO: retrieve that
4651
)
4752

4853

@@ -53,12 +58,23 @@ async def get_result(
5358
assert app # nosec
5459
assert job_id # nosec
5560
assert job_id_data # nosec
56-
return AsyncJobResult(result="Here's your result.", error=None)
61+
62+
result = await get_celery_client(app).get_result(
63+
task_context=job_id_data.model_dump(),
64+
task_uuid=job_id,
65+
)
66+
67+
return AsyncJobResult(result=result, error=None)
5768

5869

5970
@router.expose()
6071
async def list_jobs(
61-
app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
72+
app: FastAPI, filter_: str, job_id_data: AsyncJobNameData # TODO: implement filter
6273
) -> list[AsyncJobGet]:
6374
assert app # nosec
64-
return [AsyncJobGet(job_id=AsyncJobId(f"{uuid4()}"))]
75+
76+
task_uuids: set[TaskUUID] = await get_celery_client(app).get_task_uuids(
77+
task_context=job_id_data.model_dump(),
78+
)
79+
80+
return [AsyncJobGet(job_id=task_uuid) for task_uuid in task_uuids]

services/storage/src/simcore_service_storage/api/rpc/_data_export.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
from uuid import uuid4
2-
31
from fastapi import FastAPI
42
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
53
AsyncJobGet,
6-
AsyncJobId,
74
AsyncJobNameData,
85
)
96
from models_library.api_schemas_storage.data_export_async_jobs import (
@@ -17,6 +14,7 @@
1714
from ...datcore_dsm import DatCoreDataManager
1815
from ...dsm import get_dsm_provider
1916
from ...exceptions.errors import FileAccessRightError
17+
from ...modules.celery.utils import get_celery_client
2018
from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError
2119
from ...simcore_s3_dsm import SimcoreS3DataManager
2220

@@ -53,6 +51,10 @@ async def start_data_export(
5351
location_id=data_export_start.location_id,
5452
) from err
5553

54+
task_uuid = await get_celery_client(app).send_task(
55+
"export_data", task_context=job_id_data.model_dump()
56+
)
57+
5658
return AsyncJobGet(
57-
job_id=AsyncJobId(f"{uuid4()}"),
59+
job_id=task_uuid,
5860
)

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def _get_completed_task_ids(self, task_context: TaskContext) -> set[TaskUUID]:
105105
return set()
106106

107107
@make_async()
108-
def get_task_ids(self, task_context: TaskContext) -> set[TaskUUID]:
108+
def get_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
109109
all_task_ids = self._get_completed_task_ids(task_context)
110110

111111
for task_inspect_status in _CELERY_INSPECT_TASK_STATUSES:

0 commit comments

Comments
 (0)