Skip to content

Commit 411f9ee

Browse files
author
Andrei Neagu
committed
add progress sending via rabbitmq
1 parent dbf3f29 commit 411f9ee

File tree

5 files changed

+83
-12
lines changed

5 files changed

+83
-12
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from typing import TypeAlias
2+
3+
OsparcJobId: TypeAlias = str

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Any, Literal, TypeAlias
77

88
import arrow
9+
from models_library.osparc_jobs import OsparcJobId
910
from pydantic import BaseModel, Field
1011

1112
from .products import ProductName
@@ -123,6 +124,8 @@ def routing_key(self) -> str | None:
123124

124125

125126
class ProgressRabbitMessageWorkerJob(ProgressMessageMixin, WorkerJobMessageBase):
127+
osparc_job_id: OsparcJobId
128+
126129
def routing_key(self) -> str | None:
127130
return f"{self.user_id}.worker_job"
128131

services/storage/src/simcore_service_storage/api/celery/_export_task.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
from asyncio import AbstractEventLoop, get_event_loop, run_coroutine_threadsafe
2-
from typing import TypeAlias, cast
2+
from typing import cast
33
from uuid import uuid4
44

55
from fastapi import FastAPI
6+
from models_library.osparc_jobs import OsparcJobId
67
from models_library.projects_nodes_io import StorageFileID
78
from models_library.users import UserID
89
from servicelib.progress_bar import ProgressReport, ReportCB
910

1011
from ...dsm import SimcoreS3DataManager, get_dsm_provider
12+
from ...modules.rabbitmq import post_task_progress_message
1113
from ._tqdm_utils import get_export_progress, set_absolute_progress
1214

13-
OsparcJobID: TypeAlias = str
14-
1515

1616
async def _async_export(
1717
app: FastAPI,
1818
*,
19-
osparc_job_id: OsparcJobID,
19+
osparc_job_id: OsparcJobId,
2020
user_id: UserID,
2121
paths_to_export: list[StorageFileID],
2222
progress_cb: ReportCB,
@@ -26,9 +26,7 @@ async def _async_export(
2626
get_dsm_provider(app).get(SimcoreS3DataManager.get_location_id()),
2727
)
2828

29-
with get_export_progress(
30-
total=1, description=f"export job: {osparc_job_id}"
31-
) as pbar:
29+
with get_export_progress(total=1, description=f"{osparc_job_id}") as pbar:
3230

3331
def _sync_progress_updates(report: ProgressReport) -> None:
3432
set_absolute_progress(pbar, current_progress=report.actual_value)
@@ -38,7 +36,7 @@ def _sync_progress_updates(report: ProgressReport) -> None:
3836

3937
async def _progress_cb(report: ProgressReport) -> None:
4038
await get_event_loop().run_in_executor(None, _sync_progress_updates, report)
41-
# TODO: propagate progress via socketio -> also include `osparc_job_id`
39+
await post_task_progress_message(app, user_id, osparc_job_id, report)
4240

4341
return await dsm.create_s3_export(
4442
user_id, paths_to_export, progress_cb=_progress_cb
@@ -55,10 +53,11 @@ def fake_celery_export( # TODO: replace this with the Celery interface
5553
# The Celery task should propagate this information to the FE togehter with the progress (nmaybe as metadata?).
5654
# When progress is published via socket.io `osparc_job_id` will be used by the FE to associate the value
5755
# to the proper elment.
58-
osparc_job_id = f"some_random_id_{uuid4()}"
56+
osparc_job_id = f"{user_id}_fake_celery_export_{uuid4()}"
5957

6058
def _progress_cb(report: ProgressReport) -> None:
61-
_ = report # TODO: propagate to celery task status
59+
# TODO: propagate progress via CeleryWorkerInterface
60+
_ = report
6261

6362
return run_coroutine_threadsafe(
6463
_async_export(

services/storage/src/simcore_service_storage/modules/rabbitmq.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
import logging
2+
from contextlib import suppress
23
from typing import cast
34

45
from fastapi import FastAPI
5-
from servicelib.logging_utils import log_context
6+
from models_library.osparc_jobs import OsparcJobId
7+
from models_library.progress_bar import ProgressReport
8+
from models_library.rabbitmq_messages import (
9+
ProgressRabbitMessageWorkerJob,
10+
ProgressType,
11+
RabbitMessageBase,
12+
)
13+
from models_library.users import UserID
14+
from servicelib.logging_utils import log_catch, log_context
615
from servicelib.rabbitmq import (
716
RabbitMQClient,
817
RabbitMQRPCClient,
@@ -52,7 +61,7 @@ async def on_shutdown() -> None:
5261

5362

5463
def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
55-
if not app.state.rabbitmq_client:
64+
if not hasattr(app.state, "rabbitmq_client"):
5665
raise ConfigurationError(
5766
msg="RabbitMQ client is not available. Please check the configuration."
5867
)
@@ -62,3 +71,22 @@ def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
6271
def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
6372
assert app.state.rabbitmq_rpc_server # nosec
6473
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server)
74+
75+
76+
async def post_message(app: FastAPI, message: RabbitMessageBase) -> None:
77+
with log_catch(_logger, reraise=False), suppress(ConfigurationError):
78+
# NOTE: if rabbitmq was not initialized the error does not need to flood the logs
79+
await get_rabbitmq_client(app).publish(message.channel_name, message)
80+
81+
82+
async def post_task_progress_message(
83+
app: FastAPI, user_id: UserID, osparc_job_id: OsparcJobId, report: ProgressReport
84+
) -> None:
85+
with log_catch(_logger, reraise=False):
86+
message = ProgressRabbitMessageWorkerJob.model_construct(
87+
user_id=user_id,
88+
osparc_job_id=osparc_job_id,
89+
progress_type=ProgressType.WORKER_JOB_EXPORTING,
90+
report=report,
91+
)
92+
await post_message(app, message)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from asyncio import AbstractEventLoop
2+
from unittest.mock import AsyncMock
3+
4+
import pytest
5+
from fastapi import FastAPI
6+
from models_library.projects_nodes_io import StorageFileID
7+
from pydantic import TypeAdapter
8+
from pytest_mock import MockerFixture
9+
from simcore_service_storage.api.celery._export_task import fake_celery_export
10+
11+
pytest_simcore_core_services_selection = ["postgres"]
12+
pytest_simcore_ops_services_selection = ["adminer"]
13+
14+
15+
@pytest.fixture
16+
def mock_rabbitmq_post_message(mocker: MockerFixture) -> AsyncMock:
17+
return mocker.patch("simcore_service_storage.modules.rabbitmq.post_message")
18+
19+
20+
async def test_celery_export_task(
21+
event_loop: AbstractEventLoop,
22+
initialized_app: FastAPI,
23+
mock_rabbitmq_post_message: AsyncMock,
24+
):
25+
# TODO: rewrite this test with the Celery job subscription tests from GCR
26+
27+
file_id = await event_loop.run_in_executor(
28+
None, fake_celery_export, event_loop, initialized_app, 1, []
29+
)
30+
31+
# check progress values, since there are no files to read progress goes from 0 to 1
32+
assert mock_rabbitmq_post_message.call_count == 2
33+
progress_values = [
34+
x.args[1].report.actual_value for x in mock_rabbitmq_post_message.call_args_list
35+
]
36+
assert progress_values == [0, 1]
37+
38+
assert TypeAdapter(StorageFileID).validate_python(file_id)

0 commit comments

Comments
 (0)