Skip to content

Commit fac176e

Browse files
author
Andrei Neagu
committed
refactir cancellation
1 parent f699cd2 commit fac176e

File tree

1 file changed

+17
-34
lines changed
  • packages/service-library/src/servicelib/long_running_tasks

1 file changed

+17
-34
lines changed

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import logging
66
import traceback
77
import urllib.parse
8-
from contextlib import suppress
98
from typing import Any, ClassVar, Final, Protocol, TypeAlias
109
from uuid import uuid4
1110

@@ -369,6 +368,19 @@ async def get_task_result(
369368

370369
return string_to_object(tracked_task.result_field.result)
371370

371+
@staticmethod
372+
async def _cancel_tracked_task(
373+
task: asyncio.Task, task_id: TaskId, *, reraise_errors: bool
374+
) -> None:
375+
try:
376+
await cancel_wait_task(task, max_delay=_CANCEL_TASK_TIMEOUT)
377+
except Exception as e: # pylint:disable=broad-except
378+
formatted_traceback = "".join(traceback.format_exception(e))
379+
if reraise_errors:
380+
raise TaskExceptionError(
381+
task_id=task_id, exception=e, traceback=formatted_traceback
382+
) from e
383+
372384
async def cancel_task(
373385
self, task_id: TaskId, with_task_context: TaskContext
374386
) -> None:
@@ -384,40 +396,10 @@ async def cancel_task(
384396
await self._tasks_data.set_as_cancelled(task_id, with_task_context)
385397
tracked_task = await self._get_tracked_task(task_id, with_task_context)
386398
await self._cancel_tracked_task(
387-
self._created_tasks[tracked_task.task_id], task_id, reraise_errors=True
399+
self._created_tasks[tracked_task.task_id], task_id, reraise_errors=False
388400
)
389401

390-
@staticmethod
391-
async def _cancel_asyncio_task(
392-
task: asyncio.Task, reference: str, *, reraise_errors: bool
393-
) -> None:
394-
task.cancel()
395-
with suppress(asyncio.CancelledError):
396-
try:
397-
try:
398-
await asyncio.wait_for(
399-
_await_task(task), timeout=_CANCEL_TASK_TIMEOUT
400-
)
401-
except TimeoutError:
402-
_logger.warning(
403-
"Timed out while awaiting for cancellation of '%s'", reference
404-
)
405-
except Exception: # pylint:disable=broad-except
406-
if reraise_errors:
407-
raise
408-
409-
async def _cancel_tracked_task(
410-
self, task: asyncio.Task, task_id: TaskId, *, reraise_errors: bool
411-
) -> None:
412-
try:
413-
await self._cancel_asyncio_task(
414-
task, task_id, reraise_errors=reraise_errors
415-
)
416-
except Exception as e: # pylint:disable=broad-except
417-
formatted_traceback = "".join(traceback.format_exception(e))
418-
raise TaskExceptionError(
419-
task_id=task_id, exception=e, traceback=formatted_traceback
420-
) from e
402+
# wait for task to be removed?
421403

422404
async def remove_task(
423405
self,
@@ -440,8 +422,9 @@ async def remove_task(
440422
task_id,
441423
reraise_errors=reraise_errors,
442424
)
425+
# task might already be removed via cancellation background task
443426
await self._tasks_data.delete_task_data(task_id)
444-
del self._created_tasks[tracked_task.task_id]
427+
self._created_tasks.pop(tracked_task.task_id, None)
445428

446429
def _get_task_id(self, task_name: str, *, is_unique: bool) -> TaskId:
447430
unique_part = "unique" if is_unique else f"{uuid4()}"

0 commit comments

Comments
 (0)