Skip to content

Commit 0d23b86

Browse files
author
Andrei Neagu
committed
removed unsued
1 parent 10fc877 commit 0d23b86

File tree

6 files changed

+42
-64
lines changed

6 files changed

+42
-64
lines changed

packages/service-library/src/servicelib/long_running_tasks/_redis_store.py

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
from ..redis._client import RedisClientSDK
99
from ..redis._utils import handle_redis_returns_union_types
1010
from ..utils import limited_gather
11-
from .models import LRTNamespace, TaskContext, TaskData, TaskId
11+
from .models import LRTNamespace, TaskData, TaskId
1212

1313
_STORE_TYPE_TASK_DATA: Final[str] = "TD"
14-
_STORE_TYPE_CANCELLED_TASKS: Final[str] = "CT"
15-
_LIST_CONCURRENCY: Final[int] = 2
14+
_LIST_CONCURRENCY: Final[int] = 3
1615

1716

1817
def _to_redis_hash_mapping(data: dict[str, Any]) -> dict[str, str]:
@@ -52,11 +51,6 @@ def _get_redis_key_task_data_match(self) -> str:
5251
def _get_redis_task_data_key(self, task_id: TaskId) -> str:
5352
return f"{self.namespace}:{_STORE_TYPE_TASK_DATA}:{task_id}"
5453

