Skip to content

Commit 0c39aa2

Browse files
authored
Merge branch 'master' into is40/refactoring-confirmation-repository
2 parents e1bff2b + dd8edee commit 0c39aa2

File tree

34 files changed

+602
-481
lines changed

34 files changed

+602
-481
lines changed

packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
from models_library.progress_bar import ProgressReport
77
from pydantic import ValidationError
88
from servicelib.celery.models import (
9+
WILDCARD,
10+
ExecutionMetadata,
11+
OwnerMetadata,
912
Task,
10-
TaskFilter,
1113
TaskID,
1214
TaskInfoStore,
13-
TaskMetadata,
14-
Wildcard,
1515
)
1616
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types
1717

@@ -35,23 +35,23 @@ def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
3535
async def create_task(
3636
self,
3737
task_id: TaskID,
38-
task_metadata: TaskMetadata,
38+
execution_metadata: ExecutionMetadata,
3939
expiry: timedelta,
4040
) -> None:
4141
task_key = _build_key(task_id)
4242
await handle_redis_returns_union_types(
4343
self._redis_client_sdk.redis.hset(
4444
name=task_key,
4545
key=_CELERY_TASK_METADATA_KEY,
46-
value=task_metadata.model_dump_json(),
46+
value=execution_metadata.model_dump_json(),
4747
)
4848
)
4949
await self._redis_client_sdk.redis.expire(
5050
task_key,
5151
expiry,
5252
)
5353

