
Commit f3ca256

Author: Andrei Neagu
Commit message: purged unused
Parent: 1c0d5ab

14 files changed: +44, -209 lines

packages/models-library/src/models_library/osparc_jobs.py

Lines changed: 0 additions & 3 deletions
This file was deleted.

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 13 additions & 26 deletions
@@ -6,7 +6,6 @@
 from typing import Any, Literal, TypeAlias

 import arrow
-from models_library.osparc_jobs import OsparcJobId
 from pydantic import BaseModel, Field

 from .products import ProductName
@@ -51,11 +50,8 @@ def body(self) -> bytes:
         return self.model_dump_json().encode()


-class WorkerJobMessageBase(BaseModel):
+class ProjectMessageBase(BaseModel):
     user_id: UserID
-
-
-class ProjectMessageBase(WorkerJobMessageBase):
     project_id: ProjectID


@@ -97,13 +93,11 @@ class ProgressType(StrAutoEnum):

     PROJECT_CLOSING = auto()

-    WORKER_JOB_EXPORTING = auto()
-

 class ProgressMessageMixin(RabbitMessageBase):
-    channel_name: Literal[
+    channel_name: Literal["simcore.services.progress.v2"] = (
         "simcore.services.progress.v2"
-    ] = "simcore.services.progress.v2"
+    )
     progress_type: ProgressType = (
         ProgressType.COMPUTATION_RUNNING
     )  # NOTE: backwards compatible
@@ -123,17 +117,10 @@ def routing_key(self) -> str | None:
         return f"{self.project_id}.all_nodes"


-class ProgressRabbitMessageWorkerJob(ProgressMessageMixin, WorkerJobMessageBase):
-    osparc_job_id: OsparcJobId
-
-    def routing_key(self) -> str | None:
-        return f"{self.user_id}.worker_job"
-
-
 class InstrumentationRabbitMessage(RabbitMessageBase, NodeMessageBase):
-    channel_name: Literal[
+    channel_name: Literal["simcore.services.instrumentation"] = (
         "simcore.services.instrumentation"
-    ] = "simcore.services.instrumentation"
+    )
     metrics: str
     service_uuid: NodeID
     service_type: str
@@ -223,9 +210,9 @@ def routing_key(self) -> str | None:


 class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):
-    message_type: Literal[
+    message_type: Literal[RabbitResourceTrackingMessageType.TRACKING_STARTED] = (
         RabbitResourceTrackingMessageType.TRACKING_STARTED
-    ] = RabbitResourceTrackingMessageType.TRACKING_STARTED
+    )

     wallet_id: WalletID | None
     wallet_name: str | None
@@ -263,9 +250,9 @@ class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):


 class RabbitResourceTrackingHeartbeatMessage(RabbitResourceTrackingBaseMessage):
-    message_type: Literal[
+    message_type: Literal[RabbitResourceTrackingMessageType.TRACKING_HEARTBEAT] = (
         RabbitResourceTrackingMessageType.TRACKING_HEARTBEAT
-    ] = RabbitResourceTrackingMessageType.TRACKING_HEARTBEAT
+    )


 class SimcorePlatformStatus(StrAutoEnum):
@@ -274,9 +261,9 @@ class SimcorePlatformStatus(StrAutoEnum):


 class RabbitResourceTrackingStoppedMessage(RabbitResourceTrackingBaseMessage):
-    message_type: Literal[
+    message_type: Literal[RabbitResourceTrackingMessageType.TRACKING_STOPPED] = (
         RabbitResourceTrackingMessageType.TRACKING_STOPPED
-    ] = RabbitResourceTrackingMessageType.TRACKING_STOPPED
+    )

     simcore_platform_status: SimcorePlatformStatus = Field(
         ...,
@@ -310,9 +297,9 @@ class CreditsLimit(IntEnum):


 class WalletCreditsLimitReachedMessage(RabbitMessageBase):
-    channel_name: Literal[
+    channel_name: Literal["io.simcore.service.wallets-credit-limit-reached"] = (
         "io.simcore.service.wallets-credit-limit-reached"
-    ] = "io.simcore.service.wallets-credit-limit-reached"
+    )
     created_at: datetime.datetime = Field(
         default_factory=lambda: arrow.utcnow().datetime,
         description="message creation datetime",

packages/models-library/tests/test_rabbit_messages.py

Lines changed: 1 addition & 14 deletions
@@ -4,7 +4,6 @@
 from models_library.rabbitmq_messages import (
     ProgressRabbitMessageNode,
     ProgressRabbitMessageProject,
-    ProgressRabbitMessageWorkerJob,
     ProgressType,
 )
 from pydantic import TypeAdapter
@@ -36,22 +35,10 @@
             ProgressRabbitMessageProject,
             id="project_progress",
         ),
-        pytest.param(
-            ProgressRabbitMessageWorkerJob(
-                user_id=faker.pyint(min_value=1),
-                progress_type=ProgressType.PROJECT_CLOSING,
-                report=ProgressReport(actual_value=0.4, total=1),
-                osparc_job_id=faker.pystr(),
-            ).model_dump_json(),
-            ProgressRabbitMessageWorkerJob,
-            id="worker_job_progress",
-        ),
     ],
 )
 async def test_raw_message_parsing(raw_data: str, class_type: type):
     result = TypeAdapter(
-        ProgressRabbitMessageNode
-        | ProgressRabbitMessageProject
-        | ProgressRabbitMessageWorkerJob
+        ProgressRabbitMessageNode | ProgressRabbitMessageProject
     ).validate_json(raw_data)
     assert type(result) is class_type
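For context, the trimmed test still passes because TypeAdapter over a union validates the payload against the members and returns an instance of the model that fits. A small self-contained sketch (illustrative model names, not the real message classes):

from typing import Literal

from pydantic import BaseModel, TypeAdapter


class NodeMsg(BaseModel):
    kind: Literal["node"] = "node"
    node_id: str


class ProjectMsg(BaseModel):
    kind: Literal["project"] = "project"
    project_id: str


# the JSON only fits ProjectMsg, so that is the type returned
parsed = TypeAdapter(NodeMsg | ProjectMsg).validate_json(
    '{"kind": "project", "project_id": "p-1"}'
)
assert type(parsed) is ProjectMsg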

services/storage/src/simcore_service_storage/api/_worker_tasks/_data_export.py

Lines changed: 2 additions & 2 deletions
@@ -7,7 +7,7 @@
 from models_library.users import UserID

 from ...dsm import get_dsm_provider
-from ...modules.celery.utils import get_celery_worker, get_fastapi_app
+from ...modules.celery.utils import get_celery_worker_client, get_fastapi_app
 from ...simcore_s3_dsm import SimcoreS3DataManager
 from ._tqdm_utils import get_export_progress, set_absolute_progress

@@ -33,7 +33,7 @@ async def data_export(

     async def _progress_cb(report: ProgressReport) -> None:
         set_absolute_progress(pbar, current_progress=report.actual_value)
-        await get_celery_worker(task.app).set_task_progress(task, report)
+        await get_celery_worker_client(task.app).set_task_progress(task, report)

     return await dsm.create_s3_export(
         user_id, paths_to_export, progress_cb=_progress_cb

services/storage/src/simcore_service_storage/modules/celery/signals.py

Lines changed: 3 additions & 3 deletions
@@ -13,10 +13,10 @@
 from ...modules.celery import get_event_loop, set_event_loop
 from ...modules.celery.utils import (
     get_fastapi_app,
-    set_celery_worker,
+    set_celery_worker_client,
     set_fastapi_app,
 )
-from ...modules.celery.worker import CeleryTaskQueueWorker
+from ...modules.celery.worker import CeleryWorkerClient

 _logger = logging.getLogger(__name__)

@@ -48,7 +48,7 @@ async def lifespan():
         set_event_loop(fastapi_app, loop)

         set_fastapi_app(sender.app, fastapi_app)
-        set_celery_worker(sender.app, CeleryTaskQueueWorker(sender.app))
+        set_celery_worker_client(sender.app, CeleryWorkerClient(sender.app))

     loop.run_forever()

services/storage/src/simcore_service_storage/modules/celery/utils.py

Lines changed: 5 additions & 5 deletions
@@ -1,15 +1,15 @@
 from celery import Celery  # type: ignore[import-untyped]
 from fastapi import FastAPI

-from .worker import CeleryTaskQueueWorker
+from .worker import CeleryWorkerClient

-_WORKER_KEY = "celery_worker"
+_WORKER_KEY = "celery_worker_client"
 _FASTAPI_APP_KEY = "fastapi_app"


-def get_celery_worker(celery_app: Celery) -> CeleryTaskQueueWorker:
+def get_celery_worker_client(celery_app: Celery) -> CeleryWorkerClient:
     worker = celery_app.conf[_WORKER_KEY]
-    assert isinstance(worker, CeleryTaskQueueWorker)
+    assert isinstance(worker, CeleryWorkerClient)
     return worker


@@ -19,7 +19,7 @@ def get_fastapi_app(celery_app: Celery) -> FastAPI:
     return fastapi_app


-def set_celery_worker(celery_app: Celery, worker: CeleryTaskQueueWorker) -> None:
+def set_celery_worker_client(celery_app: Celery, worker: CeleryWorkerClient) -> None:
     celery_app.conf[_WORKER_KEY] = worker
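Taken together with signals.py and _data_export.py above, the renamed helpers implement a simple registry pattern: the worker client is stored once in the Celery app's configuration at worker start-up and looked up from inside running tasks. A minimal usage sketch under that assumption (requires the simcore_service_storage package to be importable; simplified wiring, the task function below is hypothetical):

from celery import Celery, Task

from simcore_service_storage.modules.celery.utils import (
    get_celery_worker_client,
    set_celery_worker_client,
)
from simcore_service_storage.modules.celery.worker import CeleryWorkerClient

celery_app = Celery()

# at worker init (see signals.py): register the client on the Celery app
set_celery_worker_client(celery_app, CeleryWorkerClient(celery_app))


def my_task(task: Task) -> None:  # hypothetical task, for illustration only
    # inside a task (see _data_export.py): recover the client via task.app
    client = get_celery_worker_client(task.app)
    assert isinstance(client, CeleryWorkerClient)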

services/storage/src/simcore_service_storage/modules/celery/worker.py

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
 _logger = logging.getLogger(__name__)


-class CeleryTaskQueueWorker:
+class CeleryWorkerClient:
     def __init__(self, celery_app: Celery) -> None:
         self.celery_app = celery_app

services/storage/src/simcore_service_storage/modules/rabbitmq.py

Lines changed: 1 addition & 21 deletions
@@ -3,14 +3,7 @@
 from typing import cast

 from fastapi import FastAPI
-from models_library.osparc_jobs import OsparcJobId
-from models_library.progress_bar import ProgressReport
-from models_library.rabbitmq_messages import (
-    ProgressRabbitMessageWorkerJob,
-    ProgressType,
-    RabbitMessageBase,
-)
-from models_library.users import UserID
+from models_library.rabbitmq_messages import RabbitMessageBase
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.rabbitmq import (
     RabbitMQClient,
@@ -77,16 +70,3 @@ async def post_message(app: FastAPI, message: RabbitMessageBase) -> None:
     with log_catch(_logger, reraise=False), suppress(ConfigurationError):
         # NOTE: if rabbitmq was not initialized the error does not need to flood the logs
         await get_rabbitmq_client(app).publish(message.channel_name, message)
-
-
-async def post_task_progress_message(
-    app: FastAPI, user_id: UserID, osparc_job_id: OsparcJobId, report: ProgressReport
-) -> None:
-    with log_catch(_logger, reraise=False):
-        message = ProgressRabbitMessageWorkerJob.model_construct(
-            user_id=user_id,
-            osparc_job_id=osparc_job_id,
-            progress_type=ProgressType.WORKER_JOB_EXPORTING,
-            report=report,
-        )
-        await post_message(app, message)

services/storage/tests/unit/modules/celery/conftest.py

Lines changed: 3 additions & 3 deletions
@@ -13,7 +13,7 @@
     on_worker_init,
     on_worker_shutdown,
 )
-from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker
+from simcore_service_storage.modules.celery.worker import CeleryWorkerClient


 @pytest.fixture
@@ -97,6 +97,6 @@ def celery_worker_controller(
 @pytest.fixture
 def celery_worker(
     celery_worker_controller: TestWorkController,
-) -> CeleryTaskQueueWorker:
+) -> CeleryWorkerClient:
     assert isinstance(celery_worker_controller.app, Celery)
-    return CeleryTaskQueueWorker(celery_worker_controller.app)
+    return CeleryWorkerClient(celery_worker_controller.app)

services/storage/tests/unit/modules/celery/test_celery.py

Lines changed: 6 additions & 10 deletions
@@ -20,28 +20,24 @@
     TaskState,
 )
 from simcore_service_storage.modules.celery.utils import (
-    get_celery_worker,
+    get_celery_worker_client,
     get_fastapi_app,
 )
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

 _logger = logging.getLogger(__name__)


-async def _async_archive(
-    celery_app: Celery, task_name: str, task_id: str, files: list[str]
-) -> str:
-    worker = get_celery_worker(celery_app)
+async def _async_archive(celery_app: Celery, task: Task, files: list[str]) -> str:
+    worker_client = get_celery_worker_client(celery_app)

     def sleep_for(seconds: float) -> None:
         time.sleep(seconds)

     for n, file in enumerate(files, start=1):
         with log_context(_logger, logging.INFO, msg=f"Processing file {file}"):
-            worker.set_task_progress(
-                task_name=task_name,
-                task_id=task_id,
-                report=ProgressReport(actual_value=n / len(files) * 10),
+            await worker_client.set_task_progress(
+                task, ProgressReport(actual_value=n / len(files) * 10)
             )
         await asyncio.get_event_loop().run_in_executor(None, sleep_for, 1)

@@ -52,7 +48,7 @@ def sync_archive(task: Task, files: list[str]) -> str:
     assert task.name
     _logger.info("Calling async_archive")
     return asyncio.run_coroutine_threadsafe(
-        _async_archive(task.app, task.name, task.request.id, files),
+        _async_archive(task, task.request.id, files),
         get_event_loop(get_fastapi_app(task.app)),
     ).result()
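The sync_archive change above keeps the existing bridge pattern: a synchronous Celery task schedules a coroutine onto the event loop running in another thread and blocks on the returned future. A standalone sketch of just that pattern (generic names, not the repo's helpers):

import asyncio
import threading

# a loop running in a background thread, standing in for the FastAPI app's loop
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def _work(value: int) -> int:
    await asyncio.sleep(0.01)
    return value * 2


# schedule the coroutine on the other thread's loop and wait for its result
result = asyncio.run_coroutine_threadsafe(_work(21), loop).result()
assert result == 42
loop.call_soon_threadsafe(loop.stop)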
