Skip to content

Commit c219b03

Browse files
committed
@pcrespov decouple task models
1 parent 7c3ea75 commit c219b03

File tree

6 files changed

+85
-49
lines changed

6 files changed

+85
-49
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from datetime import datetime
2+
from typing import Any, TypeAlias
3+
from uuid import UUID
4+
5+
from pydantic import BaseModel, Field, PositiveFloat, model_validator
6+
from typing_extensions import Self
7+
8+
TaskRpcId: TypeAlias = UUID
9+
10+
11+
class TaskRpcStatus(BaseModel):
12+
task_id: TaskRpcId
13+
task_progress: PositiveFloat = Field(..., ge=0.0, le=1.0)
14+
done: bool
15+
started: datetime
16+
stopped: datetime | None
17+
18+
@model_validator(mode="after")
19+
def _check_consistency(self) -> Self:
20+
progress_done = self.task_progress == 1.0
21+
done_done = self.done == True
22+
stopped_done = self.stopped is not None
23+
24+
if (progress_done != done_done) or (done_done != stopped_done):
25+
raise ValueError(
26+
f"Inconsistent data: {self.task_progress=}, {self.done=}, {self.stopped=}"
27+
)
28+
return self
29+
30+
31+
class TaskRpcResult(BaseModel):
32+
result: Any | None
33+
error: Any | None
34+
35+
36+
class TaskRpcGet(BaseModel):
37+
task_id: TaskRpcId
38+
task_name: str
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# pylint: disable=R6301
22
from pathlib import Path
33

4-
from models_library.api_schemas_long_running_tasks.base import TaskId
4+
from models_library.api_schemas_rpc_long_running_tasks.tasks import TaskRpcId
55
from pydantic import BaseModel, Field
66

77

@@ -11,4 +11,4 @@ class ZipTaskStartInput(BaseModel):
1111

1212
class ZipTaskAbortOutput(BaseModel):
1313
result: bool
14-
task_id: TaskId
14+
task_id: TaskRpcId
Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
from typing import Final
22

3-
from models_library.api_schemas_long_running_tasks.base import TaskId
4-
from models_library.api_schemas_long_running_tasks.tasks import (
5-
TaskGet,
6-
TaskResult,
7-
TaskStatus,
3+
from models_library.api_schemas_rpc_long_running_tasks.tasks import (
4+
TaskRpcGet,
5+
TaskRpcResult,
6+
TaskRpcStatus,
87
)
98
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
109
from models_library.api_schemas_storage.zipping_tasks import (
@@ -13,6 +12,7 @@
1312
)
1413
from models_library.rabbitmq_basic_types import RPCMethodName
1514
from pydantic import NonNegativeInt, TypeAdapter
15+
from simcore_service_storage.api.rpc._zipping import TaskRpcId
1616

1717
from ... import RabbitMQRPCClient
1818

@@ -23,19 +23,19 @@
2323

2424
async def start_zipping(
2525
rabbitmq_rpc_client: RabbitMQRPCClient, *, paths: ZipTaskStartInput
26-
) -> TaskGet:
26+
) -> TaskRpcGet:
2727
result = await rabbitmq_rpc_client.request(
2828
STORAGE_RPC_NAMESPACE,
2929
_RPC_METHOD_NAME_ADAPTER.validate_python("start_zipping"),
3030
paths=paths,
3131
timeout_s=_DEFAULT_TIMEOUT_S,
3232
)
33-
assert isinstance(result, TaskGet)
33+
assert isinstance(result, TaskRpcGet)
3434
return result
3535

3636

