|
2 | 2 | import inspect |
3 | 3 | import logging |
4 | 4 | from collections.abc import Awaitable, Callable, Iterable |
5 | | -from datetime import timedelta |
| 5 | +from datetime import datetime, timedelta |
6 | 6 | from enum import Enum |
7 | 7 | from typing import Any, Final |
8 | 8 |
|
@@ -118,6 +118,14 @@ def _raise_if_not_type(task_result: Any, expected_types: Iterable[type]) -> None |
118 | 118 | raise TypeError(msg) |
119 | 119 |
|
120 | 120 |
|
| 121 | +async def _wait_until_future_date(possible_future_date: datetime) -> None: |
| 122 | + while True: |
| 123 | + now = arrow.utcnow().datetime |
| 124 | + if now >= possible_future_date: |
| 125 | + return |
| 126 | + await asyncio.sleep(1) |
| 127 | + |
| 128 | + |
121 | 129 | class DeferredManager: # pylint:disable=too-many-instance-attributes |
122 | 130 | def __init__( |
123 | 131 | self, |
@@ -464,15 +472,28 @@ async def _fs_handle_error_result( # pylint:disable=method-hidden |
464 | 472 | ): |
465 | 473 | _logger.debug("Schedule retry attempt for task_uid '%s'", task_uid) |
466 | 474 |
|
467 | | - subclass = self.__get_subclass(task_schedule.class_unique_reference) |
468 | | - deferred_context = self.__get_deferred_context(task_schedule.start_context) |
469 | | - sleep_interval = await subclass.get_retry_delay( |
470 | | - context=deferred_context, |
471 | | - remaining_attempts=task_schedule.execution_attempts, |
472 | | - total_attempts=task_schedule.total_attempts, |
473 | | - ) |
474 | | - await asyncio.sleep(sleep_interval.total_seconds()) |
| 475 | + # resilenet wait before retrying |
| 476 | + if task_schedule.wait_cancellation_until is None: |
| 477 | + # save the new one |
| 478 | + subclass = self.__get_subclass(task_schedule.class_unique_reference) |
| 479 | + deferred_context = self.__get_deferred_context( |
| 480 | + task_schedule.start_context |
| 481 | + ) |
| 482 | + sleep_interval = await subclass.get_retry_delay( |
| 483 | + context=deferred_context, |
| 484 | + remaining_attempts=task_schedule.execution_attempts, |
| 485 | + total_attempts=task_schedule.total_attempts, |
| 486 | + ) |
| 487 | + task_schedule.wait_cancellation_until = ( |
| 488 | + arrow.utcnow().datetime + sleep_interval |
| 489 | + ) |
| 490 | + await self._task_tracker.save(task_uid, task_schedule) |
| 491 | + |
| 492 | + await _wait_until_future_date(task_schedule.wait_cancellation_until) |
| 493 | + task_schedule.wait_cancellation_until = None |
| 494 | + await self._task_tracker.save(task_uid, task_schedule) |
475 | 495 |
|
| 496 | + # waiting is done can proceed with retry |
476 | 497 | task_schedule.state = TaskState.SUBMIT_TASK |
477 | 498 | await self._task_tracker.save(task_uid, task_schedule) |
478 | 499 | await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SUBMIT_TASK) |
|
0 commit comments