Skip to content

Commit 213906a

Browse files
author
Andrei Neagu
committed
added dont autoretry for
1 parent 042d093 commit 213906a

File tree

2 files changed

+19
-3
lines changed
  • services/storage/src/simcore_service_storage

2 files changed

+19
-3
lines changed

services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22

33
from celery import Celery # type: ignore[import-untyped]
4+
from models_library.api_schemas_storage.export_data_async_jobs import AccessRightError
45
from servicelib.logging_utils import log_context
56

67
from ...modules.celery._celery_types import register_celery_types
@@ -15,7 +16,7 @@
1516
def setup_worker_tasks(app: Celery) -> None:
1617
register_celery_types()
1718
with log_context(_logger, logging.INFO, msg="worker task registration"):
18-
register_task(app, export_data)
19+
register_task(app, export_data, dont_autoretry_for=(AccessRightError,))
1920
register_task(app, compute_path_size)
2021
register_task(app, complete_upload_file)
2122
register_task(app, delete_paths)

services/storage/src/simcore_service_storage/modules/celery/_task.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
_DEFAULT_TASK_TIMEOUT: Final[timedelta | None] = None
2121
_DEFAULT_MAX_RETRIES: Final[NonNegativeInt] = 3
2222
_DEFAULT_WAIT_BEFORE_RETRY: Final[timedelta] = timedelta(seconds=5)
23+
_DEFAULT_DONT_AUTORETRY_FOR: Final[tuple[type[Exception], ...]] = tuple()
2324

2425

2526
T = TypeVar("T")
@@ -52,13 +53,21 @@ def wrapper(task: AbortableTask, *args: P.args, **kwargs: P.kwargs) -> R:
5253
return decorator
5354

5455

55-
def _error_handling(max_retries: NonNegativeInt, delay_between_retries: timedelta):
56+
def _error_handling(
57+
max_retries: NonNegativeInt,
58+
delay_between_retries: timedelta,
59+
dont_autoretry_for: tuple[type[Exception], ...],
60+
):
5661
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
5762
@wraps(func)
5863
def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any:
5964
try:
6065
return func(task, *args, **kwargs)
6166
except Exception as exc:
67+
if isinstance(exc, dont_autoretry_for):
68+
_logger.debug("Not retrying for exception %s", type(exc).__name__)
69+
raise # propagate without retry
70+
6271
exc_type = type(exc).__name__
6372
exc_message = f"{exc}"
6473
_logger.exception(
@@ -87,6 +96,7 @@ def register_task(
8796
timeout: timedelta | None = _DEFAULT_TASK_TIMEOUT,
8897
max_retries: NonNegativeInt = _DEFAULT_MAX_RETRIES,
8998
delay_between_retries: timedelta = _DEFAULT_WAIT_BEFORE_RETRY,
99+
dont_autoretry_for: tuple[type[Exception], ...] = _DEFAULT_DONT_AUTORETRY_FOR,
90100
) -> None: ...
91101

92102

@@ -98,6 +108,7 @@ def register_task(
98108
timeout: timedelta | None = _DEFAULT_TASK_TIMEOUT,
99109
max_retries: NonNegativeInt = _DEFAULT_MAX_RETRIES,
100110
delay_between_retries: timedelta = _DEFAULT_WAIT_BEFORE_RETRY,
111+
dont_autoretry_for: tuple[type[Exception], ...] = _DEFAULT_DONT_AUTORETRY_FOR,
101112
) -> None: ...
102113

103114

@@ -111,6 +122,7 @@ def register_task( # type: ignore[misc]
111122
timeout: timedelta | None = _DEFAULT_TASK_TIMEOUT,
112123
max_retries: NonNegativeInt = _DEFAULT_MAX_RETRIES,
113124
delay_between_retries: timedelta = _DEFAULT_WAIT_BEFORE_RETRY,
125+
dont_autoretry_for: tuple[type[Exception], ...] = _DEFAULT_DONT_AUTORETRY_FOR,
114126
) -> None:
115127
"""Decorator to define a celery task with error handling and abortable support
116128
@@ -119,6 +131,7 @@ def register_task( # type: ignore[misc]
119131
timeout -- when None no timeout is enforced, task is allowed to run forever (default: {_DEFAULT_TASK_TIMEOUT})
120132
max_retries -- number of attempts in case of failuire before giving up (default: {_DEFAULT_MAX_RETRIES})
121133
delay_between_retries -- dealy between each attempt in case of error (default: {_DEFAULT_WAIT_BEFORE_RETRY})
134+
dont_autoretry_for -- exceptions that should not be retried when raised by the task
122135
"""
123136
wrapped_fn: Callable[Concatenate[AbortableTask, P], R]
124137
if asyncio.iscoroutinefunction(fn):
@@ -128,7 +141,9 @@ def register_task( # type: ignore[misc]
128141
wrapped_fn = fn
129142

130143
wrapped_fn = _error_handling(
131-
max_retries=max_retries, delay_between_retries=delay_between_retries
144+
max_retries=max_retries,
145+
delay_between_retries=delay_between_retries,
146+
dont_autoretry_for=dont_autoretry_for,
132147
)(wrapped_fn)
133148

134149
app.task(

0 commit comments

Comments
 (0)