Commit a6d85cc

♻️ Further cleanup of async jobs framework (#7424)
1 parent 7202315 commit a6d85cc

File tree: 8 files changed, +28 -20 lines changed


services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py

Lines changed: 0 additions & 1 deletion
@@ -110,7 +110,6 @@ async def list_jobs(
     app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
 ) -> list[AsyncJobGet]:
     assert app  # nosec
-
     try:
         task_uuids = await get_celery_client(app).get_task_uuids(
             task_context=job_id_data.model_dump(),
services/storage/src/simcore_service_storage/api/rpc/_data_export.py

Lines changed: 4 additions & 0 deletions
@@ -1,3 +1,5 @@
+import logging
+
 from celery.exceptions import CeleryError  # type: ignore[import-untyped]
 from fastapi import FastAPI
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
@@ -19,6 +21,8 @@
 from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError
 from ...simcore_s3_dsm import SimcoreS3DataManager
 
+_logger = logging.getLogger(__name__)
+
 router = RPCRouter()
 
 
services/storage/src/simcore_service_storage/api/rpc/_paths.py

Lines changed: 2 additions & 1 deletion
@@ -1,3 +1,4 @@
+import logging
 from pathlib import Path
 
 from fastapi import FastAPI
@@ -11,6 +12,7 @@
 from ...modules.celery import get_celery_client
 from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
 
+_logger = logging.getLogger(__name__)
 router = RPCRouter()
 
 
@@ -23,7 +25,6 @@ async def compute_path_size(
     path: Path,
 ) -> AsyncJobGet:
     assert app  # nosec
-
     task_uuid = await get_celery_client(app).send_task(
         remote_compute_path_size.__name__,
         task_context=job_id_data.model_dump(),

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 7 additions & 3 deletions
@@ -38,7 +38,7 @@
 _CELERY_TASK_ID_KEY_ENCODING = "utf-8"
 
 _MIN_PROGRESS_VALUE = 0.0
-_MAX_PROGRESS_VALUE = 100.0
+_MAX_PROGRESS_VALUE = 1.0
 
 
 def _build_context_prefix(task_context: TaskContext) -> list[str]:
@@ -109,8 +109,12 @@ def _get_progress_report(
             TaskState.ERROR,
             TaskState.SUCCESS,
         ):
-            return ProgressReport(actual_value=_MAX_PROGRESS_VALUE)
-        return ProgressReport(actual_value=_MIN_PROGRESS_VALUE)
+            return ProgressReport(
+                actual_value=_MAX_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE
+            )
+        return ProgressReport(
+            actual_value=_MIN_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE
+        )
 
     def _get_state(self, task_context: TaskContext, task_uuid: TaskUUID) -> TaskState:
         task_id = _build_task_id(task_context, task_uuid)
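With this change the client reports progress on a 0.0 to 1.0 scale and always sets an explicit total, instead of the previous 0 to 100 convention. A minimal sketch of how a consumer could derive a display percentage from the two fields (percent_complete is a hypothetical helper, not part of this codebase):

from models_library.progress_bar import ProgressReport


def percent_complete(report: ProgressReport) -> float:
    # Derive the percentage from actual_value relative to total,
    # so the result does not depend on the scale chosen for total.
    return 100.0 * report.actual_value / report.total


report = ProgressReport(actual_value=0.5, total=1.0)
assert percent_complete(report) == 50.0
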

services/storage/src/simcore_service_storage/modules/celery/models.py

Lines changed: 8 additions & 9 deletions
@@ -1,5 +1,5 @@
 from enum import StrEnum, auto
-from typing import Any, Final, Self, TypeAlias
+from typing import Any, Self, TypeAlias
 from uuid import UUID
 
 from models_library.progress_bar import ProgressReport
@@ -9,9 +9,6 @@
 TaskID: TypeAlias = str
 TaskUUID: TypeAlias = UUID
 
-_MIN_PROGRESS: Final[float] = 0.0
-_MAX_PROGRESS: Final[float] = 100.0
-
 
 class TaskState(StrEnum):
     PENDING = auto()
@@ -36,13 +33,15 @@ def is_done(self) -> bool:
     @model_validator(mode="after")
     def _check_consistency(self) -> Self:
         value = self.progress_report.actual_value
+        min_value = 0.0
+        max_value = self.progress_report.total
 
         valid_states = {
-            TaskState.PENDING: value == _MIN_PROGRESS,
-            TaskState.RUNNING: _MIN_PROGRESS <= value <= _MAX_PROGRESS,
-            TaskState.SUCCESS: value == _MAX_PROGRESS,
-            TaskState.ABORTED: value == _MAX_PROGRESS,
-            TaskState.ERROR: value == _MAX_PROGRESS,
+            TaskState.PENDING: value == min_value,
+            TaskState.RUNNING: min_value <= value <= max_value,
+            TaskState.SUCCESS: value == max_value,
+            TaskState.ABORTED: value == max_value,
+            TaskState.ERROR: value == max_value,
         }
 
         if not valid_states.get(self.task_state, True):
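Since the validator now compares actual_value against progress_report.total, a terminal state whose progress has not reached its total should be rejected at construction time. A minimal sketch, assuming _check_consistency raises on inconsistency and pydantic surfaces it as a ValidationError:

from uuid import uuid4

from models_library.progress_bar import ProgressReport
from pydantic import ValidationError
from simcore_service_storage.modules.celery.models import TaskState, TaskStatus

try:
    TaskStatus(
        task_uuid=uuid4(),
        task_state=TaskState.SUCCESS,
        progress_report=ProgressReport(actual_value=0.3, total=1.0),
    )
except ValidationError:
    # SUCCESS requires actual_value == total under the new consistency check.
    print("inconsistent state/progress rejected")
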

services/storage/src/simcore_service_storage/modules/celery/tasks.py

Lines changed: 2 additions & 1 deletion
@@ -13,6 +13,7 @@
 
 def export_data(task: Task, files: list[StorageFileID]):
     _logger.info("Exporting files: %s", files)
+    assert len(files) > 0
     for n, file in enumerate(files, start=1):
         with log_context(
             _logger,
@@ -23,7 +24,7 @@ def export_data(task: Task, files: list[StorageFileID]):
             get_celery_worker(task.app).set_task_progress(
                 task_name=task.name,
                 task_id=task.request.id,
-                report=ProgressReport(actual_value=n / len(files) * 100),
+                report=ProgressReport(actual_value=n / len(files), total=1),
             )
             time.sleep(10)
     return "done"
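Under the new convention, export_data reports the completed fraction of files rather than a percentage, so the final report equals total exactly when the task finishes. A quick worked example of the values it would now emit for four files (the file IDs are placeholders):

files = ["f1", "f2", "f3", "f4"]  # placeholder StorageFileIDs
reported = [n / len(files) for n, _ in enumerate(files, start=1)]
assert reported == [0.25, 0.5, 0.75, 1.0]  # final value matches total=1
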

services/storage/tests/unit/modules/celery/test_celery.py

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ def sleep_for(seconds: float) -> None:
         worker.set_task_progress(
             task_name=task_name,
             task_id=task_id,
-            report=ProgressReport(actual_value=n / len(files) * 10),
+            report=ProgressReport(actual_value=n / len(files), total=1.0),
         )
         await asyncio.get_event_loop().run_in_executor(None, sleep_for, 1)

services/storage/tests/unit/test_data_export.py

Lines changed: 4 additions & 4 deletions
@@ -454,7 +454,7 @@ async def test_get_data_export_status_error(
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
                 task_state=TaskState.SUCCESS,
-                progress_report=ProgressReport(actual_value=100),
+                progress_report=ProgressReport(actual_value=1, total=1),
             ),
             "get_task_result_object": "result",
             "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
@@ -489,7 +489,7 @@ async def test_get_data_export_result_success(
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
                 task_state=TaskState.RUNNING,
-                progress_report=ProgressReport(actual_value=50),
+                progress_report=ProgressReport(actual_value=0.5, total=1.0),
             ),
             "get_task_result_object": _faker.text(),
             "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
@@ -501,7 +501,7 @@ async def test_get_data_export_result_success(
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
                 task_state=TaskState.ABORTED,
-                progress_report=ProgressReport(actual_value=100),
+                progress_report=ProgressReport(actual_value=1.0, total=1.0),
             ),
             "get_task_result_object": _faker.text(),
             "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
@@ -513,7 +513,7 @@ async def test_get_data_export_result_success(
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
                 task_state=TaskState.ERROR,
-                progress_report=ProgressReport(actual_value=100),
+                progress_report=ProgressReport(actual_value=1.0, total=1.0),
             ),
             "get_task_result_object": _faker.text(),
             "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
