Skip to content

Commit 87c63ef

Browse files
committed
TaskFilter -> OwnerMetadata
1 parent d2cf678 commit 87c63ef

File tree

29 files changed

+196
-210
lines changed

29 files changed

+196
-210
lines changed

packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
from models_library.progress_bar import ProgressReport
77
from pydantic import ValidationError
88
from servicelib.celery.models import (
9+
ExecutionMetadata,
10+
OwnerMetadata,
911
Task,
10-
TaskExecutionMetadata,
1112
TaskID,
1213
TaskInfoStore,
13-
TaskOwnerMetadata,
1414
Wildcard,
1515
)
1616
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types
@@ -35,7 +35,7 @@ def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
3535
async def create_task(
3636
self,
3737
task_id: TaskID,
38-
task_metadata: TaskExecutionMetadata,
38+
task_metadata: ExecutionMetadata,
3939
expiry: timedelta,
4040
) -> None:
4141
task_key = _build_key(task_id)
@@ -51,7 +51,7 @@ async def create_task(
5151
expiry,
5252
)
5353

54-
async def get_task_metadata(self, task_id: TaskID) -> TaskExecutionMetadata | None:
54+
async def get_task_metadata(self, task_id: TaskID) -> ExecutionMetadata | None:
5555
raw_result = await handle_redis_returns_union_types(
5656
self._redis_client_sdk.redis.hget(
5757
_build_key(task_id), _CELERY_TASK_METADATA_KEY
@@ -61,7 +61,7 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskExecutionMetadata | No
6161
return None
6262

6363
try:
64-
return TaskExecutionMetadata.model_validate_json(raw_result)
64+
return ExecutionMetadata.model_validate_json(raw_result)
6565
except ValidationError as exc:
6666
_logger.debug(
6767
"Failed to deserialize task metadata for task %s: %s", task_id, f"{exc}"
@@ -85,7 +85,7 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
8585
)
8686
return None
8787

88-
async def list_tasks(self, task_filter: TaskOwnerMetadata) -> list[Task]:
88+
async def list_tasks(self, task_filter: OwnerMetadata) -> list[Task]:
8989
search_key = _CELERY_TASK_INFO_PREFIX + task_filter.create_task_id(
9090
task_uuid=Wildcard()
9191
)
@@ -112,10 +112,10 @@ async def list_tasks(self, task_filter: TaskOwnerMetadata) -> list[Task]:
112112
continue
113113

114114
with contextlib.suppress(ValidationError):
115-
task_metadata = TaskExecutionMetadata.model_validate_json(raw_metadata)
115+
task_metadata = ExecutionMetadata.model_validate_json(raw_metadata)
116116
tasks.append(
117117
Task(
118-
uuid=TaskOwnerMetadata.get_task_uuid(key),
118+
uuid=OwnerMetadata.get_task_uuid(key),
119119
metadata=task_metadata,
120120
)
121121
)

packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
77
AsyncJobGet,
88
AsyncJobId,
9-
AsyncJobOwnerMetadata,
109
AsyncJobResult,
1110
AsyncJobStatus,
1211
)
@@ -17,7 +16,7 @@
1716
JobNotDoneError,
1817
JobSchedulerError,
1918
)
20-
from servicelib.celery.models import TaskOwnerMetadata, TaskState
19+
from servicelib.celery.models import OwnerMetadata, TaskState
2120
from servicelib.celery.task_manager import TaskManager
2221
from servicelib.logging_utils import log_catch
2322
from servicelib.rabbitmq import RPCRouter
@@ -34,14 +33,13 @@
3433

3534
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
3635
async def cancel(
37-
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
36+
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
3837
):
3938
assert task_manager # nosec
40-
assert job_filter # nosec
41-
task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
39+
assert owner_metadata # nosec
4240
try:
4341
await task_manager.cancel_task(
44-
task_filter=task_filter,
42+
owner_metadata=owner_metadata,
4543
task_uuid=job_id,
4644
)
4745
except TaskNotFoundError as exc:
@@ -52,15 +50,14 @@ async def cancel(
5250

5351
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
5452
async def status(
55-
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
53+
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
5654
) -> AsyncJobStatus:
5755
assert task_manager # nosec
58-
assert job_filter # nosec
56+
assert owner_metadata # nosec
5957

60-
task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
6158
try:
6259
task_status = await task_manager.get_task_status(
63-
task_filter=task_filter,
60+
owner_metadata=owner_metadata,
6461
task_uuid=job_id,
6562
)
6663
except TaskNotFoundError as exc:
@@ -85,23 +82,21 @@ async def status(
8582
)
8683
)
8784
async def result(
88-
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
85+
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
8986
) -> AsyncJobResult:
9087
assert task_manager # nosec
9188
assert job_id # nosec
92-
assert job_filter # nosec
93-
94-
task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
89+
assert owner_metadata # nosec
9590

