Skip to content

Commit e5b2602

Browse files
committed
simplify wildcard usage
1 parent 7ede2aa commit e5b2602

File tree

3 files changed

+13
-29
lines changed

3 files changed

+13
-29
lines changed

packages/celery-library/src/celery_library/backends/_redis.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,6 @@ async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> No
128128
value=report.model_dump_json(),
129129
) # type: ignore
130130

131-
@property
132-
def wildcard_str(self) -> str:
133-
return "*"
134-
135131

136132
if TYPE_CHECKING:
137133
_: type[TaskInfoStore] = RedisTaskInfoStore

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@ async def submit_task(
4949
msg=f"Submit {task_metadata.name=}: {task_filter=} {task_params=}",
5050
):
5151
task_uuid = uuid4()
52-
task_id = task_filter.get_task_id(
53-
task_uuid=task_uuid, wildcard_str=self._task_info_store.wildcard_str
54-
)
52+
task_id = task_filter.get_task_id(task_uuid=task_uuid)
5553
self._celery_app.send_task(
5654
task_metadata.name,
5755
task_id=task_id,
@@ -79,9 +77,7 @@ async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> Non
7977
logging.DEBUG,
8078
msg=f"task cancellation: {task_filter=} {task_uuid=}",
8179
):
82-
task_id = task_filter.get_task_id(
83-
task_uuid=task_uuid, wildcard_str=self._task_info_store.wildcard_str
84-
)
80+
task_id = task_filter.get_task_id(task_uuid=task_uuid)
8581
if not (await self.get_task_status(task_filter, task_uuid)).is_done:
8682
await self._abort_task(task_id)
8783
await self._task_info_store.remove_task(task_id)
@@ -98,9 +94,7 @@ async def get_task_result(
9894
logging.DEBUG,
9995
msg=f"Get task result: {task_filter=} {task_uuid=}",
10096
):
101-
task_id = task_filter.get_task_id(
102-
task_uuid=task_uuid, wildcard_str=self._task_info_store.wildcard_str
103-
)
97+
task_id = task_filter.get_task_id(task_uuid=task_uuid)
10498
async_result = self._celery_app.AsyncResult(task_id)
10599
result = async_result.result
106100
if async_result.ready():
@@ -114,9 +108,7 @@ async def _get_task_progress_report(
114108
self, task_filter: TaskFilter, task_uuid: TaskUUID, task_state: TaskState
115109
) -> ProgressReport:
116110
if task_state in (TaskState.STARTED, TaskState.RETRY, TaskState.ABORTED):
117-
task_id = task_filter.get_task_id(
118-
task_uuid=task_uuid, wildcard_str=self._task_info_store.wildcard_str
119-
)
111+
task_id = task_filter.get_task_id(task_uuid=task_uuid)
120112
progress = await self._task_info_store.get_task_progress(task_id)
121113
if progress is not None:
122114
return progress
@@ -145,9 +137,7 @@ async def get_task_status(
145137
logging.DEBUG,
146138
msg=f"Getting task status: {task_filter=} {task_uuid=}",
147139
):
148-
task_id = task_filter.get_task_id(
149-
task_uuid=task_uuid, wildcard_str=self._task_info_store.wildcard_str
150-
)
140+
task_id = task_filter.get_task_id(task_uuid=task_uuid)
151141
task_state = await self._get_task_celery_state(task_id)
152142
return TaskStatus(
153143
task_uuid=task_uuid,

packages/service-library/src/servicelib/celery/models.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
]
1616
TaskUUID: TypeAlias = UUID
1717
_TASK_ID_KEY_DELIMITATOR: Final[str] = ":"
18+
_WILDCARD: Final[str] = "*"
1819

1920

2021
class Wildcard: ...
2122

2223

23-
def _replace_wildcard(value: Any, wildcard_str: str) -> str:
24+
def _replace_wildcard(value: Any) -> str:
2425
if isinstance(value, Wildcard):
25-
return wildcard_str
26+
return _WILDCARD
2627
return f"{value}"
2728

2829

@@ -60,20 +61,20 @@ def _check_valid_filters(self) -> Self:
6061
)
6162
return self
6263

63-
def _build_task_id_prefix(self, wildcard_str: str) -> str:
64+
def _build_task_id_prefix(self) -> str:
6465
filter_dict = self.model_dump()
6566
return _TASK_ID_KEY_DELIMITATOR.join(
6667
[
67-
f"{key}={_replace_wildcard(filter_dict[key], wildcard_str)}"
68+
f"{key}={_replace_wildcard(filter_dict[key])}"
6869
for key in sorted(filter_dict)
6970
]
7071
)
7172

72-
def get_task_id(self, task_uuid: TaskUUID | Wildcard, wildcard_str: str) -> TaskID:
73+
def get_task_id(self, task_uuid: TaskUUID | Wildcard) -> TaskID:
7374
return _TASK_ID_KEY_DELIMITATOR.join(
7475
[
75-
self._build_task_id_prefix(wildcard_str),
76-
f"task_uuid={_replace_wildcard(task_uuid, wildcard_str)}",
76+
self._build_task_id_prefix(),
77+
f"task_uuid={_replace_wildcard(task_uuid)}",
7778
]
7879
)
7980

@@ -191,9 +192,6 @@ async def set_task_progress(
191192
self, task_id: TaskID, report: ProgressReport
192193
) -> None: ...
193194

194-
@property
195-
def wildcard_str(cls) -> str: ...
196-
197195

198196
class TaskStatus(BaseModel):
199197
task_uuid: TaskUUID

0 commit comments

Comments
 (0)