Skip to content

Commit 41132a9

Browse files
authored
🐛 Activate tracing in aiohttp-based services (ITISFoundation#2559)
* updates servicelib.tracing * doc director * remove general constraint * upgrades aiozipkin in servicelib * upgrades storage * upgrades webserver * fixes storage validation * cleanup datatypes in middlewares * sets tracing sync to get middleware in the outermost layer * sets tracing sync to get middleware in the outermost layer * annotations on application-setup * fixes test_login * creates aiohttp.typing_extension * restored aiozipkin to version 0.7.1 * removed old settings from storage * sorted environs * lower level of very verbose loggers * fixes storage: tracing middleware must always go after envelope middleware * removed tracing from director
1 parent 71579eb commit 41132a9

File tree

31 files changed

+375
-244
lines changed

31 files changed

+375
-244
lines changed

packages/service-library/requirements/_aiohttp.in

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88

99
openapi-core==0.12.0 # frozen until https://github.com/ITISFoundation/osparc-simcore/pull/1396 is CLOSED
1010
lazy-object-proxy~=1.4.3 # cannot upgrade due to contraints in openapi-core
11+
aiozipkin~=0.7.1 # 1.X has an issue with no content responses
12+
1113

1214
aiohttp
13-
aiozipkin
1415
aiopg[sa]
1516
jsonschema
1617
prometheus_client

packages/service-library/requirements/_aiohttp.txt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@ aiohttp==3.7.4.post0
1313
aiopg==1.3.1
1414
# via -r requirements/_aiohttp.in
1515
aiozipkin==0.7.1
16-
# via
17-
# -c requirements/../../../requirements/constraints.txt
18-
# -c requirements/./../../../requirements/constraints.txt
19-
# -r requirements/_aiohttp.in
16+
# via -r requirements/_aiohttp.in
2017
async-timeout==3.0.1
2118
# via
2219
# aiohttp

packages/service-library/src/servicelib/aiohttp/application_setup.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def setup(app: web.Application):
8080
...
8181
"""
8282
# TODO: resilience to failure. if this setup fails, then considering dependencies, is it fatal or app can start?
83+
# TODO: enforce signature as def setup(app: web.Application, **kwargs) -> web.Application
8384

8485
module_name = module_name.replace(".__init__", "")
8586
depends = depends or []
@@ -94,7 +95,7 @@ def setup(app: web.Application):
9495
# if passes config_enabled, invalidates info on section
9596
section = None
9697

97-
def decorate(setup_func):
98+
def _decorate(setup_func: Callable):
9899

99100
if "setup" not in setup_func.__name__:
100101
logger.warning("Rename '%s' to contain 'setup'", setup_func.__name__)
@@ -110,7 +111,7 @@ def setup_metadata() -> Dict:
110111

111112
# wrapper
112113
@functools.wraps(setup_func)
113-
def setup_wrapper(app: web.Application, *args, **kargs) -> bool:
114+
def _wrapper(app: web.Application, *args, **kargs) -> bool:
114115
# pre-setup
115116
logger.debug(
116117
"Setting up '%s' [%s; %s] ... ", module_name, category.name, depends
@@ -176,12 +177,12 @@ def setup_wrapper(app: web.Application, *args, **kargs) -> bool:
176177
)
177178
return completed
178179

179-
setup_wrapper.metadata = setup_metadata
180-
setup_wrapper.MARK = "setup"
180+
_wrapper.metadata = setup_metadata
181+
_wrapper.MARK = "setup"
181182

182-
return setup_wrapper
183+
return _wrapper
183184

184-
return decorate
185+
return _decorate
185186

186187

187188
def is_setup_function(fun):

packages/service-library/src/servicelib/aiohttp/rest_middlewares.py

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44
"""
55
import json
66
import logging
7+
from typing import Awaitable, Callable, Union
78

89
from aiohttp import web
10+
from aiohttp.web_request import Request
11+
from aiohttp.web_response import StreamResponse
912
from openapi_core.schema.exceptions import OpenAPIError
1013

1114
from ..utils import is_production_environ
1215
from .rest_models import ErrorItemType, ErrorType, LogMessageType
1316
from .rest_responses import (
1417
JSON_CONTENT_TYPE,
18+
_DataType,
1519
create_data_response,
1620
create_error_response,
1721
is_enveloped_from_map,
@@ -20,6 +24,7 @@
2024
)
2125
from .rest_utils import EnvelopeFactory
2226
from .rest_validators import OpenApiValidator
27+
from .typing_extension import Handler, Middleware
2328

2429
DEFAULT_API_VERSION = "v0"
2530

@@ -32,9 +37,8 @@ def is_api_request(request: web.Request, api_version: str) -> bool:
3237
return request.path.startswith(base_path)
3338

3439

35-
def error_middleware_factory(
36-
api_version: str = DEFAULT_API_VERSION, log_exceptions=True
37-
):
40+
def error_middleware_factory(api_version: str, log_exceptions=True) -> Middleware:
41+
3842
_is_prod: bool = is_production_environ()
3943

4044
def _process_and_raise_unexpected_error(request: web.BaseRequest, err: Exception):
@@ -59,7 +63,7 @@ def _process_and_raise_unexpected_error(request: web.BaseRequest, err: Exception
5963
raise resp
6064

6165
@web.middleware
62-
async def _middleware_handler(request: web.Request, handler):
66+
async def _middleware_handler(request: web.Request, handler: Handler):
6367
"""
6468
Ensure all error raised are properly enveloped and json responses
6569
"""
@@ -74,7 +78,7 @@ async def _middleware_handler(request: web.Request, handler):
7478
except web.HTTPError as err:
7579
# TODO: differenciate between server/client error
7680
if not err.reason:
77-
err.reason = "Unexpected error"
81+
err.set_status(err.status_code, reason="Unexpected error")
7882

7983
err.content_type = JSON_CONTENT_TYPE
8084

@@ -126,9 +130,9 @@ async def _middleware_handler(request: web.Request, handler):
126130
return _middleware_handler
127131

128132

129-
def validate_middleware_factory(api_version: str = DEFAULT_API_VERSION):
133+
def validate_middleware_factory(api_version: str) -> Middleware:
130134
@web.middleware
131-
async def _middleware_handler(request: web.Request, handler):
135+
async def _middleware_handler(request: web.Request, handler: Handler):
132136
"""
133137
Validates requests against openapi specs and extracts body, params, etc ...
134138
Validate response against openapi specs
@@ -172,31 +176,42 @@ async def _middleware_handler(request: web.Request, handler):
172176
return _middleware_handler
173177

174178

175-
def envelope_middleware_factory(api_version: str = DEFAULT_API_VERSION):
179+
_ResponseOrBodyData = Union[StreamResponse, _DataType]
180+
HandlerFlexible = Callable[[Request], Awaitable[_ResponseOrBodyData]]
181+
MiddlewareFlexible = Callable[[Request, HandlerFlexible], Awaitable[StreamResponse]]
182+
183+
184+
def envelope_middleware_factory(api_version: str) -> MiddlewareFlexible:
185+
# FIXME: This data conversion is very error-prone. Use decorators instead!
176186
_is_prod: bool = is_production_environ()
177187

178188
@web.middleware
179-
async def _middleware_handler(request: web.Request, handler):
189+
async def _middleware_handler(
190+
request: web.Request, handler: HandlerFlexible
191+
) -> StreamResponse:
180192
"""
181193
Ensures all responses are enveloped as {'data': .. , 'error', ...} in json
194+
ONLY for API-requests
182195
"""
183196
if not is_api_request(request, api_version):
184-
return await handler(request)
197+
resp = await handler(request)
198+
assert isinstance(resp, StreamResponse) # nosec
199+
return resp
185200

186-
resp = await handler(request)
201+
# NOTE: the return values of this handler
202+
resp: _ResponseOrBodyData = await handler(request)
187203

188-
if isinstance(resp, web.FileResponse): # allows for files to be downloaded
204+
if isinstance(resp, web.FileResponse):
189205
return resp
190206

191-
if not isinstance(resp, web.Response):
192-
response = create_data_response(
207+
if not isinstance(resp, StreamResponse):
208+
resp = create_data_response(
193209
data=resp,
194210
skip_internal_error_details=_is_prod,
195211
)
196-
else:
197-
# Enforced by user. Should check it is json?
198-
response = resp
199-
return response
212+
213+
assert isinstance(resp, web.StreamResponse) # nosec
214+
return resp
200215

201216
# adds identifier (mostly for debugging)
202217
_middleware_handler.__middleware_name__ = f"{__name__}.envelope_{api_version}"

packages/service-library/src/servicelib/aiohttp/rest_responses.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
JsonLikeModel = Union[Dict[str, Any], List[Dict[str, Any]]]
1919

20+
_DataType = Union[str, Dict[str, Any], List[Any]]
21+
2022

2123
def is_enveloped_from_map(payload: Mapping) -> bool:
2224
return all(k in ENVELOPE_KEYS for k in payload.keys() if not str(k).startswith("_"))
@@ -66,7 +68,7 @@ def unwrap_envelope(payload: Dict[str, Any]) -> Tuple:
6668

6769

6870
def create_data_response(
69-
data: Union[Mapping, str], *, skip_internal_error_details=False
71+
data: _DataType, *, skip_internal_error_details=False
7072
) -> web.Response:
7173
response = None
7274
try:

packages/service-library/src/servicelib/aiohttp/tracing.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,48 @@
33
"""
44
import asyncio
55
import logging
6-
from typing import Dict, Optional
6+
from typing import Iterable, Optional, Union
77

88
import aiozipkin as az
9-
import trafaret as T
109
from aiohttp import web
11-
from pydantic import AnyHttpUrl, BaseSettings
10+
from aiohttp.web import AbstractRoute
11+
from aiozipkin.tracer import Tracer
12+
from yarl import URL
1213

1314
log = logging.getLogger(__name__)
1415

1516

1617
def setup_tracing(
17-
app: web.Application, app_name: str, host: str, port: int, config: Dict
18+
app: web.Application,
19+
*,
20+
service_name: str,
21+
host: str,
22+
port: int,
23+
jaeger_base_url: Union[URL, str],
24+
skip_routes: Optional[Iterable[AbstractRoute]] = None,
1825
) -> bool:
19-
zipkin_address = f"{config['zipkin_endpoint']}/api/v2/spans"
20-
endpoint = az.create_endpoint(app_name, ipv4=host, port=port)
21-
loop = asyncio.get_event_loop()
22-
tracer = loop.run_until_complete(
23-
az.create(zipkin_address, endpoint, sample_rate=1.0)
26+
"""
27+
Sets up this service for a distributed tracing system
28+
using zipkin (https://zipkin.io/) and Jaeger (https://www.jaegertracing.io/)
29+
"""
30+
zipkin_address = URL(f"{jaeger_base_url}") / "api/v2/spans"
31+
32+
log.debug(
33+
"Settings up tracing for %s at %s:%d -> %s",
34+
service_name,
35+
host,
36+
port,
37+
zipkin_address,
2438
)
25-
az.setup(app, tracer)
26-
return True
2739

40+
endpoint = az.create_endpoint(service_name, ipv4=host, port=port)
2841

29-
schema = T.Dict(
30-
{
31-
T.Key("enabled", default=True, optional=True): T.Or(T.Bool(), T.ToInt),
32-
T.Key("zipkin_endpoint", default="http://jaeger:9411"): T.String(),
33-
}
34-
)
42+
tracer: Tracer = asyncio.get_event_loop().run_until_complete(
43+
az.create(f"{zipkin_address}", endpoint, sample_rate=1.0)
44+
)
3545

46+
# WARNING: adds a middleware that should be the outermost since
47+
# it expects stream responses while we allow data returns from a handler
48+
az.setup(app, tracer, skip_routes=skip_routes)
3649

37-
class TracingSettings(BaseSettings):
38-
enabled: Optional[bool] = True
39-
zipkin_endpoint: AnyHttpUrl = "http://jaeger:9411"
50+
return True
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from typing import Awaitable, Callable
2+
3+
from aiohttp.web import Request, StreamResponse
4+
5+
# Taken from aiohttp.web_middlewares import _Handler, _Middleware
6+
Handler = Callable[[Request], Awaitable[StreamResponse]]
7+
Middleware = Callable[[Request, Handler], Awaitable[StreamResponse]]
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# pylint: disable=redefined-outer-name
2+
# pylint: disable=unused-argument
3+
# pylint: disable=unused-variable
4+
5+
from asyncio import AbstractEventLoop
6+
from typing import Callable
7+
8+
import pytest
9+
from aiohttp import web
10+
from aiohttp.client_reqrep import ClientResponse
11+
from aiohttp.test_utils import TestClient
12+
from servicelib.aiohttp.rest_responses import _collect_http_exceptions
13+
from servicelib.aiohttp.tracing import setup_tracing
14+
15+
DEFAULT_JAEGER_BASE_URL = "http://jaeger:9411"
16+
17+
18+
@pytest.fixture()
19+
def client(
20+
loop: AbstractEventLoop, aiohttp_client: Callable, aiohttp_unused_port: Callable
21+
) -> TestClient:
22+
ports = [aiohttp_unused_port() for _ in range(2)]
23+
24+
async def redirect(request: web.Request) -> web.Response:
25+
return web.HTTPFound(location="/return/200")
26+
27+
async def return_response(request: web.Request) -> web.Response:
28+
code = int(request.match_info["code"])
29+
return web.Response(status=code)
30+
31+
async def raise_response(request: web.Request):
32+
status_code = int(request.match_info["code"])
33+
status_to_http_exception = _collect_http_exceptions()
34+
http_exception_cls = status_to_http_exception[status_code]
35+
raise http_exception_cls(
36+
reason=f"raised from raised_error with code {status_code}"
37+
)
38+
39+
async def skip(request: web.Request):
40+
return web.HTTPServiceUnavailable(reason="should not happen")
41+
42+
app = web.Application()
43+
app.add_routes(
44+
[
45+
web.get("/redirect", redirect),
46+
web.get("/return/{code}", return_response),
47+
web.get("/raise/{code}", raise_response),
48+
web.get("/skip", skip, name="skip"),
49+
]
50+
)
51+
52+
print("Resources:")
53+
for resource in app.router.resources():
54+
print(resource)
55+
56+
# UNDER TEST ---
57+
# SEE RoutesView to understand how resources can be iterated to get routes
58+
resource = app.router["skip"]
59+
routes_in_a_resource = list(resource)
60+
61+
setup_tracing(
62+
app,
63+
service_name=f"{__name__}.client",
64+
host="127.0.0.1",
65+
port=ports[0],
66+
jaeger_base_url=DEFAULT_JAEGER_BASE_URL,
67+
skip_routes=routes_in_a_resource,
68+
)
69+
70+
return loop.run_until_complete(
71+
aiohttp_client(app, server_kwargs={"port": ports[0]})
72+
)
73+
74+
75+
async def test_setup_tracing(client: TestClient):
76+
# NOTE: version aiozipkin 1.1.0 did not pass this test
77+
78+
res: ClientResponse
79+
80+
# on error
81+
for code in (web.HTTPOk.status_code, web.HTTPBadRequest.status_code):
82+
res = await client.get(f"/return/{code}")
83+
84+
assert res.status == code, await res.text()
85+
res = await client.get(f"/raise/{code}")
86+
assert res.status == code, await res.text()
87+
88+
res = await client.get("/redirect")
89+
# TODO: check it was redirected
90+
assert res.status == 200, await res.text()
91+
92+
res = await client.get("/skip")
93+
assert res.status == web.HTTPServiceUnavailable.status_code
94+
95+
# using POST instead of GET -> HTTPMethodNotAllowed
96+
res = await client.post("/skip")
97+
assert res.status == web.HTTPMethodNotAllowed.status_code, "GET and not POST"

requirements/constraints.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ urllib3>=1.26.5 # https://github.com/advisories/GH
2525
# TODO: https://aioredis.readthedocs.io/en/latest/migration/)
2626
aioredis<2.0.0
2727
# with new released version 1.0.0 (https://github.com/aio-libs/aiozipkin/releases).
28-
# TODO: Check again when moving to py3.8-3.9! SEE https://github.com/ITISFoundation/osparc-simcore/pull/2025
29-
aiozipkin==0.7.1
3028
# TODO: includes async features https://docs.sqlalchemy.org/en/14/changelog/migration_20.html
3129
sqlalchemy<2.0
3230

0 commit comments

Comments
 (0)