9691
try:
9792
_status = await task_manager.get_task_status(
98-
task_filter=task_filter,
93+
owner_metadata=owner_metadata,
9994
task_uuid=job_id,
10095
)
10196
if not _status.is_done:
10297
raise JobNotDoneError(job_id=job_id)
10398
_result = await task_manager.get_task_result(
104-
task_filter=task_filter,
99+
owner_metadata=owner_metadata,
105100
task_uuid=job_id,
106101
)
107102
except TaskNotFoundError as exc:
@@ -134,13 +129,12 @@ async def result(
134129

135130
@router.expose(reraise_if_error_type=(JobSchedulerError,))
136131
async def list_jobs(
137-
task_manager: TaskManager, job_filter: AsyncJobOwnerMetadata
132+
task_manager: TaskManager, owner_metadata: OwnerMetadata
138133
) -> list[AsyncJobGet]:
139134
assert task_manager # nosec
140-
task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
141135
try:
142136
tasks = await task_manager.list_tasks(
143-
task_filter=task_filter,
137+
owner_metadata=owner_metadata,
144138
)
145139
except CeleryError as exc:
146140
raise JobSchedulerError(exc=f"{exc}") from exc

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
from models_library.progress_bar import ProgressReport
1010
from servicelib.celery.models import (
1111
TASK_DONE_STATES,
12+
ExecutionMetadata,
13+
OwnerMetadata,
1214
Task,
13-
TaskExecutionMetadata,
1415
TaskID,
1516
TaskInfoStore,
16-
TaskOwnerMetadata,
1717
TaskState,
1818
TaskStatus,
1919
TaskUUID,
@@ -39,34 +39,34 @@ class CeleryTaskManager:
3939

4040
async def submit_task(
4141
self,
42-
task_metadata: TaskExecutionMetadata,
42+
execution_metadata: ExecutionMetadata,
4343
*,
44-
task_filter: TaskOwnerMetadata,
44+
owner_metadata: OwnerMetadata,
4545
**task_params,
4646
) -> TaskUUID:
4747
with log_context(
4848
_logger,
4949
logging.DEBUG,
50-
msg=f"Submit {task_metadata.name=}: {task_filter=} {task_params=}",
50+
msg=f"Submit {execution_metadata.name=}: {owner_metadata=} {task_params=}",
5151
):
5252
task_uuid = uuid4()
53-
task_id = task_filter.create_task_id(task_uuid=task_uuid)
53+
task_id = owner_metadata.create_task_id(task_uuid=task_uuid)
5454

5555
expiry = (
5656
self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES
57-
if task_metadata.ephemeral
57+
if execution_metadata.ephemeral
5858
else self._celery_settings.CELERY_RESULT_EXPIRES
5959
)
6060

6161
try:
6262
await self._task_info_store.create_task(
63-
task_id, task_metadata, expiry=expiry
63+
task_id, execution_metadata, expiry=expiry
6464
)
6565
self._celery_app.send_task(
66-
task_metadata.name,
66+
execution_metadata.name,
6767
task_id=task_id,
6868
kwargs={"task_id": task_id} | task_params,
69-
queue=task_metadata.queue.value,
69+
queue=execution_metadata.queue.value,
7070
)
7171
except CeleryError as exc:
7272
try:
@@ -78,22 +78,22 @@ async def submit_task(
7878
exc_info=True,
7979
)
8080
raise TaskSubmissionError(
81-
task_name=task_metadata.name,
81+
task_name=execution_metadata.name,
8282
task_id=task_id,
8383
task_params=task_params,
8484
) from exc
8585

8686
return task_uuid
8787

8888
async def cancel_task(
89-
self, task_filter: TaskOwnerMetadata, task_uuid: TaskUUID
89+
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
9090
) -> None:
9191
with log_context(
9292
_logger,
9393
logging.DEBUG,
94-
msg=f"task cancellation: {task_filter=} {task_uuid=}",
94+
msg=f"task cancellation: {owner_metadata=} {task_uuid=}",
9595
):
96-
task_id = task_filter.create_task_id(task_uuid=task_uuid)
96+
task_id = owner_metadata.create_task_id(task_uuid=task_uuid)
9797
if not await self.task_exists(task_id):
9898
raise TaskNotFoundError(task_id=task_id)
9999

@@ -108,14 +108,14 @@ def _forget_task(self, task_id: TaskID) -> None:
108108
self._celery_app.AsyncResult(task_id).forget()
109109

110110
async def get_task_result(
111-
self, task_filter: TaskOwnerMetadata, task_uuid: TaskUUID
111+
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
112112
) -> Any:
113113
with log_context(
114114
_logger,
115115
logging.DEBUG,
116-
msg=f"Get task result: {task_filter=} {task_uuid=}",
116+
msg=f"Get task result: {owner_metadata=} {task_uuid=}",
117117
):
118-
task_id = task_filter.create_task_id(task_uuid=task_uuid)
118+
task_id = owner_metadata.create_task_id(task_uuid=task_uuid)
119119
if not await self.task_exists(task_id):
120120
raise TaskNotFoundError(task_id=task_id)
121121

@@ -151,14 +151,14 @@ def _get_task_celery_state(self, task_id: TaskID) -> TaskState:
151151
return TaskState(self._celery_app.AsyncResult(task_id).state)
152152

153153
async def get_task_status(
154-
self, task_filter: TaskOwnerMetadata, task_uuid: TaskUUID
154+
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
155155
) -> TaskStatus:
156156
with log_context(
157157
_logger,
158158
logging.DEBUG,
159-
msg=f"Getting task status: {task_filter=} {task_uuid=}",
159+
msg=f"Getting task status: {owner_metadata=} {task_uuid=}",
160160
):
161-
task_id = task_filter.create_task_id(task_uuid=task_uuid)
161+
task_id = owner_metadata.create_task_id(task_uuid=task_uuid)
162162
if not await self.task_exists(task_id):
163163
raise TaskNotFoundError(task_id=task_id)
164164

@@ -171,13 +171,13 @@ async def get_task_status(
171171
),
172172
)
173173

174-
async def list_tasks(self, task_filter: TaskOwnerMetadata) -> list[Task]:
174+
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
175175
with log_context(
176176
_logger,
177177
logging.DEBUG,
178-
msg=f"Listing tasks: {task_filter=}",
178+
msg=f"Listing tasks: {owner_metadata=}",
179179
):
180-
return await self._task_info_store.list_tasks(task_filter)
180+
return await self._task_info_store.list_tasks(owner_metadata)
181181

182182
async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
183183
await self._task_info_store.set_task_progress(

packages/celery-library/tests/unit/test_async_jobs.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from models_library.rabbitmq_basic_types import RPCNamespace
2828
from models_library.users import UserID
2929
from pydantic import TypeAdapter
30-
from servicelib.celery.models import TaskExecutionMetadata, TaskID, TaskOwnerMetadata
30+
from servicelib.celery.models import ExecutionMetadata, OwnerMetadata, TaskID
3131
from servicelib.celery.task_manager import TaskManager
3232
from servicelib.rabbitmq import RabbitMQRPCClient, RPCRouter
3333
from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
@@ -82,9 +82,9 @@ async def rpc_sync_job(
8282
task_manager: TaskManager, *, job_filter: AsyncJobOwnerMetadata, **kwargs: Any
8383
) -> AsyncJobGet:
8484
task_name = sync_job.__name__
85-
task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
85+
task_filter = OwnerMetadata.model_validate(job_filter.model_dump())
8686
task_uuid = await task_manager.submit_task(
87-
TaskExecutionMetadata(name=task_name), task_filter=task_filter, **kwargs
87+
ExecutionMetadata(name=task_name), task_filter=task_filter, **kwargs
8888
)
8989

9090
return AsyncJobGet(job_id=task_uuid, job_name=task_name)
@@ -95,9 +95,9 @@ async def rpc_async_job(
9595
task_manager: TaskManager, *, job_filter: AsyncJobOwnerMetadata, **kwargs: Any
9696
) -> AsyncJobGet:
9797
task_name = async_job.__name__
98-
task_filter = TaskOwnerMetadata.model_validate(job_filter.model_dump())
98+
task_filter = OwnerMetadata.model_validate(job_filter.model_dump())
9999
task_uuid = await task_manager.submit_task(
100-
TaskExecutionMetadata(name=task_name), task_filter=task_filter, **kwargs
100+
ExecutionMetadata(name=task_name), task_filter=task_filter, **kwargs
101101
)
102102

103103
return AsyncJobGet(job_id=task_uuid, job_name=task_name)
@@ -160,7 +160,7 @@ async def _start_task_via_rpc(
160160
**kwargs: Any,
161161
) -> tuple[AsyncJobGet, AsyncJobOwnerMetadata]:
162162
job_filter = AsyncJobOwnerMetadata(
163-
user_id=user_id, product_name=product_name, task_owner="pytest_client"
163+
user_id=user_id, product_name=product_name, owner="pytest_client"
164164
)
165165
async_job_get = await async_jobs.submit(
166166
rabbitmq_rpc_client=client,
@@ -304,7 +304,7 @@ async def test_async_jobs_cancel(
304304
async_jobs_rabbitmq_rpc_client,
305305
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
306306
job_id=async_job_get.job_id,
307-
job_filter=job_filter,
307+
owner_metadata=job_filter,
308308
)
309309

310310
jobs = await async_jobs.list_jobs(

0 commit comments

Comments
 (0)