54-
async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
54+
async def get_task_metadata(self, task_id: TaskID) -> ExecutionMetadata | None:
5555
raw_result = await handle_redis_returns_union_types(
5656
self._redis_client_sdk.redis.hget(
5757
_build_key(task_id), _CELERY_TASK_METADATA_KEY
@@ -61,7 +61,7 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
6161
return None
6262

6363
try:
64-
return TaskMetadata.model_validate_json(raw_result)
64+
return ExecutionMetadata.model_validate_json(raw_result)
6565
except ValidationError as exc:
6666
_logger.debug(
6767
"Failed to deserialize task metadata for task %s: %s", task_id, f"{exc}"
@@ -85,9 +85,9 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
8585
)
8686
return None
8787

88-
async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
89-
search_key = _CELERY_TASK_INFO_PREFIX + task_filter.create_task_id(
90-
task_uuid=Wildcard()
88+
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
89+
search_key = _CELERY_TASK_INFO_PREFIX + owner_metadata.model_dump_task_id(
90+
task_uuid=WILDCARD
9191
)
9292

9393
keys: list[str] = []
@@ -112,11 +112,11 @@ async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
112112
continue
113113

114114
with contextlib.suppress(ValidationError):
115-
task_metadata = TaskMetadata.model_validate_json(raw_metadata)
115+
execution_metadata = ExecutionMetadata.model_validate_json(raw_metadata)
116116
tasks.append(
117117
Task(
118-
uuid=TaskFilter.get_task_uuid(key),
119-
metadata=task_metadata,
118+
uuid=OwnerMetadata.get_task_uuid(key),
119+
metadata=execution_metadata,
120120
)
121121
)
122122

packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
from celery.exceptions import CeleryError # type: ignore[import-untyped]
66
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
7-
AsyncJobFilter,
87
AsyncJobGet,
98
AsyncJobId,
109
AsyncJobResult,
@@ -17,7 +16,7 @@
1716
JobNotDoneError,
1817
JobSchedulerError,
1918
)
20-
from servicelib.celery.models import TaskFilter, TaskState
19+
from servicelib.celery.models import OwnerMetadata, TaskState
2120
from servicelib.celery.task_manager import TaskManager
2221
from servicelib.logging_utils import log_catch
2322
from servicelib.rabbitmq import RPCRouter
@@ -34,14 +33,13 @@
3433

3534
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
3635
async def cancel(
37-
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
36+
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
3837
):
3938
assert task_manager # nosec
40-
assert job_filter # nosec
41-
task_filter = TaskFilter.model_validate(job_filter.model_dump())
39+
assert owner_metadata # nosec
4240
try:
4341
await task_manager.cancel_task(
44-
task_filter=task_filter,
42+
owner_metadata=owner_metadata,
4543
task_uuid=job_id,
4644
)
4745
except TaskNotFoundError as exc:
@@ -52,15 +50,14 @@ async def cancel(
5250

5351
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
5452
async def status(
55-
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
53+
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
5654
) -> AsyncJobStatus:
5755
assert task_manager # nosec
58-
assert job_filter # nosec
56+
assert owner_metadata # nosec
5957

60-
task_filter = TaskFilter.model_validate(job_filter.model_dump())
6158
try:
6259
task_status = await task_manager.get_task_status(
63-
task_filter=task_filter,
60+
owner_metadata=owner_metadata,
6461
task_uuid=job_id,
6562
)
6663
except TaskNotFoundError as exc:
@@ -85,23 +82,21 @@ async def status(
8582
)
8683
)
8784
async def result(
88-
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
85+
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
8986
) -> AsyncJobResult:
9087
assert task_manager # nosec
9188
assert job_id # nosec
92-
assert job_filter # nosec
93-
94-
task_filter = TaskFilter.model_validate(job_filter.model_dump())
89+
assert owner_metadata # nosec
9590

9691
try:
9792
_status = await task_manager.get_task_status(
98-
task_filter=task_filter,
93+
owner_metadata=owner_metadata,
9994
task_uuid=job_id,
10095
)
10196
if not _status.is_done:
10297
raise JobNotDoneError(job_id=job_id)
10398
_result = await task_manager.get_task_result(
104-
task_filter=task_filter,
99+
owner_metadata=owner_metadata,
105100
task_uuid=job_id,
106101
)
107102
except TaskNotFoundError as exc:
@@ -134,13 +129,12 @@ async def result(
134129

135130
@router.expose(reraise_if_error_type=(JobSchedulerError,))
136131
async def list_jobs(
137-
task_manager: TaskManager, job_filter: AsyncJobFilter
132+
task_manager: TaskManager, owner_metadata: OwnerMetadata
138133
) -> list[AsyncJobGet]:
139134
assert task_manager # nosec
140-
task_filter = TaskFilter.model_validate(job_filter.model_dump())
141135
try:
142136
tasks = await task_manager.list_tasks(
143-
task_filter=task_filter,
137+
owner_metadata=owner_metadata,
144138
)
145139
except CeleryError as exc:
146140
raise JobSchedulerError(exc=f"{exc}") from exc

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
from models_library.progress_bar import ProgressReport
1010
from servicelib.celery.models import (
1111
TASK_DONE_STATES,
12+
ExecutionMetadata,
13+
OwnerMetadata,
1214
Task,
13-
TaskFilter,
1415
TaskID,
1516
TaskInfoStore,
16-
TaskMetadata,
1717
TaskState,
1818
TaskStatus,
1919
TaskUUID,
@@ -39,34 +39,34 @@ class CeleryTaskManager:
3939

4040
async def submit_task(
4141
self,
42-
task_metadata: TaskMetadata,
42+
execution_metadata: ExecutionMetadata,
4343
*,
44-
task_filter: TaskFilter,
44+
owner_metadata: OwnerMetadata,
4545
**task_params,
4646
) -> TaskUUID:
4747
with log_context(
4848
_logger,
4949
logging.DEBUG,
50-
msg=f"Submit {task_metadata.name=}: {task_filter=} {task_params=}",
50+
msg=f"Submit {execution_metadata.name=}: {owner_metadata=} {task_params=}",
5151
):
5252
task_uuid = uuid4()
53-
task_id = task_filter.create_task_id(task_uuid=task_uuid)
53+
task_id = owner_metadata.model_dump_task_id(task_uuid=task_uuid)
5454

5555
expiry = (
5656
self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES
57-
if task_metadata.ephemeral
57+
if execution_metadata.ephemeral
5858
else self._celery_settings.CELERY_RESULT_EXPIRES
5959
)
6060

6161
try:
6262
await self._task_info_store.create_task(
63-
task_id, task_metadata, expiry=expiry
63+
task_id, execution_metadata, expiry=expiry
6464
)
6565
self._celery_app.send_task(
66-
task_metadata.name,
66+
execution_metadata.name,
6767
task_id=task_id,
6868
kwargs={"task_id": task_id} | task_params,
69-
queue=task_metadata.queue.value,
69+
queue=execution_metadata.queue.value,
7070
)
7171
except CeleryError as exc:
7272
try:
@@ -78,20 +78,22 @@ async def submit_task(
7878
exc_info=True,
7979
)
8080
raise TaskSubmissionError(
81-
task_name=task_metadata.name,
81+
task_name=execution_metadata.name,
8282
task_id=task_id,
8383
task_params=task_params,
8484
) from exc
8585

8686
return task_uuid
8787

88-
async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> None:
88+
async def cancel_task(
89+
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
90+
) -> None:
8991
with log_context(
9092
_logger,
9193
logging.DEBUG,
92-
msg=f"task cancellation: {task_filter=} {task_uuid=}",
94+
msg=f"task cancellation: {owner_metadata=} {task_uuid=}",
9395
):
94-
task_id = task_filter.create_task_id(task_uuid=task_uuid)
96+
task_id = owner_metadata.model_dump_task_id(task_uuid=task_uuid)
9597
if not await self.task_exists(task_id):
9698
raise TaskNotFoundError(task_id=task_id)
9799

@@ -106,14 +108,14 @@ def _forget_task(self, task_id: TaskID) -> None:
106108
self._celery_app.AsyncResult(task_id).forget()
107109

108110
async def get_task_result(
109-
self, task_filter: TaskFilter, task_uuid: TaskUUID
111+
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
110112
) -> Any:
111113
with log_context(
112114
_logger,
113115
logging.DEBUG,
114-
msg=f"Get task result: {task_filter=} {task_uuid=}",
116+
msg=f"Get task result: {owner_metadata=} {task_uuid=}",
115117
):
116-
task_id = task_filter.create_task_id(task_uuid=task_uuid)
118+
task_id = owner_metadata.model_dump_task_id(task_uuid=task_uuid)
117119
if not await self.task_exists(task_id):
118120
raise TaskNotFoundError(task_id=task_id)
119121

@@ -149,14 +151,14 @@ def _get_task_celery_state(self, task_id: TaskID) -> TaskState:
149151
return TaskState(self._celery_app.AsyncResult(task_id).state)
150152

151153
async def get_task_status(
152-
self, task_filter: TaskFilter, task_uuid: TaskUUID
154+
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
153155
) -> TaskStatus:
154156
with log_context(
155157
_logger,
156158
logging.DEBUG,
157-
msg=f"Getting task status: {task_filter=} {task_uuid=}",
159+
msg=f"Getting task status: {owner_metadata=} {task_uuid=}",
158160
):
159-
task_id = task_filter.create_task_id(task_uuid=task_uuid)
161+
task_id = owner_metadata.model_dump_task_id(task_uuid=task_uuid)
160162
if not await self.task_exists(task_id):
161163
raise TaskNotFoundError(task_id=task_id)
162164

@@ -169,13 +171,13 @@ async def get_task_status(
169171
),
170172
)
171173

172-
async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
174+
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
173175
with log_context(
174176
_logger,
175177
logging.DEBUG,
176-
msg=f"Listing tasks: {task_filter=}",
178+
msg=f"Listing tasks: {owner_metadata=}",
177179
):
178-
return await self._task_info_store.list_tasks(task_filter)
180+
return await self._task_info_store.list_tasks(owner_metadata)
179181

180182
async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
181183
await self._task_info_store.set_task_progress(

0 commit comments

Comments
 (0)