Skip to content

Commit 72b800f

Browse files
author
Peter Giacomo Lombardo
authored
New ASGI Instrumentation for FastAPI & Starlette (#282)
* Load GRPC server on-demand only * Tests: Support launching threads with args * Python 2 compatibility * Avoid using lambda for test suite: multiprocessing & pickle * New test suite dependencies * Add support & tests for byte based context keys * Set test env vars * Reorg middleware exports * When in test, we can always send * Check for empty first * Add support for byte based context headers * New ASGI middleware for FastAPI & Starlette * Add ASGI as registered span * Tests: FastAPI background server & tests * Check potential byte based value * Better custom header capture * Set custom header options before launching background process * Vanilla, synthetic and custom header capture tests * Tests: Secret scrubbing * Path Templates support & tests * FastAPI Test server cleanup * Test path templates always * Starlette tests: background server and tests * Version limiters for CircleCI * Fix version string * Version limit uvicorn * Fix Python 2 support * Dont use __getitem__: Final answer * Update opentracing version * Update basictracer version * When in test, use a multiprocess queue * Code comments * Simplify multiprocess launching * In tests, pause for spans to land * Break up aiohttp tests * More robust extraction * Assure package loaded * Remove redundant sleeps * Pause to let spans settle * Starlette requires aiofiles * Better conversion for Tornado headers class * Unify, cleanup & fix context propagators * Fix binary propagator tests * Safeties, maturities and updated tests * Cleanup: remove debug & pydoc * Cleanup; Remove debug logs * Maturity Refactoring * Breakout gunicorn detection * Make log package independent to avoid circular import issues * Reload gunicorn on AutoTrace * New Test Helper: launch_traced_request
1 parent 6a4a4a4 commit 72b800f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1873
-737
lines changed

instana/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ def boot_agent():
125125
# Import & initialize instrumentation
126126
from .instrumentation.aws import lambda_inst
127127

128+
if sys.version_info >= (3, 6, 0):
129+
from .instrumentation import fastapi_inst
130+
from .instrumentation import starlette_inst
131+
128132
if sys.version_info >= (3, 5, 3):
129133
from .instrumentation import asyncio
130134
from .instrumentation.aiohttp import client

instana/agent/host.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ def can_send(self):
102102
Are we in a state where we can send data?
103103
@return: Boolean
104104
"""
105+
if "INSTANA_TEST" in os.environ:
106+
return True
107+
105108
# Watch for pid change (fork)
106109
self.last_fork_check = datetime.now()
107110
current_pid = os.getpid()

instana/collector/aws_fargate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
Snapshot & metrics collection for AWS Fargate
2+
AWS Fargate Collector: Manages the periodic collection of metrics & snapshot data
33
"""
44
import os
55
import json

instana/collector/aws_lambda.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
Snapshot & metrics collection for AWS Lambda
2+
AWS Lambda Collector: Manages the periodic collection of metrics & snapshot data
33
"""
44
from ..log import logger
55
from .base import BaseCollector

instana/collector/base.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,15 @@ def __init__(self, agent):
2929
self.THREAD_NAME = "Instana Collector"
3030

3131
# The Queue where we store finished spans before they are sent
32-
self.span_queue = queue.Queue()
32+
if env_is_test:
33+
# Override span queue with a multiprocessing version
34+
# The test suite runs background applications - some in background threads,
35+
# others in background processes. This multiprocess queue allows us to collect
36+
# up spans from all sources.
37+
import multiprocessing
38+
self.span_queue = multiprocessing.Queue()
39+
else:
40+
self.span_queue = queue.Queue()
3341

3442
# The Queue where we store finished profiles before they are sent
3543
self.profile_queue = queue.Queue()

instana/collector/host.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
"""
2-
Snapshot & metrics collection for AWS Fargate
2+
Host Collector: Manages the periodic collection of metrics & snapshot data
33
"""
44
from time import time
55
from ..log import logger
66
from .base import BaseCollector
77
from ..util import DictionaryOfStan
8-
from ..singletons import env_is_test
98
from .helpers.runtime import RuntimeHelper
109

1110

instana/instrumentation/asgi.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""
2+
Instana ASGI Middleware
3+
"""
4+
import opentracing
5+
6+
from ..log import logger
7+
from ..singletons import async_tracer, agent
8+
from ..util import strip_secrets_from_query
9+
10+
class InstanaASGIMiddleware:
11+
"""
12+
Instana ASGI Middleware
13+
"""
14+
def __init__(self, app):
15+
self.app = app
16+
17+
def _extract_custom_headers(self, span, headers):
18+
try:
19+
for custom_header in agent.options.extra_http_headers:
20+
# Headers are in the following format: b'x-header-1'
21+
for header_pair in headers:
22+
if header_pair[0].decode('utf-8').lower() == custom_header.lower():
23+
span.set_tag("http.%s" % custom_header, header_pair[1].decode('utf-8'))
24+
except Exception:
25+
logger.debug("extract_custom_headers: ", exc_info=True)
26+
27+
def _collect_kvs(self, scope, span):
28+
try:
29+
span.set_tag('http.path', scope.get('path'))
30+
span.set_tag('http.method', scope.get('method'))
31+
32+
server = scope.get('server')
33+
if isinstance(server, tuple):
34+
span.set_tag('http.host', server[0])
35+
36+
query = scope.get('query_string')
37+
if isinstance(query, (str, bytes)) and len(query):
38+
if isinstance(query, bytes):
39+
query = query.decode('utf-8')
40+
scrubbed_params = strip_secrets_from_query(query, agent.options.secrets_matcher, agent.options.secrets_list)
41+
span.set_tag("http.params", scrubbed_params)
42+
43+
app = scope.get('app')
44+
if app is not None and hasattr(app, 'routes'):
45+
# Attempt to detect the Starlette routes registered.
46+
# If Starlette isn't present, we harmlessly dump out.
47+
from starlette.routing import Match
48+
for route in scope['app'].routes:
49+
if route.matches(scope)[0] == Match.FULL:
50+
span.set_tag("http.path_tpl", route.path)
51+
except Exception:
52+
logger.debug("ASGI collect_kvs: ", exc_info=True)
53+
54+
55+
async def __call__(self, scope, receive, send):
56+
request_context = None
57+
58+
if scope["type"] not in ("http", "websocket"):
59+
await self.app(scope, receive, send)
60+
return
61+
62+
request_headers = scope.get('headers')
63+
if isinstance(request_headers, list):
64+
request_context = async_tracer.extract(opentracing.Format.BINARY, request_headers)
65+
66+
async def send_wrapper(response):
67+
span = async_tracer.active_span
68+
if span is None:
69+
await send(response)
70+
else:
71+
if response['type'] == 'http.response.start':
72+
try:
73+
status_code = response.get('status')
74+
if status_code is not None:
75+
if 500 <= int(status_code) <= 511:
76+
span.mark_as_errored()
77+
span.set_tag('http.status_code', status_code)
78+
79+
headers = response.get('headers')
80+
if headers is not None:
81+
async_tracer.inject(span.context, opentracing.Format.BINARY, headers)
82+
except Exception:
83+
logger.debug("send_wrapper: ", exc_info=True)
84+
85+
try:
86+
await send(response)
87+
except Exception as exc:
88+
span.log_exception(exc)
89+
raise
90+
91+
with async_tracer.start_active_span("asgi", child_of=request_context) as tracing_scope:
92+
self._collect_kvs(scope, tracing_scope.span)
93+
if 'headers' in scope and agent.options.extra_http_headers is not None:
94+
self._extract_custom_headers(tracing_scope.span, scope['headers'])
95+
96+
try:
97+
await self.app(scope, receive, send_wrapper)
98+
except Exception as exc:
99+
tracing_scope.span.log_exception(exc)
100+
raise exc
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""
2+
Instrumentation for FastAPI
3+
https://fastapi.tiangolo.com/
4+
"""
5+
try:
6+
import fastapi
7+
import wrapt
8+
import signal
9+
import os
10+
11+
from ..log import logger
12+
from ..util import running_in_gunicorn
13+
from .asgi import InstanaASGIMiddleware
14+
from starlette.middleware import Middleware
15+
16+
@wrapt.patch_function_wrapper('fastapi.applications', 'FastAPI.__init__')
17+
def init_with_instana(wrapped, instance, args, kwargs):
18+
middleware = kwargs.get('middleware')
19+
if middleware is None:
20+
kwargs['middleware'] = [Middleware(InstanaASGIMiddleware)]
21+
elif isinstance(middleware, list):
22+
middleware.append(Middleware(InstanaASGIMiddleware))
23+
24+
return wrapped(*args, **kwargs)
25+
26+
logger.debug("Instrumenting FastAPI")
27+
28+
# Reload GUnicorn when we are instrumenting an already running application
29+
if "INSTANA_MAGIC" in os.environ and running_in_gunicorn():
30+
os.kill(os.getpid(), signal.SIGHUP)
31+
32+
except ImportError:
33+
pass

instana/instrumentation/pyramid/tweens.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@
1212

1313
class InstanaTweenFactory(object):
1414
"""A factory that provides Instana instrumentation tween for Pyramid apps"""
15-
15+
1616
def __init__(self, handler, registry):
1717
self.handler = handler
1818

1919
def __call__(self, request):
20-
ctx = tracer.extract(ot.Format.HTTP_HEADERS, request.headers)
20+
ctx = tracer.extract(ot.Format.HTTP_HEADERS, dict(request.headers))
2121
scope = tracer.start_active_span('http', child_of=ctx)
2222

2323
scope.span.set_tag(ext.SPAN_KIND, ext.SPAN_KIND_RPC_SERVER)
@@ -42,7 +42,7 @@ def __call__(self, request):
4242
response = None
4343
try:
4444
response = self.handler(request)
45-
45+
4646
tracer.inject(scope.span.context, ot.Format.HTTP_HEADERS, response.headers)
4747
response.headers['Server-Timing'] = "intid;desc=%s" % scope.span.context.trace_id
4848
except HTTPException as e:
@@ -53,21 +53,21 @@ def __call__(self, request):
5353

5454
# we need to explicitly populate the `message` tag with an error here
5555
# so that it's picked up from an SDK span
56-
scope.span.set_tag("message", str(e))
56+
scope.span.set_tag("message", str(e))
5757
scope.span.log_exception(e)
58-
58+
5959
logger.debug("Pyramid Instana tween", exc_info=True)
6060
finally:
6161
if response:
6262
scope.span.set_tag("http.status", response.status_int)
63-
63+
6464
if 500 <= response.status_int <= 511:
6565
if response.exception is not None:
6666
message = str(response.exception)
6767
scope.span.log_exception(response.exception)
6868
else:
6969
message = response.status
70-
70+
7171
scope.span.set_tag("message", message)
7272
scope.span.assure_errored()
7373

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""
2+
Instrumentation for Starlette
3+
https://www.starlette.io/
4+
"""
5+
try:
6+
import starlette
7+
import wrapt
8+
from ..log import logger
9+
from .asgi import InstanaASGIMiddleware
10+
from starlette.middleware import Middleware
11+
12+
@wrapt.patch_function_wrapper('starlette.applications', 'Starlette.__init__')
13+
def init_with_instana(wrapped, instance, args, kwargs):
14+
middleware = kwargs.get('middleware')
15+
if middleware is None:
16+
kwargs['middleware'] = [Middleware(InstanaASGIMiddleware)]
17+
elif isinstance(middleware, list):
18+
middleware.append(Middleware(InstanaASGIMiddleware))
19+
20+
return wrapped(*args, **kwargs)
21+
22+
logger.debug("Instrumenting Starlette")
23+
except ImportError:
24+
pass

0 commit comments

Comments
 (0)