3737
async def abort_zipping(
38-
rabbitmq_rpc_client: RabbitMQRPCClient, *, task_id: TaskId
38+
rabbitmq_rpc_client: RabbitMQRPCClient, *, task_id: TaskRpcId
3939
) -> ZipTaskAbortOutput:
4040
result = await rabbitmq_rpc_client.request(
4141
STORAGE_RPC_NAMESPACE,
@@ -48,26 +48,26 @@ async def abort_zipping(
4848

4949

5050
async def get_zipping_status(
51-
rabbitmq_rpc_client: RabbitMQRPCClient, *, task_id: TaskId
52-
) -> TaskStatus:
51+
rabbitmq_rpc_client: RabbitMQRPCClient, *, task_id: TaskRpcId
52+
) -> TaskRpcStatus:
5353
result = await rabbitmq_rpc_client.request(
5454
STORAGE_RPC_NAMESPACE,
5555
_RPC_METHOD_NAME_ADAPTER.validate_python("get_zipping_status"),
5656
task_id=task_id,
5757
timeout_s=_DEFAULT_TIMEOUT_S,
5858
)
59-
assert isinstance(result, TaskStatus)
59+
assert isinstance(result, TaskRpcStatus)
6060
return result
6161

6262

6363
async def get_zipping_result(
64-
rabbitmq_rpc_client: RabbitMQRPCClient, *, task_id: TaskId
65-
) -> TaskResult:
64+
rabbitmq_rpc_client: RabbitMQRPCClient, *, task_id: TaskRpcId
65+
) -> TaskRpcResult:
6666
result = await rabbitmq_rpc_client.request(
6767
STORAGE_RPC_NAMESPACE,
6868
_RPC_METHOD_NAME_ADAPTER.validate_python("get_zipping_result"),
6969
task_id=task_id,
7070
timeout_s=_DEFAULT_TIMEOUT_S,
7171
)
72-
assert isinstance(result, TaskResult)
72+
assert isinstance(result, TaskRpcResult)
7373
return result

services/storage/src/simcore_service_storage/api/rpc/_zipping.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,11 @@
22
from uuid import uuid4
33

