Skip to content

Commit 5dcecb4

Browse files
sanderegg authored and matusdrobuliak66 committed
🐛 Stopping a pipeline should not fail when it does not exist (#7942)
1 parent 74a886a commit 5dcecb4

File tree

36 files changed: +387 additions, -108 deletions

packages/service-library/src/servicelib/aiohttp/rest_middlewares.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
from servicelib.rest_constants import RESPONSE_MODEL_POLICY
1919
from servicelib.status_codes_utils import is_5xx_server_error
2020

21-
from ..logging_errors import create_troubleshotting_log_kwargs
21+
from ..logging_errors import create_troubleshootting_log_kwargs
2222
from ..mimetype_constants import MIMETYPE_APPLICATION_JSON
2323
from ..rest_responses import is_enveloped_from_map, is_enveloped_from_text
2424
from ..status_codes_utils import get_code_description
@@ -71,7 +71,7 @@ def _log_5xx_server_error(
7171
error_code, error_context = _create_error_context(request, exception)
7272

7373
_logger.exception(
74-
**create_troubleshotting_log_kwargs(
74+
**create_troubleshootting_log_kwargs(
7575
user_error_msg,
7676
error=exception,
7777
error_context=error_context,

packages/service-library/src/servicelib/background_task_utils.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
from collections.abc import Callable, Coroutine
44
from typing import Any, ParamSpec, TypeVar
55

6-
from servicelib.exception_utils import silence_exceptions
6+
from servicelib.exception_utils import suppress_exceptions
77
from servicelib.redis._errors import CouldNotAcquireLockError
88

99
from .background_task import periodic
@@ -39,10 +39,11 @@ def _decorator(
3939
coro: Callable[P, Coroutine[Any, Any, None]],
4040
) -> Callable[P, Coroutine[Any, Any, None]]:
4141
@periodic(interval=retry_after)
42-
@silence_exceptions(
42+
@suppress_exceptions(
4343
# Replicas will raise CouldNotAcquireLockError
4444
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/7574
45-
(CouldNotAcquireLockError,)
45+
(CouldNotAcquireLockError,),
46+
reason="Multiple instances of the periodic task `{coro.__module__}.{coro.__name__}` are running.",
4647
)
4748
@exclusive(
4849
redis_client,

packages/service-library/src/servicelib/exception_utils.py

Lines changed: 85 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
from typing import Any, Final, ParamSpec, TypeVar
77

88
from pydantic import BaseModel, Field, NonNegativeFloat, PrivateAttr
9+
from servicelib.logging_errors import create_troubleshootting_log_kwargs
910

1011
_logger = logging.getLogger(__name__)
1112

@@ -76,17 +77,85 @@ def else_reset(self) -> None:
7677
F = TypeVar("F", bound=Callable[..., Any])
7778

7879

79-
def silence_exceptions(exceptions: tuple[type[BaseException], ...]) -> Callable[[F], F]:
80-
def _decorator(func_or_coro: F) -> F:
80+
def _should_suppress_exception(
81+
exc: BaseException,
82+
predicate: Callable[[BaseException], bool] | None,
83+
func_name: str,
84+
) -> bool:
85+
if predicate is None:
86+
# No predicate provided, suppress all exceptions
87+
return True
88+
89+
try:
90+
return predicate(exc)
91+
except Exception as predicate_exc: # pylint: disable=broad-except
92+
# the predicate function raised an exception
93+
# log it and do not suppress the original exception
94+
_logger.warning(
95+
**create_troubleshootting_log_kwargs(
96+
f"Predicate function raised exception {type(predicate_exc).__name__}:{predicate_exc} in {func_name}. "
97+
f"Original exception will be re-raised: {type(exc).__name__}",
98+
error=predicate_exc,
99+
error_context={
100+
"func_name": func_name,
101+
"original_exception": f"{type(exc).__name__}",
102+
},
103+
tip="Predicate raised, please fix it.",
104+
)
105+
)
106+
return False
107+
108+
109+
def suppress_exceptions(
110+
exceptions: tuple[type[BaseException], ...],
111+
*,
112+
reason: str,
113+
predicate: Callable[[BaseException], bool] | None = None,
114+
) -> Callable[[F], F]:
115+
"""
116+
Decorator to suppress specified exceptions.
117+
118+
Args:
119+
exceptions: Tuple of exception types to suppress
120+
reason: Reason for suppression (for logging)
121+
predicate: Optional function to check exception attributes.
122+
If provided, exception is only suppressed if predicate returns True.
123+
124+
Example:
125+
# Suppress all ConnectionError exceptions
126+
@suppress_exceptions((ConnectionError,), reason="Network issues")
127+
def my_func(): ...
128+
129+
# Suppress only ConnectionError with specific errno
130+
@suppress_exceptions(
131+
(ConnectionError,),
132+
reason="Specific network error",
133+
predicate=lambda e: hasattr(e, 'errno') and e.errno == 104
134+
)
135+
def my_func(): ...
136+
"""
81137

138+
def _decorator(func_or_coro: F) -> F:
82139
if inspect.iscoroutinefunction(func_or_coro):
83140

84141
@wraps(func_or_coro)
85142
async def _async_wrapper(*args, **kwargs) -> Any:
86143
try:
87144
assert inspect.iscoroutinefunction(func_or_coro) # nosec
88145
return await func_or_coro(*args, **kwargs)
89-
except exceptions:
146+
except exceptions as exc:
147+
# Check if exception should be suppressed
148+
if not _should_suppress_exception(
149+
exc, predicate, func_or_coro.__name__
150+
):
151+
raise # Re-raise if predicate returns False or fails
152+
153+
_logger.debug(
154+
"Caught suppressed exception %s in %s: TIP: %s",
155+
exc,
156+
func_or_coro.__name__,
157+
reason,
158+
)
90159
return None
91160

92161
return _async_wrapper # type: ignore[return-value] # decorators typing is hard
@@ -95,7 +164,19 @@ async def _async_wrapper(*args, **kwargs) -> Any:
95164
def _sync_wrapper(*args, **kwargs) -> Any:
96165
try:
97166
return func_or_coro(*args, **kwargs)
98-
except exceptions:
167+
except exceptions as exc:
168+
# Check if exception should be suppressed
169+
if not _should_suppress_exception(
170+
exc, predicate, func_or_coro.__name__
171+
):
172+
raise # Re-raise if predicate returns False or fails
173+
174+
_logger.debug(
175+
"Caught suppressed exception %s in %s: TIP: %s",
176+
exc,
177+
func_or_coro.__name__,
178+
reason,
179+
)
99180
return None
100181

101182
return _sync_wrapper # type: ignore[return-value] # decorators typing is hard

packages/service-library/src/servicelib/fastapi/http_error.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
from fastapi.responses import JSONResponse
1212
from pydantic import ValidationError
1313

14-
from ..logging_errors import create_troubleshotting_log_kwargs
14+
from ..logging_errors import create_troubleshootting_log_kwargs
1515
from ..status_codes_utils import is_5xx_server_error
1616

1717
validation_error_response_definition["properties"] = {
@@ -50,7 +50,7 @@ async def _http_error_handler(request: Request, exc: Exception) -> JSONResponse:
5050

5151
if is_5xx_server_error(status_code):
5252
_logger.exception(
53-
create_troubleshotting_log_kwargs(
53+
create_troubleshootting_log_kwargs(
5454
"Unexpected error happened in the Resource Usage Tracker. Please contact support.",
5555
error=exc,
5656
error_context={

packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
from typing import Any, Final
66

77
from pydantic import PositiveFloat
8-
from servicelib.logging_errors import create_troubleshotting_log_message
8+
from servicelib.logging_errors import create_troubleshootting_log_message
99

1010
from ...long_running_tasks.errors import TaskClientTimeoutError, TaskExceptionError
1111
from ...long_running_tasks.models import (
@@ -130,7 +130,7 @@ async def _wait_for_task_result() -> Any:
130130
except Exception as e:
131131
error = TaskExceptionError(task_id=task_id, exception=e, traceback="")
132132
_logger.warning(
133-
create_troubleshotting_log_message(
133+
create_troubleshootting_log_message(
134134
user_error_msg=f"{task_id=} raised an exception",
135135
error=e,
136136
tip=f"Check the logs of the service responding to '{client.base_url}'",

packages/service-library/src/servicelib/logging_errors.py

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@
1010
_logger = logging.getLogger(__name__)
1111

1212

13-
def create_troubleshotting_log_message(
13+
def create_troubleshootting_log_message(
1414
user_error_msg: str,
1515
*,
1616
error: BaseException,
@@ -57,7 +57,7 @@ class LogKwargs(TypedDict):
5757
extra: LogExtra | None
5858

5959

60-
def create_troubleshotting_log_kwargs(
60+
def create_troubleshootting_log_kwargs(
6161
user_error_msg: str,
6262
*,
6363
error: BaseException,
@@ -76,7 +76,11 @@ def create_troubleshotting_log_kwargs(
7676
_logger.exception(
7777
**create_troubleshotting_log_kwargs(
7878
user_error_msg=frontend_msg,
79-
exception=exc,
79+
error=exc,
80+
error_context={
81+
"user_id": user_id,
82+
"product_name": product_name,
83+
},
8084
tip="Check row in `groups_extra_properties` for this product. It might be missing.",
8185
)
8286
)
@@ -88,7 +92,7 @@ def create_troubleshotting_log_kwargs(
8892
context.update(error.error_context())
8993

9094
# compose as log message
91-
log_msg = create_troubleshotting_log_message(
95+
log_msg = create_troubleshootting_log_message(
9296
user_error_msg,
9397
error=error,
9498
error_code=error_code,

packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
from typing import Any
33

44
from common_library.error_codes import create_error_code
5-
from servicelib.logging_errors import create_troubleshotting_log_kwargs
5+
from servicelib.logging_errors import create_troubleshootting_log_kwargs
66

77
from .errors import TaskNotCompletedError, TaskNotFoundError
88
from .models import TaskBase, TaskId, TaskStatus
@@ -43,7 +43,7 @@ async def get_task_result(
4343
raise
4444
except Exception as exc:
4545
_logger.exception(
46-
**create_troubleshotting_log_kwargs(
46+
**create_troubleshootting_log_kwargs(
4747
user_error_msg=f"{task_id=} raised an exception",
4848
error=exc,
4949
error_code=create_error_code(exc),

packages/service-library/src/servicelib/redis/_decorators.py

Lines changed: 21 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import contextlib
32
import functools
43
import logging
54
import socket
@@ -10,6 +9,7 @@
109
import arrow
1110
import redis.exceptions
1211
from redis.asyncio.lock import Lock
12+
from servicelib.logging_errors import create_troubleshootting_log_kwargs
1313

1414
from ..background_task import periodic
1515
from ._client import RedisClientSDK
@@ -23,9 +23,9 @@
2323
R = TypeVar("R")
2424

2525
_EXCLUSIVE_TASK_NAME: Final[str] = "exclusive/{module_name}.{func_name}"
26-
_EXCLUSIVE_AUTO_EXTEND_TASK_NAME: Final[
27-
str
28-
] = "exclusive/autoextend_lock_{redis_lock_key}"
26+
_EXCLUSIVE_AUTO_EXTEND_TASK_NAME: Final[str] = (
27+
"exclusive/autoextend_lock_{redis_lock_key}"
28+
)
2929

3030

3131
@periodic(interval=DEFAULT_LOCK_TTL / 2, raise_on_error=True)
@@ -134,10 +134,26 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
134134
assert len(lock_lost_errors.exceptions) == 1 # nosec
135135
raise lock_lost_errors.exceptions[0] from eg
136136
finally:
137-
with contextlib.suppress(redis.exceptions.LockNotOwnedError):
137+
try:
138138
# in the case where the lock would have been lost,
139139
# this would raise again and is not necessary
140140
await lock.release()
141+
except redis.exceptions.LockNotOwnedError as exc:
142+
_logger.exception(
143+
**create_troubleshootting_log_kwargs(
144+
f"Unexpected error while releasing lock '{redis_lock_key}'",
145+
error=exc,
146+
error_context={
147+
"redis_lock_key": redis_lock_key,
148+
"lock_value": lock_value,
149+
"client_name": client.client_name,
150+
"hostname": socket.gethostname(),
151+
"coroutine": coro.__name__,
152+
},
153+
tip="This might happen if the lock was lost before releasing it. "
154+
"Look for synchronous code that prevents refreshing the lock or asyncio loop overload.",
155+
)
156+
)
141157

142158
return _wrapper
143159

0 commit comments

Comments
 (0)