
Commit 26abc99: improve naming
1 parent 51ef580

File tree

27 files changed: +193 -163 lines

packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 8 additions & 8 deletions
@@ -7,10 +7,10 @@
 from pydantic import ValidationError
 from servicelib.celery.models import (
     Task,
-    TaskFilter,
+    TaskExecutionMetadata,
     TaskID,
     TaskInfoStore,
-    TaskMetadata,
+    TaskOwnerMetadata,
     Wildcard,
 )
 from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types
@@ -35,7 +35,7 @@ def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
     async def create_task(
         self,
         task_id: TaskID,
-        task_metadata: TaskMetadata,
+        task_metadata: TaskExecutionMetadata,
         expiry: timedelta,
     ) -> None:
         task_key = _build_key(task_id)
@@ -51,7 +51,7 @@ async def create_task(
             expiry,
         )

-    async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
+    async def get_task_metadata(self, task_id: TaskID) -> TaskExecutionMetadata | None:
         raw_result = await handle_redis_returns_union_types(
             self._redis_client_sdk.redis.hget(
                 _build_key(task_id), _CELERY_TASK_METADATA_KEY
@@ -61,7 +61,7 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
             return None

         try:
-            return TaskMetadata.model_validate_json(raw_result)
+            return TaskExecutionMetadata.model_validate_json(raw_result)
         except ValidationError as exc:
             _logger.debug(
                 "Failed to deserialize task metadata for task %s: %s", task_id, f"{exc}"
@@ -85,7 +85,7 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
             )
             return None

-    async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
+    async def list_tasks(self, task_filter: TaskOwnerMetadata) -> list[Task]:
         search_key = _CELERY_TASK_INFO_PREFIX + task_filter.create_task_id(
             task_uuid=Wildcard()
         )
@@ -112,10 +112,10 @@ async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
                 continue

             with contextlib.suppress(ValidationError):
-                task_metadata = TaskMetadata.model_validate_json(raw_metadata)
+                task_metadata = TaskExecutionMetadata.model_validate_json(raw_metadata)
                 tasks.append(
                     Task(
-                        uuid=TaskFilter.get_task_uuid(key),
+                        uuid=TaskOwnerMetadata.get_task_uuid(key),
                         metadata=task_metadata,
                     )
                 )
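For orientation, a minimal usage sketch of the store with the renamed models follows. Only the method signatures and the model names come from the diff above; the `store` instance, the `task_id` value, and the task name are illustrative assumptions.

```python
# Illustrative sketch only: "store" stands for the Redis-backed TaskInfoStore
# implementation in this file; its construction and the literal values are assumptions.
from datetime import timedelta

from servicelib.celery.models import TaskExecutionMetadata, TaskOwnerMetadata


async def store_roundtrip(store, owner: TaskOwnerMetadata, task_id: str) -> None:
    # persist execution metadata (formerly TaskMetadata) for a task
    await store.create_task(
        task_id,
        TaskExecutionMetadata(name="fake_file_processor"),
        expiry=timedelta(minutes=10),
    )

    # read it back; returns None if missing or if validation fails
    metadata = await store.get_task_metadata(task_id)
    assert metadata is None or metadata.name == "fake_file_processor"

    # listing is now keyed on the owner metadata (formerly TaskFilter)
    tasks = await store.list_tasks(owner)
    print(len(tasks))
```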

packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 10 additions & 10 deletions
@@ -4,9 +4,9 @@

 from celery.exceptions import CeleryError  # type: ignore[import-untyped]
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
-    AsyncJobFilter,
     AsyncJobGet,
     AsyncJobId,
+    AsyncJobOwnerMetadata,
     AsyncJobResult,
     AsyncJobStatus,
 )
@@ -17,7 +17,7 @@
     JobNotDoneError,
     JobSchedulerError,
 )
-from servicelib.celery.models import TaskFilter, TaskState
+from servicelib.celery.models import TaskOwnerMetadata, TaskState
 from servicelib.celery.task_manager import TaskManager
 from servicelib.logging_utils import log_catch
 from servicelib.rabbitmq import RPCRouter
@@ -34,11 +34,11 @@

 @router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
 async def cancel(
-    task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
+    task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
 ):
     assert task_manager  # nosec
     assert job_filter  # nosec
-    task_filter = TaskFilter.model_validate(job_filter.model_dump())
+    task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
     try:
         await task_manager.cancel_task(
             task_filter=task_filter,
@@ -52,12 +52,12 @@ async def cancel(

 @router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
 async def status(
-    task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
+    task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
 ) -> AsyncJobStatus:
     assert task_manager  # nosec
     assert job_filter  # nosec

-    task_filter = TaskFilter.model_validate(job_filter.model_dump())
+    task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
     try:
         task_status = await task_manager.get_task_status(
             task_filter=task_filter,
@@ -85,13 +85,13 @@ async def status(
     )
 )
 async def result(
-    task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
+    task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
 ) -> AsyncJobResult:
     assert task_manager  # nosec
     assert job_id  # nosec
     assert job_filter  # nosec

-    task_filter = TaskFilter.model_validate(job_filter.model_dump())
+    task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())

     try:
         _status = await task_manager.get_task_status(
@@ -134,10 +134,10 @@ async def result(

 @router.expose(reraise_if_error_type=(JobSchedulerError,))
 async def list_jobs(
-    task_manager: TaskManager, job_filter: AsyncJobFilter
+    task_manager: TaskManager, job_filter: AsyncJobOwnerMetadata
 ) -> list[AsyncJobGet]:
     assert task_manager  # nosec
-    task_filter = TaskFilter.model_validate(job_filter.model_dump())
+    task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
     try:
         tasks = await task_manager.list_tasks(
             task_filter=task_filter,
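All of the RPC handlers above repeat the same conversion step, shown in isolation below. The field values are placeholders for illustration, not taken from the commit.

```python
# The RPC-level owner metadata is re-validated into the Celery-level model;
# the concrete values here are illustrative only.
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobOwnerMetadata
from servicelib.celery.models import TaskOwnerMetadata

job_filter = AsyncJobOwnerMetadata(
    user_id=1, product_name="osparc", task_owner="pytest_client"
)
task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
```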

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 10 additions & 8 deletions
@@ -10,10 +10,10 @@
 from servicelib.celery.models import (
     TASK_DONE_STATES,
     Task,
-    TaskFilter,
+    TaskExecutionMetadata,
     TaskID,
     TaskInfoStore,
-    TaskMetadata,
+    TaskOwnerMetadata,
     TaskState,
     TaskStatus,
     TaskUUID,
@@ -39,9 +39,9 @@ class CeleryTaskManager:

     async def submit_task(
         self,
-        task_metadata: TaskMetadata,
+        task_metadata: TaskExecutionMetadata,
         *,
-        task_filter: TaskFilter,
+        task_filter: TaskOwnerMetadata,
         **task_params,
     ) -> TaskUUID:
         with log_context(
@@ -85,7 +85,9 @@ async def submit_task(

         return task_uuid

-    async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> None:
+    async def cancel_task(
+        self, task_filter: TaskOwnerMetadata, task_uuid: TaskUUID
+    ) -> None:
         with log_context(
             _logger,
             logging.DEBUG,
@@ -106,7 +108,7 @@ def _forget_task(self, task_id: TaskID) -> None:
         self._celery_app.AsyncResult(task_id).forget()

     async def get_task_result(
-        self, task_filter: TaskFilter, task_uuid: TaskUUID
+        self, task_filter: TaskOwnerMetadata, task_uuid: TaskUUID
     ) -> Any:
         with log_context(
             _logger,
@@ -149,7 +151,7 @@ def _get_task_celery_state(self, task_id: TaskID) -> TaskState:
         return TaskState(self._celery_app.AsyncResult(task_id).state)

     async def get_task_status(
-        self, task_filter: TaskFilter, task_uuid: TaskUUID
+        self, task_filter: TaskOwnerMetadata, task_uuid: TaskUUID
     ) -> TaskStatus:
         with log_context(
             _logger,
@@ -169,7 +171,7 @@ async def get_task_status(
             ),
         )

-    async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
+    async def list_tasks(self, task_filter: TaskOwnerMetadata) -> list[Task]:
         with log_context(
             _logger,
             logging.DEBUG,
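Taken together, the renamed CeleryTaskManager surface reads as in the sketch below. The `manager` and `owner` instances and the task name are assumptions for illustration; the method names and keyword arguments follow the diff above.

```python
# Sketch of the renamed API surface; "manager" is assumed to be a CeleryTaskManager
# instance and "owner" a TaskOwnerMetadata (or subclass) instance.
from servicelib.celery.models import TaskExecutionMetadata, TaskOwnerMetadata


async def submit_and_cancel(manager, owner: TaskOwnerMetadata) -> None:
    task_uuid = await manager.submit_task(
        TaskExecutionMetadata(name="dreamer_task"),
        task_filter=owner,
    )
    # owner metadata plus task UUID identify the task in follow-up calls
    status = await manager.get_task_status(task_filter=owner, task_uuid=task_uuid)
    print(status)
    await manager.cancel_task(task_filter=owner, task_uuid=task_uuid)
```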

packages/celery-library/tests/unit/test_async_jobs.py

Lines changed: 11 additions & 11 deletions
@@ -16,8 +16,8 @@
 from common_library.errors_classes import OsparcErrorMixin
 from faker import Faker
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
-    AsyncJobFilter,
     AsyncJobGet,
+    AsyncJobOwnerMetadata,
 )
 from models_library.api_schemas_rpc_async_jobs.exceptions import (
     JobError,
@@ -27,7 +27,7 @@
 from models_library.rabbitmq_basic_types import RPCNamespace
 from models_library.users import UserID
 from pydantic import TypeAdapter
-from servicelib.celery.models import TaskFilter, TaskID, TaskMetadata
+from servicelib.celery.models import TaskExecutionMetadata, TaskID, TaskOwnerMetadata
 from servicelib.celery.task_manager import TaskManager
 from servicelib.rabbitmq import RabbitMQRPCClient, RPCRouter
 from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
@@ -79,25 +79,25 @@ def product_name(faker: Faker) -> ProductName:

 @router.expose()
 async def rpc_sync_job(
-    task_manager: TaskManager, *, job_filter: AsyncJobFilter, **kwargs: Any
+    task_manager: TaskManager, *, job_filter: AsyncJobOwnerMetadata, **kwargs: Any
 ) -> AsyncJobGet:
     task_name = sync_job.__name__
-    task_filter = TaskFilter.model_validate(job_filter.model_dump())
+    task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
     task_uuid = await task_manager.submit_task(
-        TaskMetadata(name=task_name), task_filter=task_filter, **kwargs
+        TaskExecutionMetadata(name=task_name), task_filter=task_filter, **kwargs
     )

     return AsyncJobGet(job_id=task_uuid, job_name=task_name)


 @router.expose()
 async def rpc_async_job(
-    task_manager: TaskManager, *, job_filter: AsyncJobFilter, **kwargs: Any
+    task_manager: TaskManager, *, job_filter: AsyncJobOwnerMetadata, **kwargs: Any
 ) -> AsyncJobGet:
     task_name = async_job.__name__
-    task_filter = TaskFilter.model_validate(job_filter.model_dump())
+    task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
     task_uuid = await task_manager.submit_task(
-        TaskMetadata(name=task_name), task_filter=task_filter, **kwargs
+        TaskExecutionMetadata(name=task_name), task_filter=task_filter, **kwargs
     )

     return AsyncJobGet(job_id=task_uuid, job_name=task_name)
@@ -158,8 +158,8 @@ async def _start_task_via_rpc(
     user_id: UserID,
     product_name: ProductName,
     **kwargs: Any,
-) -> tuple[AsyncJobGet, AsyncJobFilter]:
-    job_filter = AsyncJobFilter(
+) -> tuple[AsyncJobGet, AsyncJobOwnerMetadata]:
+    job_filter = AsyncJobOwnerMetadata(
         user_id=user_id, product_name=product_name, task_owner="pytest_client"
     )
     async_job_get = await async_jobs.submit(
@@ -197,7 +197,7 @@ async def _wait_for_job(
     rpc_client: RabbitMQRPCClient,
     *,
     async_job_get: AsyncJobGet,
-    job_filter: AsyncJobFilter,
+    job_filter: AsyncJobOwnerMetadata,
     stop_after: timedelta = timedelta(seconds=5),
 ) -> None:
packages/celery-library/tests/unit/test_tasks.py

Lines changed: 10 additions & 10 deletions
@@ -21,9 +21,9 @@
 from faker import Faker
 from models_library.progress_bar import ProgressReport
 from servicelib.celery.models import (
-    TaskFilter,
+    TaskExecutionMetadata,
     TaskID,
-    TaskMetadata,
+    TaskOwnerMetadata,
     TaskState,
     TaskUUID,
     Wildcard,
@@ -39,7 +39,7 @@
 pytest_simcore_ops_services_selection = []


-class MyTaskFilter(TaskFilter):
+class MyTaskFilter(TaskOwnerMetadata):
     user_id: int


@@ -106,7 +106,7 @@ async def test_submitting_task_calling_async_function_results_with_success_state
     task_filter = MyTaskFilter(user_id=42)

     task_uuid = await celery_task_manager.submit_task(
-        TaskMetadata(
+        TaskExecutionMetadata(
             name=fake_file_processor.__name__,
         ),
         task_filter=task_filter,
@@ -137,7 +137,7 @@ async def test_submitting_task_with_failure_results_with_error(
     task_filter = MyTaskFilter(user_id=42)

     task_uuid = await celery_task_manager.submit_task(
-        TaskMetadata(
+        TaskExecutionMetadata(
             name=failure_task.__name__,
         ),
         task_filter=task_filter,
@@ -166,7 +166,7 @@ async def test_cancelling_a_running_task_aborts_and_deletes(
     task_filter = MyTaskFilter(user_id=42)

     task_uuid = await celery_task_manager.submit_task(
-        TaskMetadata(
+        TaskExecutionMetadata(
             name=dreamer_task.__name__,
         ),
         task_filter=task_filter,
@@ -189,7 +189,7 @@ async def test_listing_task_uuids_contains_submitted_task(
     task_filter = MyTaskFilter(user_id=42)

     task_uuid = await celery_task_manager.submit_task(
-        TaskMetadata(
+        TaskExecutionMetadata(
             name=dreamer_task.__name__,
         ),
         task_filter=task_filter,
@@ -212,7 +212,7 @@ async def test_filtering_listing_tasks(
     celery_task_manager: CeleryTaskManager,
     with_celery_worker: WorkController,
 ):
-    class MyFilter(TaskFilter):
+    class MyFilter(TaskOwnerMetadata):
         user_id: int
         product_name: str | Wildcard
         client_app: str | Wildcard
@@ -229,7 +229,7 @@ class MyFilter(TaskFilter):
         client_app=_faker.word(),
     )
     task_uuid = await celery_task_manager.submit_task(
-        TaskMetadata(
+        TaskExecutionMetadata(
             name=dreamer_task.__name__,
         ),
         task_filter=task_filter,
@@ -244,7 +244,7 @@ class MyFilter(TaskFilter):
         client_app=_faker.word(),
     )
     task_uuid = await celery_task_manager.submit_task(
-        TaskMetadata(
+        TaskExecutionMetadata(
             name=dreamer_task.__name__,
         ),
         task_filter=task_filter,
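The filtering test above relies on subclassing the renamed owner-metadata model with wildcard-able fields. A condensed sketch of that pattern follows; the values, the use of Wildcard() when constructing the listing filter, and the `celery_task_manager` instance are illustrative assumptions.

```python
# Condensed from the test above; values are illustrative and "celery_task_manager"
# is assumed to be a CeleryTaskManager instance.
from servicelib.celery.models import TaskOwnerMetadata, Wildcard


class MyFilter(TaskOwnerMetadata):
    user_id: int
    product_name: str | Wildcard
    client_app: str | Wildcard


# Wildcard fields let a single listing call match any product/app for user 42
list_filter = MyFilter(user_id=42, product_name=Wildcard(), client_app=Wildcard())
# tasks = await celery_task_manager.list_tasks(list_filter)
```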

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py

Lines changed: 5 additions & 2 deletions
@@ -1,6 +1,8 @@
 from typing import Annotated, Any, TypeAlias
 from uuid import UUID

+from models_library.products import ProductName
+from models_library.users import UserID
 from pydantic import BaseModel, ConfigDict, StringConstraints

 from ..progress_bar import ProgressReport
@@ -42,7 +44,7 @@ class AsyncJobAbort(BaseModel):
     job_id: AsyncJobId


-class AsyncJobFilter(BaseModel):
+class AsyncJobOwnerMetadata(BaseModel):
     """Data for controlling access to an async job"""

     model_config = ConfigDict(
@@ -57,5 +59,6 @@ class AsyncJobFilter(BaseModel):
             ]
         },
     )
-
+    user_id: UserID
+    product_name: ProductName
     task_owner: Annotated[str, StringConstraints(min_length=1, pattern=r"^[a-z_-]+$")]
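The renamed model now declares user_id and product_name explicitly, next to the unchanged task_owner constraint. A quick, illustrative check of those fields (values are placeholders, not from the commit):

```python
# Illustrative only: exercises the fields and the task_owner pattern shown above.
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobOwnerMetadata
from pydantic import ValidationError

ok = AsyncJobOwnerMetadata(user_id=1, product_name="osparc", task_owner="pytest_client")

try:
    AsyncJobOwnerMetadata(user_id=1, product_name="osparc", task_owner="Not Valid")
except ValidationError:
    pass  # task_owner must be non-empty and match ^[a-z_-]+$
```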
