Skip to content

Commit e7cea08

Browse files
author
Andrei Neagu
committed
fixed removal issues
1 parent be7e1e2 commit e7cea08

File tree

1 file changed

+27
-19
lines changed
  • packages/service-library/src/servicelib/long_running_tasks

1 file changed

+27
-19
lines changed

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from common_library.async_tools import cancel_wait_task
1212
from models_library.api_schemas_long_running_tasks.base import TaskProgress
1313
from pydantic import NonNegativeFloat, PositiveFloat
14+
from servicelib.utils import limited_gather
1415
from settings_library.redis import RedisDatabase, RedisSettings
1516
from tenacity import (
1617
AsyncRetrying,
@@ -50,6 +51,7 @@
5051
_STATUS_UPDATE_CHECK_INTERNAL: Final[datetime.timedelta] = datetime.timedelta(seconds=1)
5152
_MAX_EXCLUSIVE_TASK_CANCEL_TIMEOUT: Final[NonNegativeFloat] = 5
5253
_TASK_REMOVAL_MAX_WAIT: Final[NonNegativeFloat] = 60
54+
_PARALLEL_TASKS_CANCELLATION: Final[int] = 5
5355

5456
AllowedErrrors: TypeAlias = tuple[type[BaseException], ...]
5557

@@ -205,34 +207,40 @@ async def setup(self) -> None:
205207
await self._started_event_task_tasks_monitor.wait()
206208

207209
async def teardown(self) -> None:
208-
# ensure all created tasks are cancelled
209-
for tracked_task in await self._tasks_data.list_tasks_data():
210-
with suppress(TaskNotFoundError):
211-
await self.remove_task(
212-
tracked_task.task_id,
213-
tracked_task.task_context,
214-
wait_for_removal=True,
215-
)
210+
# stop cancelled_tasks_removal
211+
if self._task_cancelled_tasks_removal:
212+
await cancel_wait_task(self._task_cancelled_tasks_removal)
216213

217-
for task in self._created_tasks.values():
218-
_logger.warning(
219-
"Task %s was not completed before shutdown, cancelling it",
220-
task.get_name(),
214+
# stopping only tasks that are handled by this manager
215+
# otherwise it will cancel long running tasks that were running in diffierent processes
216+
async def _remove_local_task(task_data: TaskData) -> None:
217+
await self.remove_task(
218+
task_data.task_id,
219+
task_data.task_context,
220+
wait_for_removal=False,
221221
)
222-
await cancel_wait_task(task)
222+
await self._attempt_to_remove_local_task(task_data.task_id)
223223

224-
# stale_tasks_monitor
224+
tasks_to_remove = []
225+
for task_id in self._created_tasks:
226+
tracked_task = await self._tasks_data.get_task_data(task_id)
227+
if tracked_task is None:
228+
continue
229+
230+
tasks_to_remove.append(_remove_local_task(tracked_task))
231+
232+
await limited_gather(
233+
*tasks_to_remove, log=_logger, limit=_PARALLEL_TASKS_CANCELLATION
234+
)
235+
236+
# stop stale_tasks_monitor
225237
if self._task_stale_tasks_monitor:
226238
await cancel_wait_task(
227239
self._task_stale_tasks_monitor,
228240
max_delay=_MAX_EXCLUSIVE_TASK_CANCEL_TIMEOUT,
229241
)
230242

231-
# cancelled_tasks_removal
232-
if self._task_cancelled_tasks_removal:
233-
await cancel_wait_task(self._task_cancelled_tasks_removal)
234-
235-
# tasks_monitor
243+
# stop tasks_monitor
236244
if self._task_tasks_monitor:
237245
await cancel_wait_task(self._task_tasks_monitor)
238246

0 commit comments

Comments
 (0)