Skip to content

Commit e5859f6

Browse files
author
Andrei Neagu
committed
fixed export task
1 parent 7ced8e7 commit e5859f6

File tree

4 files changed

+22
-44
lines changed

4 files changed

+22
-44
lines changed

services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from ...modules.celery._celery_types import register_celery_types
77
from ...modules.celery._task import define_task
8-
from ...modules.celery.tasks import export_data
8+
from ._data_export import data_export
99
from ._paths import compute_path_size
1010

1111
_logger = logging.getLogger(__name__)
@@ -18,5 +18,5 @@ def setup_worker_tasks(app: Celery) -> None:
1818
logging.INFO,
1919
msg="Storage setup Worker Tasks",
2020
):
21-
define_task(app, export_data)
21+
define_task(app, data_export)
2222
define_task(app, compute_path_size)

services/storage/src/simcore_service_storage/api/rpc/_data_export.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from ...modules.celery import get_celery_client
1818
from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError
1919
from ...simcore_s3_dsm import SimcoreS3DataManager
20+
from .._worker_tasks._data_export import data_export
2021

2122
router = RPCRouter()
2223

@@ -52,9 +53,9 @@ async def start_data_export(
5253
) from err
5354

5455
task_uuid = await get_celery_client(app).send_task(
55-
"export_data",
56+
data_export.__name__,
5657
task_context=job_id_data.model_dump(),
57-
files=data_export_start.file_and_folder_ids, # ANE: adapt here your signature
58+
files=data_export_start.file_and_folder_ids,
5859
)
5960

6061
return AsyncJobGet(

services/storage/src/simcore_service_storage/modules/celery/tasks.py

Lines changed: 0 additions & 29 deletions
This file was deleted.
Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,34 @@
11
import logging
22

3-
from celery import Celery # type: ignore[import-untyped]
3+
from celery import Celery, Task # type: ignore[import-untyped]
4+
from common_library.async_tools import make_async
45
from models_library.progress_bar import ProgressReport
56
from servicelib.logging_utils import log_context
67

7-
from .models import TaskID
8-
98
_logger = logging.getLogger(__name__)
109

1110

1211
class CeleryTaskQueueWorker:
1312
def __init__(self, celery_app: Celery) -> None:
1413
self.celery_app = celery_app
1514

16-
def set_task_progress(
17-
self, task_name: str, task_id: TaskID, report: ProgressReport
18-
) -> None:
15+
@make_async()
16+
def set_task_progress(self, task: Task, report: ProgressReport) -> None:
17+
assert task.name # nosec
18+
task_name = task.name
19+
task_id = task.request.id
20+
1921
with log_context(
2022
_logger,
2123
logging.DEBUG,
2224
msg=f"Setting progress for {task_name}: {report.model_dump_json()}",
2325
):
24-
self.celery_app.tasks[task_name].update_state(
25-
task_id=task_id,
26-
state="RUNNING",
27-
meta=report.model_dump(mode="json"),
28-
)
26+
try:
27+
self.celery_app.tasks[task_name].update_state(
28+
task_id=task_id,
29+
state="RUNNING",
30+
meta=report.model_dump(mode="json"),
31+
)
32+
except ValueError as e:
33+
if "task_id must not be empty. Got None instead." not in f"{e}":
34+
raise

0 commit comments

Comments
 (0)