55-
def _get_key_to_remove(self) -> str:
56-
return f"{self.namespace}:{_STORE_TYPE_CANCELLED_TASKS}"
57-
58-
# TaskData
59-
6054
async def get_task_data(self, task_id: TaskId) -> TaskData | None:
6155
result: dict[str, Any] = await handle_redis_returns_union_types(
6256
self._redis.hgetall(
@@ -117,28 +111,18 @@ async def delete_task_data(self, task_id: TaskId) -> None:
117111

118112
# to cancel
119113

120-
async def mark_task_for_removal(
121-
self, task_id: TaskId, with_task_context: TaskContext
122-
) -> None:
114+
async def mark_task_for_removal(self, task_id: TaskId) -> None:
123115
await handle_redis_returns_union_types(
124116
self._redis.hset(
125-
self._get_key_to_remove(), task_id, json_dumps(with_task_context)
117+
self._get_redis_task_data_key(task_id),
118+
mapping=_to_redis_hash_mapping({"marked_for_removal": True}),
126119
)
127120
)
128121

129122
async def is_marked_for_removal(self, task_id: TaskId) -> bool:
130-
result: bool = await handle_redis_returns_union_types(
131-
self._redis.hexists(self._get_key_to_remove(), task_id)
132-
)
133-
return result
134-
135-
async def completed_task_removal(self, task_id: TaskId) -> None:
136-
await handle_redis_returns_union_types(
137-
self._redis.hdel(self._get_key_to_remove(), task_id)
138-
)
139-
140-
async def list_tasks_to_remove(self) -> dict[TaskId, TaskContext]:
141-
result: dict[str, str | None] = await handle_redis_returns_union_types(
142-
self._redis.hgetall(self._get_key_to_remove())
123+
result = await handle_redis_returns_union_types(
124+
self._redis.hget(
125+
self._get_redis_task_data_key(task_id), "marked_for_removal"
126+
)
143127
)
144-
return {task_id: json_loads(context) for task_id, context in result.items()}
128+
return False if result is None else json_loads(result)

packages/service-library/src/servicelib/long_running_tasks/models.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class TaskData(BaseModel):
5050
task_id: str
5151
task_progress: TaskProgress
5252
# NOTE: this context lifetime is with the tracked task (similar to aiohttp storage concept)
53-
task_context: dict[str, Any]
53+
task_context: TaskContext
5454
fire_and_forget: Annotated[
5555
bool,
5656
Field(
@@ -78,6 +78,10 @@ class TaskData(BaseModel):
7878
result_field: Annotated[
7979
ResultField | None, Field(description="the result of the task")
8080
] = None
81+
marked_for_removal: Annotated[
82+
bool,
83+
Field(description=("if True, indicates the task is marked for removal")),
84+
] = False
8185

8286
model_config = ConfigDict(
8387
arbitrary_types_allowed=True,

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -300,11 +300,17 @@ async def _cancelled_tasks_removal(self) -> None:
300300
"""
301301
self._started_event_task_cancelled_tasks_removal.set()
302302

303-
to_remove = await self._tasks_data.list_tasks_to_remove()
304-
for task_id in to_remove:
305-
await self._attempt_to_remove_local_task(task_id)
303+
tasks_data = await self._tasks_data.list_tasks_data()
304+
await limited_gather(
305+
*(
306+
self._attempt_to_remove_local_task(x.task_id)
307+
for x in tasks_data
308+
if x.marked_for_removal is True
309+
),
310+
limit=_PARALLEL_TASKS_CANCELLATION,
311+
)
306312

307-
async def _tasks_monitor(self) -> None:
313+
async def _tasks_monitor(self) -> None: # noqa: C901
308314
"""
309315
A task which monitors locally running tasks and updates their status
310316
in the Redis store when they are done.
@@ -385,30 +391,21 @@ async def _tasks_monitor(self) -> None:
385391
updates["result_field"] = result_field
386392
await self._tasks_data.update_task_data(task_id, updates=updates)
387393

388-
async def _list_tasks_unfiltered(
389-
self, with_task_context: TaskContext | None
390-
) -> list[TaskBase]:
394+
async def list_tasks(self, with_task_context: TaskContext | None) -> list[TaskBase]:
391395
if not with_task_context:
392396
return [
393397
TaskBase(task_id=task.task_id)
394398
for task in (await self._tasks_data.list_tasks_data())
399+
if task.marked_for_removal is False
395400
]
396401

397402
return [
398403
TaskBase(task_id=task.task_id)
399404
for task in (await self._tasks_data.list_tasks_data())
400405
if task.task_context == with_task_context
406+
and task.marked_for_removal is False
401407
]
402408

403-
async def list_tasks(self, with_task_context: TaskContext | None) -> list[TaskBase]:
404-
tasks_to_filter = await self._list_tasks_unfiltered(with_task_context)
405-
406-
if len(tasks_to_filter) == 0:
407-
return []
408-
409-
to_exclude = await self._tasks_data.list_tasks_to_remove()
410-
return [r for r in tasks_to_filter if r.task_id not in to_exclude]
411-
412409
async def _get_tracked_task(
413410
self, task_id: TaskId, with_task_context: TaskContext
414411
) -> TaskData:
@@ -488,7 +485,6 @@ async def _attempt_to_remove_local_task(self, task_id: TaskId) -> None:
488485
task_to_cancel = self._created_tasks.pop(task_id, None)
489486
if task_to_cancel is not None:
490487
await cancel_wait_task(task_to_cancel)
491-
await self._tasks_data.completed_task_removal(task_id)
492488
await self._tasks_data.delete_task_data(task_id)
493489

494490
async def remove_task(
@@ -507,9 +503,7 @@ async def remove_task(
507503

508504
tracked_task = await self._get_tracked_task(task_id, with_task_context)
509505

510-
await self._tasks_data.mark_task_for_removal(
511-
tracked_task.task_id, tracked_task.task_context
512-
)
506+
await self._tasks_data.mark_task_for_removal(tracked_task.task_id)
513507

514508
if not wait_for_removal:
515509
return

packages/service-library/tests/long_running_tasks/test_long_running_tasks__store.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from collections.abc import AsyncIterable, Callable
44
from contextlib import AbstractAsyncContextManager
5+
from copy import deepcopy
56

67
import pytest
78
from pydantic import TypeAdapter
@@ -50,18 +51,14 @@ async def test_workflow(store: RedisStore, task_data: TaskData) -> None:
5051
assert await store.list_tasks_data() == []
5152

5253
# cancelled tasks
53-
assert await store.list_tasks_to_remove() == {}
54+
await store.add_task_data(task_data.task_id, task_data)
5455

5556
assert await store.is_marked_for_removal(task_data.task_id) is False
5657

57-
await store.mark_task_for_removal(task_data.task_id, task_data.task_context)
58+
await store.mark_task_for_removal(task_data.task_id)
5859

5960
assert await store.is_marked_for_removal(task_data.task_id) is True
6061

61-
assert await store.list_tasks_to_remove() == {
62-
task_data.task_id: task_data.task_context
63-
}
64-
6562

6663
@pytest.fixture
6764
async def redis_stores(
@@ -93,15 +90,15 @@ async def test_workflow_multiple_redis_stores_with_different_namespaces(
9390

9491
for store in redis_stores:
9592
assert await store.list_tasks_data() == []
96-
assert await store.list_tasks_to_remove() == {}
9793

9894
for store in redis_stores:
9995
await store.add_task_data(task_data.task_id, task_data)
100-
await store.mark_task_for_removal(task_data.task_id, {})
96+
await store.mark_task_for_removal(task_data.task_id)
10197

98+
marked_as_removed_task_data = deepcopy(task_data)
99+
marked_as_removed_task_data.marked_for_removal = True
102100
for store in redis_stores:
103-
assert await store.list_tasks_data() == [task_data]
104-
assert await store.list_tasks_to_remove() == {task_data.task_id: {}}
101+
assert await store.list_tasks_data() == [marked_as_removed_task_data]
105102

106103
for store in redis_stores:
107104
await store.delete_task_data(task_data.task_id)

packages/service-library/tests/long_running_tasks/test_long_running_tasks_client_long_running_manager.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from collections.abc import AsyncIterable, Callable
44
from contextlib import AbstractAsyncContextManager
5+
from copy import deepcopy
56

67
import pytest
78
from pydantic import TypeAdapter
@@ -64,20 +65,18 @@ async def test_cleanup_namespace(
6465
) -> None:
6566
# create entries in both sides
6667
await store.add_task_data(task_data.task_id, task_data)
67-
await store.mark_task_for_removal(task_data.task_id, task_data.task_context)
68+
await store.mark_task_for_removal(task_data.task_id)
6869

6970
# entries exit
70-
assert await store.list_tasks_data() == [task_data]
71-
assert await store.list_tasks_to_remove() == {
72-
task_data.task_id: task_data.task_context
73-
}
71+
marked_for_removal = deepcopy(task_data)
72+
marked_for_removal.marked_for_removal = True
73+
assert await store.list_tasks_data() == [marked_for_removal]
7474

7575
# removes
7676
await long_running_client_helper.cleanup(lrt_namespace)
7777

7878
# entris were removed
7979
assert await store.list_tasks_data() == []
80-
assert await store.list_tasks_to_remove() == {}
8180

8281
# ensore it does not raise errors if there is nothing to remove
8382
await long_running_client_helper.cleanup(lrt_namespace)

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ async def test__cancelled_tasks_worker_equivalent_of_cancellation_from_a_differe
549549
task_context=empty_context,
550550
)
551551
await long_running_manager.tasks_manager._tasks_data.mark_task_for_removal( # noqa: SLF001
552-
task_id, with_task_context=empty_context
552+
task_id
553553
)
554554

555555
async for attempt in AsyncRetrying(**_RETRY_PARAMS):

0 commit comments

Comments
 (0)