Skip to content

Commit 1fddd08

Browse files
author
Andrei Neagu
committed
fixeed removal and task cancellation
1 parent fac176e commit 1fddd08

File tree

5 files changed

+18
-103
lines changed

5 files changed

+18
-103
lines changed

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ async def start_long_running_task(
9595
dumps=json_dumps,
9696
)
9797
except asyncio.CancelledError:
98-
# cancel the task, the client has disconnected
98+
# remove the task, the client was disconnected
9999
if task_id:
100-
await lrt_api.cancel_task(
100+
await lrt_api.remove_task(
101101
long_running_manager.tasks_manager, task_context, task_id
102102
)
103103
raise

packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,11 @@ async def _wait_for_task_result() -> Any:
128128
exception=e,
129129
) from e
130130
except Exception as e:
131-
error = TaskExceptionError(task_id=task_id, exception=e, traceback="")
132131
_logger.warning(
133132
create_troubleshootting_log_message(
134133
user_error_msg=f"{task_id=} raised an exception",
135134
error=e,
136135
tip=f"Check the logs of the service responding to '{client.base_url}'",
137136
)
138137
)
139-
raise error from e
138+
raise TaskExceptionError(task_id=task_id, exception=e, traceback="") from e

packages/service-library/src/servicelib/long_running_tasks/lrt_api.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,5 @@ async def get_task_result(
103103
async def remove_task(
104104
tasks_manager: TasksManager, task_context: TaskContext, task_id: TaskId
105105
) -> None:
106-
"""removes / cancels a task"""
106+
"""cancels and removes the task"""
107107
await tasks_manager.remove_task(task_id, with_task_context=task_context)
108-
109-
110-
async def cancel_task(
111-
tasks_manager: TasksManager, task_context: TaskContext, task_id: TaskId
112-
) -> None:
113-
"""cancels a task"""
114-
await tasks_manager.cancel_task(task_id, with_task_context=task_context)

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 14 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import functools
44
import inspect
55
import logging
6-
import traceback
76
import urllib.parse
87
from typing import Any, ClassVar, Final, Protocol, TypeAlias
98
from uuid import uuid4
@@ -14,15 +13,13 @@
1413
from settings_library.redis import RedisDatabase, RedisSettings
1514

1615
from ..background_task import create_periodic_task
17-
from ..logging_utils import log_catch
1816
from ..redis import RedisClientSDK, exclusive
1917
from ._redis_serialization import object_to_string, string_to_object
2018
from ._store.base import BaseStore
2119
from ._store.redis import RedisStore
2220
from .errors import (
2321
TaskAlreadyRunningError,
2422
TaskCancelledError,
25-
TaskExceptionError,
2623
TaskNotCompletedError,
2724
TaskNotFoundError,
2825
TaskNotRegisteredError,
@@ -171,12 +168,10 @@ async def teardown(self) -> None:
171168
)
172169

173170
if self._stale_tasks_monitor_task:
174-
with log_catch(_logger, reraise=False):
175-
await cancel_wait_task(self._stale_tasks_monitor_task)
171+
await cancel_wait_task(self._stale_tasks_monitor_task)
176172

177173
if self._cancelled_tasks_removal_task:
178-
with log_catch(_logger, reraise=False):
179-
await cancel_wait_task(self._cancelled_tasks_removal_task)
174+
await cancel_wait_task(self._cancelled_tasks_removal_task)
180175

181176
if self.redis_client_sdk is not None:
182177
await self.redis_client_sdk.shutdown()
@@ -368,38 +363,20 @@ async def get_task_result(
368363

369364
return string_to_object(tracked_task.result_field.result)
370365

371-
@staticmethod
372366
async def _cancel_tracked_task(
373-
task: asyncio.Task, task_id: TaskId, *, reraise_errors: bool
367+
self, task: asyncio.Task, task_id: TaskId, with_task_context: TaskContext
374368
) -> None:
375369
try:
370+
await self._tasks_data.set_as_cancelled(task_id, with_task_context)
371+
# TODO: remove this cancellation timeout
376372
await cancel_wait_task(task, max_delay=_CANCEL_TASK_TIMEOUT)
377373
except Exception as e: # pylint:disable=broad-except
378-
formatted_traceback = "".join(traceback.format_exception(e))
379-
if reraise_errors:
380-
raise TaskExceptionError(
381-
task_id=task_id, exception=e, traceback=formatted_traceback
382-
) from e
383-
384-
async def cancel_task(
385-
self, task_id: TaskId, with_task_context: TaskContext
386-
) -> None:
387-
"""
388-
Eventually cancels the task.
389-
390-
# NOTE: aborts the task:
391-
# - Immediately, if the task is running on the current worker.
392-
# - Asynchronously (after a short delay), if the task is running on another worker.
393-
394-
raises TaskNotFoundError if the task cannot be found
395-
"""
396-
await self._tasks_data.set_as_cancelled(task_id, with_task_context)
397-
tracked_task = await self._get_tracked_task(task_id, with_task_context)
398-
await self._cancel_tracked_task(
399-
self._created_tasks[tracked_task.task_id], task_id, reraise_errors=False
400-
)
401-
402-
# wait for task to be removed?
374+
_logger.info(
375+
"Task %s cancellation failed with error: %s",
376+
task_id,
377+
e,
378+
stack_info=True,
379+
)
403380

404381
async def remove_task(
405382
self,
@@ -417,15 +394,12 @@ async def remove_task(
417394
return
418395

419396
if tracked_task.task_id in self._created_tasks:
420-
await self._cancel_tracked_task(
421-
self._created_tasks[tracked_task.task_id],
422-
task_id,
423-
reraise_errors=reraise_errors,
424-
)
425-
# task might already be removed via cancellation background task
397+
# will have affect in the worker which deals with the removal
426398
await self._tasks_data.delete_task_data(task_id)
427399
self._created_tasks.pop(tracked_task.task_id, None)
428400

401+
# TODO: wait for removal to becompleted here
402+
429403
def _get_task_id(self, task_name: str, *, is_unique: bool) -> TaskId:
430404
unique_part = "unique" if is_unique else f"{uuid4()}"
431405
return f"{self.namespace}.{task_name}.{unique_part}"

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from servicelib.long_running_tasks import lrt_api
1818
from servicelib.long_running_tasks.errors import (
1919
TaskAlreadyRunningError,
20-
TaskCancelledError,
2120
TaskNotCompletedError,
2221
TaskNotFoundError,
2322
TaskNotRegisteredError,
@@ -324,34 +323,6 @@ async def test_get_result_finished_with_error(
324323
await tasks_manager.get_task_result(task_id, with_task_context=empty_context)
325324

326325

327-
async def test_get_result_task_was_cancelled_multiple_times(
328-
tasks_manager: TasksManager, empty_context: TaskContext
329-
):
330-
task_id = await lrt_api.start_task(
331-
tasks_manager,
332-
a_background_task.__name__,
333-
raise_when_finished=False,
334-
total_sleep=10,
335-
task_context=empty_context,
336-
)
337-
for _ in range(5):
338-
await tasks_manager.cancel_task(task_id, with_task_context=empty_context)
339-
340-
with pytest.raises( # noqa: PT012
341-
TaskCancelledError, match=f"Task {task_id} was cancelled before completing"
342-
):
343-
async for attempt in AsyncRetrying(
344-
**{
345-
**_RETRY_PARAMS,
346-
"retry": retry_if_exception_type((TaskNotCompletedError,)),
347-
}
348-
):
349-
with attempt:
350-
await tasks_manager.get_task_result(
351-
task_id, with_task_context=empty_context
352-
)
353-
354-
355326
async def test_cancel_task_from_different_manager(
356327
use_in_memory_redis: RedisSettings,
357328
empty_context: TaskContext,
@@ -436,28 +407,6 @@ async def test_remove_unknown_task(
436407
)
437408

438409

439-
async def test_cancel_task_with_task_context(tasks_manager: TasksManager):
440-
TASK_CONTEXT = {"some_context": "some_value"}
441-
task_id = await lrt_api.start_task(
442-
tasks_manager,
443-
a_background_task.__name__,
444-
raise_when_finished=False,
445-
total_sleep=10,
446-
task_context=TASK_CONTEXT,
447-
)
448-
# getting status fails if wrong task context given
449-
with pytest.raises(TaskNotFoundError):
450-
await tasks_manager.get_task_status(
451-
task_id, with_task_context={"wrong_task_context": 12}
452-
)
453-
# getting status fails if wrong task context given
454-
with pytest.raises(TaskNotFoundError):
455-
await tasks_manager.cancel_task(
456-
task_id, with_task_context={"wrong_task_context": 12}
457-
)
458-
await tasks_manager.cancel_task(task_id, with_task_context=TASK_CONTEXT)
459-
460-
461410
async def test__cancelled_tasks_worker_equivalent_of_cancellation_from_a_different_process(
462411
tasks_manager: TasksManager, empty_context: TaskContext
463412
):

0 commit comments

Comments
 (0)