Skip to content

Commit 9f459c3

Browse files
authored
♻️ refactor dv-2 dy-sidecar's API client (ITISFoundation#3121)
1 parent 8e1c4f2 commit 9f459c3

20 files changed

+1898
-505
lines changed

services/director-v2/src/simcore_service_director_v2/core/settings.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ class DynamicSidecarSettings(BaseCustomSettings):
183183
regex=SERVICE_NETWORK_RE,
184184
description="network all dynamic services are connected to",
185185
)
186+
DYNAMIC_SIDECAR_API_CLIENT_REQUEST_MAX_RETRIES: int = Field(
187+
4, description="maximum attempts to retry a request before giving up"
188+
)
186189
DYNAMIC_SIDECAR_API_REQUEST_TIMEOUT: PositiveFloat = Field(
187190
15.0,
188191
description=(

services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,15 @@
1111
SimcoreServiceLabels,
1212
)
1313
from models_library.services_resources import ServiceResourcesDict
14-
from pydantic import BaseModel, Extra, Field, PositiveInt, constr
14+
from pydantic import (
15+
AnyHttpUrl,
16+
BaseModel,
17+
Extra,
18+
Field,
19+
PositiveInt,
20+
constr,
21+
parse_obj_as,
22+
)
1523

1624
from ..constants import (
1725
DYNAMIC_PROXY_SERVICE_PREFIX,
@@ -235,9 +243,11 @@ def can_save_state(self) -> bool:
235243
# consider adding containers for healthchecks but this is more difficult and it depends on each service
236244

237245
@property
238-
def endpoint(self):
246+
def endpoint(self) -> AnyHttpUrl:
239247
"""endpoint where all the services are exposed"""
240-
return f"http://{self.hostname}:{self.port}"
248+
return parse_obj_as(
249+
AnyHttpUrl, f"http://{self.hostname}:{self.port}" # NOSONAR
250+
)
241251

242252
@property
243253
def are_containers_ready(self) -> bool:
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from ._errors import BaseClientHTTPError, ClientHttpError, UnexpectedStatusError
2+
from ._public import (
3+
DynamicSidecarClient,
4+
get_dynamic_sidecar_client,
5+
get_dynamic_sidecar_service_health,
6+
setup,
7+
shutdown,
8+
)
9+
10+
__all__: tuple[str, ...] = (
11+
"BaseClientHTTPError",
12+
"ClientHttpError",
13+
"DynamicSidecarClient",
14+
"get_dynamic_sidecar_client",
15+
"get_dynamic_sidecar_service_health",
16+
"setup",
17+
"shutdown",
18+
"UnexpectedStatusError",
19+
)
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import asyncio
2+
import functools
3+
import inspect
4+
import logging
5+
from logging import Logger
6+
from typing import Any, Awaitable, Callable, Optional
7+
8+
from httpx import AsyncClient, ConnectError, HTTPError, PoolTimeout, Response
9+
from httpx._types import TimeoutTypes, URLTypes
10+
from tenacity import RetryCallState
11+
from tenacity._asyncio import AsyncRetrying
12+
from tenacity.before import before_log
13+
from tenacity.retry import retry_if_exception_type
14+
from tenacity.stop import stop_after_attempt
15+
from tenacity.wait import wait_exponential
16+
17+
from ._errors import ClientHttpError, UnexpectedStatusError, _WrongReturnType
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
def _log_requests_in_pool(client: AsyncClient, event_name: str) -> None:
23+
# pylint: disable=protected-access
24+
logger.warning(
25+
"Requests while event '%s': %s",
26+
event_name.upper(),
27+
[
28+
(r.request.method, r.request.url, r.request.headers)
29+
for r in client._transport._pool._requests
30+
],
31+
)
32+
33+
34+
def _log_retry(log: Logger, max_retries: int) -> Callable[[RetryCallState], None]:
35+
def log_it(retry_state: RetryCallState) -> None:
36+
# pylint: disable=protected-access
37+
38+
assert retry_state.outcome # nosec
39+
e = retry_state.outcome.exception()
40+
assert isinstance(e, HTTPError) # nosec
41+
assert e._request # nosec
42+
43+
log.info(
44+
"[%s/%s]Retry. Unexpected %s while requesting '%s %s': %s",
45+
retry_state.attempt_number,
46+
max_retries,
47+
e.__class__.__name__,
48+
e._request.method,
49+
e._request.url,
50+
f"{e=}",
51+
)
52+
53+
return log_it
54+
55+
56+
def retry_on_errors(
57+
request_func: Callable[..., Awaitable[Response]]
58+
) -> Callable[..., Awaitable[Response]]:
59+
"""
60+
Will retry the request on `ConnectError` and `PoolTimeout`.
61+
Also wraps `httpx.HTTPError`
62+
raises:
63+
- `ClientHttpError`
64+
"""
65+
assert asyncio.iscoroutinefunction(request_func)
66+
67+
RETRY_ERRORS = (ConnectError, PoolTimeout)
68+
69+
@functools.wraps(request_func)
70+
async def request_wrapper(zelf: "BaseThinClient", *args, **kwargs) -> Response:
71+
# pylint: disable=protected-access
72+
try:
73+
async for attempt in AsyncRetrying(
74+
stop=stop_after_attempt(zelf._request_max_retries),
75+
wait=wait_exponential(min=1),
76+
retry=retry_if_exception_type(RETRY_ERRORS),
77+
before=before_log(logger, logging.DEBUG),
78+
after=_log_retry(logger, zelf._request_max_retries),
79+
reraise=True,
80+
):
81+
with attempt:
82+
r: Response = await request_func(zelf, *args, **kwargs)
83+
return r
84+
except HTTPError as e:
85+
if isinstance(e, PoolTimeout):
86+
_log_requests_in_pool(zelf._client, "pool timeout")
87+
raise ClientHttpError(e) from e
88+
89+
return request_wrapper
90+
91+
92+
def expect_status(expected_code: int):
93+
"""
94+
raises an `UnexpectedStatusError` if the request's status is different
95+
from `expected_code`
96+
NOTE: always apply after `retry_on_errors`
97+
98+
raises:
99+
- `UnexpectedStatusError`
100+
- `ClientHttpError`
101+
"""
102+
103+
def decorator(
104+
request_func: Callable[..., Awaitable[Response]]
105+
) -> Callable[..., Awaitable[Response]]:
106+
assert asyncio.iscoroutinefunction(request_func)
107+
108+
@functools.wraps(request_func)
109+
async def request_wrapper(zelf: "BaseThinClient", *args, **kwargs) -> Response:
110+
response = await request_func(zelf, *args, **kwargs)
111+
if response.status_code != expected_code:
112+
raise UnexpectedStatusError(response, expected_code)
113+
114+
return response
115+
116+
return request_wrapper
117+
118+
return decorator
119+
120+
121+
class BaseThinClient:
122+
SKIP_METHODS: set[str] = {"close"}
123+
124+
def __init__(
125+
self,
126+
*,
127+
request_max_retries: int,
128+
base_url: Optional[URLTypes] = None,
129+
timeout: Optional[TimeoutTypes] = None,
130+
) -> None:
131+
self._request_max_retries: int = request_max_retries
132+
133+
client_args: dict[str, Any] = {}
134+
if base_url:
135+
client_args["base_url"] = base_url
136+
if timeout:
137+
client_args["timeout"] = timeout
138+
self._client = AsyncClient(**client_args)
139+
140+
# ensure all user defined public methods return `httpx.Response`
141+
# NOTE: ideally these checks should be ran at import time!
142+
public_methods = [
143+
t[1]
144+
for t in inspect.getmembers(self, predicate=inspect.ismethod)
145+
if not (t[0].startswith("_") or t[0] in self.SKIP_METHODS)
146+
]
147+
148+
for method in public_methods:
149+
signature = inspect.signature(method)
150+
if signature.return_annotation != Response:
151+
raise _WrongReturnType(method, signature.return_annotation)
152+
153+
async def close(self) -> None:
154+
_log_requests_in_pool(self._client, "closing")
155+
await self._client.aclose()
156+
157+
async def __aenter__(self):
158+
return self
159+
160+
async def __aexit__(self, exc_t, exc_v, exc_tb):
161+
await self.close()
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""
2+
Exception hierarchy:
3+
4+
* BaseClientError
5+
x BaseRequestError
6+
+ ClientHttpError
7+
+ UnexpectedStatusError
8+
x WrongReturnType
9+
"""
10+
11+
from httpx import Response
12+
13+
14+
class BaseClientError(Exception):
15+
"""
16+
Used as based for all the raised errors
17+
"""
18+
19+
20+
class _WrongReturnType(BaseClientError):
21+
"""
22+
used internally to signal the user that the defined method
23+
has an invalid return time annotation
24+
"""
25+
26+
def __init__(self, method, return_annotation) -> None:
27+
super().__init__(
28+
(
29+
f"{method=} should return an instance "
30+
f"of {Response}, not '{return_annotation}'!"
31+
)
32+
)
33+
34+
35+
class BaseClientHTTPError(BaseClientError):
36+
"""Base class to wrap all http related client errors"""
37+
38+
39+
class ClientHttpError(BaseClientHTTPError):
40+
"""used to captures all httpx.HttpError"""
41+
42+
def __init__(self, error: Exception) -> None:
43+
super().__init__()
44+
self.error: Exception = error
45+
46+
47+
class UnexpectedStatusError(BaseClientHTTPError):
48+
"""raised when the status of the request is not the one it was expected"""
49+
50+
def __init__(self, response: Response, expecting: int) -> None:
51+
message = (
52+
f"Expected status: {expecting}, got {response.status_code} for: {response.url}: "
53+
f"headers={response.headers}, body='{response.text}'"
54+
)
55+
super().__init__(message)
56+
self.response = response

0 commit comments

Comments
 (0)