Skip to content

Commit a8e106e

Browse files
committed
get rid of AsyncJobFilter
1 parent e0ed541 commit a8e106e

File tree

10 files changed

+114
-111
lines changed

10 files changed

+114
-111
lines changed

packages/celery-library/tests/unit/test_async_jobs.py

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from faker import Faker
1818
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
1919
AsyncJobGet,
20-
AsyncJobOwnerMetadata,
2120
)
2221
from models_library.api_schemas_rpc_async_jobs.exceptions import (
2322
JobError,
@@ -79,25 +78,23 @@ def product_name(faker: Faker) -> ProductName:
7978

8079
@router.expose()
8180
async def rpc_sync_job(
82-
task_manager: TaskManager, *, job_filter: AsyncJobOwnerMetadata, **kwargs: Any
81+
task_manager: TaskManager, *, owner_metadata: OwnerMetadata, **kwargs: Any
8382
) -> AsyncJobGet:
8483
task_name = sync_job.__name__
85-
task_filter = OwnerMetadata.model_validate(job_filter.model_dump())
8684
task_uuid = await task_manager.submit_task(
87-
ExecutionMetadata(name=task_name), owner_metadata=task_filter, **kwargs
85+
ExecutionMetadata(name=task_name), owner_metadata=owner_metadata, **kwargs
8886
)
8987

9088
return AsyncJobGet(job_id=task_uuid, job_name=task_name)
9189

9290

9391
@router.expose()
9492
async def rpc_async_job(
95-
task_manager: TaskManager, *, job_filter: AsyncJobOwnerMetadata, **kwargs: Any
93+
task_manager: TaskManager, *, owner_metadata: OwnerMetadata, **kwargs: Any
9694
) -> AsyncJobGet:
9795
task_name = async_job.__name__
98-
task_filter = OwnerMetadata.model_validate(job_filter.model_dump())
9996
task_uuid = await task_manager.submit_task(
100-
ExecutionMetadata(name=task_name), owner_metadata=task_filter, **kwargs
97+
ExecutionMetadata(name=task_name), owner_metadata=owner_metadata, **kwargs
10198
)
10299

103100
return AsyncJobGet(job_id=task_uuid, job_name=task_name)
@@ -158,15 +155,15 @@ async def _start_task_via_rpc(
158155
user_id: UserID,
159156
product_name: ProductName,
160157
**kwargs: Any,
161-
) -> tuple[AsyncJobGet, AsyncJobOwnerMetadata]:
162-
job_filter = AsyncJobOwnerMetadata(
158+
) -> tuple[AsyncJobGet, OwnerMetadata]:
159+
job_filter = OwnerMetadata(
163160
user_id=user_id, product_name=product_name, owner="pytest_client"
164161
)
165162
async_job_get = await async_jobs.submit(
166163
rabbitmq_rpc_client=client,
167164
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
168165
method_name=rpc_task_name,
169-
job_filter=job_filter,
166+
owner_metadata=job_filter,
170167
**kwargs,
171168
)
172169
return async_job_get, job_filter
@@ -197,7 +194,7 @@ async def _wait_for_job(
197194
rpc_client: RabbitMQRPCClient,
198195
*,
199196
async_job_get: AsyncJobGet,
200-
job_filter: AsyncJobOwnerMetadata,
197+
owner_metadata: OwnerMetadata,
201198
stop_after: timedelta = timedelta(seconds=5),
202199
) -> None:
203200

@@ -212,7 +209,7 @@ async def _wait_for_job(
212209
rpc_client,
213210
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
214211
job_id=async_job_get.job_id,
215-
job_filter=job_filter,
212+
owner_metadata=owner_metadata,
216213
)
217214
assert (
218215
result.done is True
@@ -246,7 +243,7 @@ async def test_async_jobs_workflow(
246243
exposed_rpc_start: str,
247244
payload: Any,
248245
):
249-
async_job_get, job_filter = await _start_task_via_rpc(
246+
async_job_get, owner_metadata = await _start_task_via_rpc(
250247
async_jobs_rabbitmq_rpc_client,
251248
rpc_task_name=exposed_rpc_start,
252249
user_id=user_id,
@@ -258,21 +255,21 @@ async def test_async_jobs_workflow(
258255
jobs = await async_jobs.list_jobs(
259256
async_jobs_rabbitmq_rpc_client,
260257
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
261-
job_filter=job_filter,
258+
owner_metadata=owner_metadata,
262259
)
263260
assert len(jobs) > 0
264261

265262
await _wait_for_job(
266263
async_jobs_rabbitmq_rpc_client,
267264
async_job_get=async_job_get,
268-
job_filter=job_filter,
265+
owner_metadata=owner_metadata,
269266
)
270267

271268
async_job_result = await async_jobs.result(
272269
async_jobs_rabbitmq_rpc_client,
273270
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
274271
job_id=async_job_get.job_id,
275-
job_filter=job_filter,
272+
owner_metadata=owner_metadata,
276273
)
277274
assert async_job_result.result == payload
278275

@@ -310,7 +307,7 @@ async def test_async_jobs_cancel(
310307
jobs = await async_jobs.list_jobs(
311308
async_jobs_rabbitmq_rpc_client,
312309
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
313-
job_filter=job_filter,
310+
owner_metadata=job_filter,
314311
)
315312
assert async_job_get.job_id not in [job.job_id for job in jobs]
316313

@@ -319,15 +316,15 @@ async def test_async_jobs_cancel(
319316
async_jobs_rabbitmq_rpc_client,
320317
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
321318
job_id=async_job_get.job_id,
322-
job_filter=job_filter,
319+
owner_metadata=job_filter,
323320
)
324321

325322
with pytest.raises(JobMissingError):
326323
await async_jobs.result(
327324
async_jobs_rabbitmq_rpc_client,
328325
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
329326
job_id=async_job_get.job_id,
330-
job_filter=job_filter,
327+
owner_metadata=job_filter,
331328
)
332329

333330

@@ -357,7 +354,7 @@ async def test_async_jobs_raises(
357354
exposed_rpc_start: str,
358355
error: Exception,
359356
):
360-
async_job_get, job_filter = await _start_task_via_rpc(
357+
async_job_get, owner_metadata = await _start_task_via_rpc(
361358
async_jobs_rabbitmq_rpc_client,
362359
rpc_task_name=exposed_rpc_start,
363360
user_id=user_id,
@@ -369,7 +366,7 @@ async def test_async_jobs_raises(
369366
await _wait_for_job(
370367
async_jobs_rabbitmq_rpc_client,
371368
async_job_get=async_job_get,
372-
job_filter=job_filter,
369+
owner_metadata=owner_metadata,
373370
stop_after=timedelta(minutes=1),
374371
)
375372

@@ -378,7 +375,7 @@ async def test_async_jobs_raises(
378375
async_jobs_rabbitmq_rpc_client,
379376
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
380377
job_id=async_job_get.job_id,
381-
job_filter=job_filter,
378+
owner_metadata=owner_metadata,
382379
)
383380
assert exc.value.exc_type == type(error).__name__
384381
assert exc.value.exc_msg == f"{error}"

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
from typing import Annotated, Any, TypeAlias
22
from uuid import UUID
33

4-
from models_library.products import ProductName
5-
from models_library.users import UserID
64
from pydantic import BaseModel, ConfigDict, StringConstraints
75

86
from ..progress_bar import ProgressReport
@@ -42,23 +40,3 @@ class AsyncJobGet(BaseModel):
4240
class AsyncJobAbort(BaseModel):
4341
result: bool
4442
job_id: AsyncJobId
45-
46-
47-
class AsyncJobOwnerMetadata(BaseModel):
48-
"""Data for controlling access to an async job"""
49-
50-
model_config = ConfigDict(
51-
extra="allow",
52-
json_schema_extra={
53-
"examples": [
54-
{
55-
"product_name": "osparc",
56-
"user_id": 123,
57-
"owner": "web_client",
58-
}
59-
]
60-
},
61-
)
62-
user_id: UserID
63-
product_name: ProductName
64-
owner: Annotated[str, StringConstraints(min_length=1, pattern=r"^[a-z_-]+$")]

packages/pytest-simcore/src/pytest_simcore/helpers/async_jobs_server.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
66
AsyncJobGet,
77
AsyncJobId,
8-
AsyncJobOwnerMetadata,
98
AsyncJobResult,
109
AsyncJobStatus,
1110
)
@@ -14,6 +13,7 @@
1413
from models_library.rabbitmq_basic_types import RPCNamespace
1514
from pydantic import validate_call
1615
from pytest_mock import MockType
16+
from servicelib.celery.models import OwnerMetadata
1717
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
1818

1919

@@ -28,7 +28,7 @@ async def cancel(
2828
*,
2929
rpc_namespace: RPCNamespace,
3030
job_id: AsyncJobId,
31-
job_filter: AsyncJobOwnerMetadata,
31+
owner_metadata: OwnerMetadata,
3232
) -> None:
3333
if self.exception is not None:
3434
raise self.exception
@@ -41,7 +41,7 @@ async def status(
4141
*,
4242
rpc_namespace: RPCNamespace,
4343
job_id: AsyncJobId,
44-
job_filter: AsyncJobOwnerMetadata,
44+
owner_metadata: OwnerMetadata,
4545
) -> AsyncJobStatus:
4646
if self.exception is not None:
4747
raise self.exception
@@ -63,7 +63,7 @@ async def result(
6363
*,
6464
rpc_namespace: RPCNamespace,
6565
job_id: AsyncJobId,
66-
job_filter: AsyncJobOwnerMetadata,
66+
owner_metadata: OwnerMetadata,
6767
) -> AsyncJobResult:
6868
if self.exception is not None:
6969
raise self.exception
@@ -75,7 +75,7 @@ async def list_jobs(
7575
rabbitmq_rpc_client: RabbitMQRPCClient | MockType,
7676
*,
7777
rpc_namespace: RPCNamespace,
78-
job_filter: AsyncJobOwnerMetadata,
78+
job_filter: OwnerMetadata,
7979
filter_: str = "",
8080
) -> list[AsyncJobGet]:
8181
if self.exception is not None:

packages/pytest-simcore/src/pytest_simcore/helpers/storage_rpc_server.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010

1111
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
1212
AsyncJobGet,
13-
AsyncJobOwnerMetadata,
1413
)
1514
from models_library.api_schemas_webserver.storage import PathToExport
1615
from pydantic import TypeAdapter, validate_call
1716
from pytest_mock import MockType
17+
from servicelib.celery.models import OwnerMetadata
1818
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
1919

2020

@@ -27,18 +27,18 @@ async def start_export_data(
2727
*,
2828
paths_to_export: list[PathToExport],
2929
export_as: Literal["path", "download_link"],
30-
job_filter: AsyncJobOwnerMetadata,
31-
) -> tuple[AsyncJobGet, AsyncJobOwnerMetadata]:
30+
owner_metadata: OwnerMetadata,
31+
) -> tuple[AsyncJobGet, OwnerMetadata]:
3232
assert rabbitmq_rpc_client
33-
assert job_filter
33+
assert owner_metadata
3434
assert paths_to_export
3535
assert export_as
3636

3737
async_job_get = TypeAdapter(AsyncJobGet).validate_python(
3838
AsyncJobGet.model_json_schema()["examples"][0],
3939
)
40-
async_job_filter = TypeAdapter(AsyncJobOwnerMetadata).validate_python(
41-
AsyncJobOwnerMetadata.model_json_schema()["examples"][0],
40+
async_job_filter = TypeAdapter(OwnerMetadata).validate_python(
41+
OwnerMetadata.model_json_schema()["examples"][0],
4242
)
4343

4444
return async_job_get, async_job_filter

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
77
from models_library.projects_nodes_io import LocationID
88
from models_library.rabbitmq_basic_types import RPCMethodName
9+
from models_library.users import UserID
910
from pydantic import TypeAdapter
1011

1112
from ....celery.models import OwnerMetadata
@@ -19,6 +20,7 @@ async def compute_path_size(
1920
location_id: LocationID,
2021
path: Path,
2122
owner_metadata: OwnerMetadata,
23+
user_id: UserID
2224
) -> tuple[AsyncJobGet, OwnerMetadata]:
2325
async_job_rpc_get = await submit(
2426
rabbitmq_rpc_client=client,
@@ -27,6 +29,7 @@ async def compute_path_size(
2729
owner_metadata=owner_metadata,
2830
location_id=location_id,
2931
path=path,
32+
user_id=user_id,
3033
)
3134
return async_job_rpc_get, owner_metadata
3235

@@ -37,6 +40,7 @@ async def delete_paths(
3740
location_id: LocationID,
3841
paths: set[Path],
3942
owner_metadata: OwnerMetadata,
43+
user_id: UserID
4044
) -> tuple[AsyncJobGet, OwnerMetadata]:
4145
async_job_rpc_get = await submit(
4246
rabbitmq_rpc_client=client,
@@ -45,5 +49,6 @@ async def delete_paths(
4549
owner_metadata=owner_metadata,
4650
location_id=location_id,
4751
paths=paths,
52+
user_id=user_id,
4853
)
4954
return async_job_rpc_get, owner_metadata

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/simcore_s3.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from models_library.api_schemas_storage.storage_schemas import FoldersBody
88
from models_library.api_schemas_webserver.storage import PathToExport
99
from models_library.rabbitmq_basic_types import RPCMethodName
10+
from models_library.users import UserID
1011
from pydantic import TypeAdapter
1112
from servicelib.celery.models import OwnerMetadata
1213

@@ -15,7 +16,11 @@
1516

1617

1718
async def copy_folders_from_project(
18-
client: RabbitMQRPCClient, *, body: FoldersBody, owner_metadata: OwnerMetadata
19+
client: RabbitMQRPCClient,
20+
*,
21+
body: FoldersBody,
22+
owner_metadata: OwnerMetadata,
23+
user_id: UserID
1924
) -> tuple[AsyncJobGet, OwnerMetadata]:
2025
async_job_rpc_get = await submit(
2126
rabbitmq_rpc_client=client,
@@ -25,6 +30,7 @@ async def copy_folders_from_project(
2530
),
2631
owner_metadata=owner_metadata,
2732
body=body,
33+
user_id=user_id,
2834
)
2935
return async_job_rpc_get, owner_metadata
3036

@@ -34,7 +40,8 @@ async def start_export_data(
3440
*,
3541
paths_to_export: list[PathToExport],
3642
export_as: Literal["path", "download_link"],
37-
owner_metadata: OwnerMetadata
43+
owner_metadata: OwnerMetadata,
44+
user_id: UserID
3845
) -> tuple[AsyncJobGet, OwnerMetadata]:
3946
async_job_rpc_get = await submit(
4047
rabbitmq_rpc_client,
@@ -43,5 +50,6 @@ async def start_export_data(
4350
owner_metadata=owner_metadata,
4451
paths_to_export=paths_to_export,
4552
export_as=export_as,
53+
user_id=user_id,
4654
)
4755
return async_job_rpc_get, owner_metadata

0 commit comments

Comments
 (0)