44
from fastapi import FastAPI
5-
from models_library.api_schemas_long_running_tasks.base import (
6-
ProgressPercent,
7-
TaskId,
8-
TaskProgress,
9-
)
10-
from models_library.api_schemas_long_running_tasks.tasks import (
11-
TaskGet,
12-
TaskResult,
13-
TaskStatus,
5+
from models_library.api_schemas_rpc_long_running_tasks.tasks import (
6+
TaskRpcGet,
7+
TaskRpcId,
8+
TaskRpcResult,
9+
TaskRpcStatus,
1410
)
1511
from models_library.api_schemas_storage.zipping_tasks import (
1612
ZipTaskAbortOutput,
@@ -22,36 +18,34 @@
2218

2319

2420
@router.expose()
25-
async def start_zipping(app: FastAPI, paths: ZipTaskStartInput) -> TaskGet:
21+
async def start_zipping(app: FastAPI, paths: ZipTaskStartInput) -> TaskRpcGet:
2622
assert app # nosec
27-
return TaskGet(
28-
task_id=f"{uuid4()}",
23+
return TaskRpcGet(
24+
task_id=uuid4(),
2925
task_name=", ".join(str(p) for p in paths.paths),
30-
status_href="status_url",
31-
result_href="result url",
32-
abort_href="abort url",
3326
)
3427

3528

3629
@router.expose()
37-
async def abort_zipping(app: FastAPI, task_id: TaskId) -> ZipTaskAbortOutput:
30+
async def abort_zipping(app: FastAPI, task_id: TaskRpcId) -> ZipTaskAbortOutput:
3831
assert app # nosec
3932
return ZipTaskAbortOutput(result=True, task_id=task_id)
4033

4134

4235
@router.expose()
43-
async def get_zipping_status(app: FastAPI, task_id: TaskId) -> TaskStatus:
36+
async def get_zipping_status(app: FastAPI, task_id: TaskRpcId) -> TaskRpcStatus:
4437
assert app # nosec
45-
progress = TaskProgress(
38+
return TaskRpcStatus(
4639
task_id=task_id,
47-
message="Here's a status for you. You are welcome",
48-
percent=ProgressPercent(0.5),
40+
task_progress=0.5,
41+
done=False,
42+
started=datetime.now(),
43+
stopped=None,
4944
)
50-
return TaskStatus(task_progress=progress, done=False, started=datetime.now())
5145

5246

5347
@router.expose()
54-
async def get_zipping_result(app: FastAPI, task_id: TaskId) -> TaskResult:
48+
async def get_zipping_result(app: FastAPI, task_id: TaskRpcId) -> TaskRpcResult:
5549
assert app # nosec
5650
assert task_id # nosec
57-
return TaskResult(result="Here's your result.", error=None)
51+
return TaskRpcResult(result="Here's your result.", error=None)

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
APP_NAME,
2626
APP_STARTED_BANNER_MSG,
2727
)
28-
from ..api.rabbitmq_rpc.routes import setup_rpc_api_routes
2928
from ..api.rest.routes import setup_rest_api_routes
29+
from ..api.rpc.routes import setup_rpc_api_routes
3030
from ..dsm import setup_dsm
3131
from ..dsm_cleaner import setup_dsm_cleaner
3232
from ..exceptions.handlers import set_exception_handlers

services/storage/tests/unit/test_zipping.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
import pytest
77
from faker import Faker
88
from fastapi import FastAPI
9-
from models_library.api_schemas_long_running_tasks.tasks import TaskGet, TaskStatus
9+
from models_library.api_schemas_rpc_long_running_tasks.tasks import (
10+
TaskRpcGet,
11+
TaskRpcId,
12+
TaskRpcResult,
13+
TaskRpcStatus,
14+
)
1015
from models_library.api_schemas_storage.zipping_tasks import (
1116
ZipTaskAbortOutput,
1217
ZipTaskStartInput,
@@ -17,7 +22,6 @@
1722
from servicelib.rabbitmq import RabbitMQRPCClient
1823
from servicelib.rabbitmq.rpc_interfaces.storage import zipping
1924
from settings_library.rabbit import RabbitSettings
20-
from simcore_service_storage.api.rabbitmq_rpc._zipping import TaskId, TaskResult
2125
from simcore_service_storage.core.settings import ApplicationSettings
2226

2327
pytest_plugins = [
@@ -73,24 +77,24 @@ async def test_start_zipping(rpc_client: RabbitMQRPCClient, faker: Faker):
7377
result = await zipping.start_zipping(
7478
rpc_client, paths=ZipTaskStartInput(paths=[Path(faker.file_path())])
7579
)
76-
assert isinstance(result, TaskGet)
80+
assert isinstance(result, TaskRpcGet)
7781

7882

7983
async def test_abort_zipping(rpc_client: RabbitMQRPCClient, faker: Faker):
80-
_task_id = TaskId(f"{faker.uuid4()}")
84+
_task_id = TaskRpcId(faker.uuid4())
8185
result = await zipping.abort_zipping(rpc_client, task_id=_task_id)
8286
assert isinstance(result, ZipTaskAbortOutput)
8387
assert result.task_id == _task_id
8488

8589

8690
async def test_get_zipping_status(rpc_client: RabbitMQRPCClient, faker: Faker):
87-
_task_id = TaskId(f"{faker.uuid4()}")
91+
_task_id = TaskRpcId(faker.uuid4())
8892
result = await zipping.get_zipping_status(rpc_client, task_id=_task_id)
89-
assert isinstance(result, TaskStatus)
90-
assert result.task_progress.task_id == _task_id
93+
assert isinstance(result, TaskRpcStatus)
94+
assert result.task_id == _task_id
9195

9296

9397
async def test_get_zipping_result(rpc_client: RabbitMQRPCClient, faker: Faker):
94-
_task_id = TaskId(f"{faker.uuid4()}")
98+
_task_id = TaskRpcId(faker.uuid4())
9599
result = await zipping.get_zipping_result(rpc_client, task_id=_task_id)
96-
assert isinstance(result, TaskResult)
100+
assert isinstance(result, TaskRpcResult)

0 commit comments

Comments
 (0)