Skip to content

Commit e1c5994

Browse files
authored
New aiohttp instrumentation (#133)
* Add aiohttp to test requirements * aiohttp instrumentation & tests * Add aiohttp to exlude list on Python versions earlier than 3.5.3 * Update min py version for test packages * Additional way to inject headers * Fix asynqp version specifier * Exception and redirect handling, reporting & tests * Fix instrumentation message * aiohttp server instrumentation & piu (in omaggio): * aiohttp server instrumentation * aiohttp instrumented server that runs in background thread * Extend HTTP propagator to support CIMultiDict * First test for basic aiohttp client <--> server requests * Add support for custom headers and param scrubbing * Safer exception msg extraction * Client safeties * More test app routes * Moar tests * Loosen test reqs for platform variations * Do not load asyncio app on unsupported versions * Add Py 3.7 to tests * Do not run spyne/sudsjurko on 3.7 until support is added * Clean up version limiters * Travis work-around for Py 3.7 tests See travis-ci/travis-ci#9069 (comment) * Better msg extraction for inconsisten exception usage across libraries * Remove py3.7 because no rabbitmq support Time is coming to eventually migrate to CircleCI * Major/minor version specifiers only * Update reported span names * Update tests to follow changes. * In case of webapp handler issue, act accordingly * Include example application used for testing. * Change port number so as not to conflict with test suite
1 parent 02fe488 commit e1c5994

File tree

17 files changed

+1094
-30
lines changed

17 files changed

+1094
-30
lines changed

example/asyncio/README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Asyncio Examples
2+
3+
This directory includes an example asyncio application and client with aiohttp and asynqp used for testing.
4+
5+
# Requirements
6+
7+
* Python 3.5 or greater
8+
* instana, aiohttp and asynqp Python packages installed
9+
* A RabbitMQ server with it's location specified in the `RABBITMQ_HOST` environment variable
10+
11+
12+
# Run
13+
14+
* Make sure the Instana Python package is installed or you have this git repository checked out.
15+
16+
* Set the environment variable `AUTOWRAPT_BOOTSTRAP=instana` for immediate instrumentation.
17+
18+
* Boot the aiohttpserver.py file as follows. It will launch an aiohttp server that listens on port localhost:5102. See the source code for published endpoints.
19+
20+
```bash
21+
python aiohttpserver.py
22+
```
23+
24+
* Boot the `aiohttpclient.py` file to generate a request (every 1 second) to the aiohttp server.
25+
26+
```bash
27+
python aiohttpclient.py
28+
```
29+
30+
From here, you can modify the `aiohttpclient.py` file as needed to change requested paths and so on.
31+
32+
# Results
33+
34+
Some example traces from local tests.
35+
36+
aiohttp client calling aiohttp server:
37+
![screen shot 2019-02-25 at 19 12 28](https://user-images.githubusercontent.com/395132/53401921-0f49cc00-39b1-11e9-8606-24844925a478.png)
38+
39+
aiohttp server making multiple asynqp calls (publish & consume)
40+
![screen shot 2019-02-26 at 10 21 50](https://user-images.githubusercontent.com/395132/53401997-2e485e00-39b1-11e9-97fd-460b136cf92a.png)

example/asyncio/aioclient.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from __future__ import absolute_import
2+
3+
import aiohttp
4+
import asyncio
5+
6+
from instana.singletons import async_tracer, agent
7+
8+
async def test():
9+
while True:
10+
await asyncio.sleep(1)
11+
with async_tracer.start_active_span('JobRunner'):
12+
async with aiohttp.ClientSession() as session:
13+
async with session.get("http://localhost:5102/?secret=iloveyou") as response:
14+
print(response.status)
15+
16+
17+
loop = asyncio.get_event_loop()
18+
loop.run_until_complete(test())
19+
loop.run_forever()
20+

example/asyncio/aioserver.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import os
2+
import asyncio
3+
import asynqp
4+
from aiohttp import web
5+
6+
RABBITMQ_HOST = ""
7+
if "RABBITMQ_HOST" in os.environ:
8+
RABBITMQ_HOST = os.environ["RABBITMQ_HOST"]
9+
else:
10+
RABBITMQ_HOST = "localhost"
11+
12+
class RabbitUtil():
13+
14+
def __init__(self, loop):
15+
self.loop = loop
16+
self.loop.run_until_complete(self.connect())
17+
18+
@asyncio.coroutine
19+
def connect(self):
20+
# connect to the RabbitMQ broker
21+
self.connection = yield from asynqp.connect(RABBITMQ_HOST, 5672, username='guest', password='guest')
22+
23+
# Open a communications channel
24+
self.channel = yield from self.connection.open_channel()
25+
26+
# Create a queue and an exchange on the broker
27+
self.exchange = yield from self.channel.declare_exchange('test.exchange', 'direct')
28+
self.queue = yield from self.channel.declare_queue('test.queue')
29+
30+
# Bind the queue to the exchange, so the queue will get messages published to the exchange
31+
yield from self.queue.bind(self.exchange, 'routing.key')
32+
yield from self.queue.purge()
33+
34+
35+
@asyncio.coroutine
36+
def publish_msg(request):
37+
msg = asynqp.Message({'hello': 'world'})
38+
rabbit_util.exchange.publish(msg, 'routing.key')
39+
rabbit_util.exchange.publish(msg, 'routing.key')
40+
rabbit_util.exchange.publish(msg, 'routing.key')
41+
rabbit_util.exchange.publish(msg, 'routing.key')
42+
43+
msg = yield from rabbit_util.queue.get()
44+
45+
return web.Response(text='Published 4 messages. Got 1. %s' % str(msg))
46+
47+
48+
async def say_hello(request):
49+
return web.Response(text='Hello, world')
50+
51+
52+
async def four_hundred_one(request):
53+
return web.HTTPUnauthorized(reason="I must simulate errors.", text="Simulated server error.")
54+
55+
56+
async def five_hundred(request):
57+
return web.HTTPInternalServerError(reason="I must simulate errors.", text="Simulated server error.")
58+
59+
60+
loop = asyncio.new_event_loop()
61+
asyncio.set_event_loop(loop)
62+
63+
rabbit_util = RabbitUtil(loop)
64+
65+
app = web.Application(debug=False)
66+
app.add_routes([web.get('/', say_hello)])
67+
app.add_routes([web.get('/401', four_hundred_one)])
68+
app.add_routes([web.get('/500', five_hundred)])
69+
app.add_routes([web.get('/publish', publish_msg)])
70+
71+
runner = web.AppRunner(app)
72+
loop.run_until_complete(runner.setup())
73+
site = web.TCPSite(runner, 'localhost', 5102)
74+
75+
loop.run_until_complete(site.start())
76+
loop.run_forever()
77+

instana/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ def boot_agent():
6161

6262
if "INSTANA_DISABLE_AUTO_INSTR" not in os.environ:
6363
# Import & initialize instrumentation
64-
if (sys.version_info >= (3, 4)) and (sys.version_info < (3, 7)):
64+
if sys.version_info >= (3, 5, 3):
65+
from .instrumentation.aiohttp import client # noqa
66+
from .instrumentation.aiohttp import server # noqa
6567
from .instrumentation import asynqp # noqa
6668
from .instrumentation import logging # noqa
6769
from .instrumentation import mysqlpython # noqa

instana/http_propagator.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,26 @@ def inject(self, span_context, carrier):
5555
carrier.append((self.HEADER_KEY_S, span_id))
5656
carrier.append((self.HEADER_KEY_L, "1"))
5757
carrier.append((self.HEADER_KEY_ST, "intid;desc=%s" % trace_id))
58+
elif hasattr(carrier, '__setitem__'):
59+
carrier.__setitem__(self.HEADER_KEY_T, trace_id)
60+
carrier.__setitem__(self.HEADER_KEY_S, span_id)
61+
carrier.__setitem__(self.HEADER_KEY_L, "1")
62+
carrier.__setitem__(self.HEADER_KEY_ST, "intid;desc=%s" % trace_id)
5863
else:
5964
raise Exception("Unsupported carrier type", type(carrier))
6065

61-
except Exception as e:
62-
logger.debug("inject error: ", str(e))
66+
except:
67+
logger.debug("inject error:", exc_info=True)
6368

6469
def extract(self, carrier): # noqa
6570
trace_id = None
6671
span_id = None
6772

6873
try:
69-
if type(carrier) is dict or hasattr(carrier, "__dict__"):
74+
if type(carrier) is dict or hasattr(carrier, "__getitem__"):
7075
dc = carrier
76+
elif hasattr(carrier, "__dict__"):
77+
dc = carrier.__dict__
7178
elif type(carrier) is list:
7279
dc = dict(carrier)
7380
else:
@@ -97,4 +104,4 @@ def extract(self, carrier): # noqa
97104
return ctx
98105

99106
except Exception as e:
100-
logger.debug("extract error: ", str(e))
107+
logger.debug("extract error:", exc_info=True)

instana/instrumentation/aiohttp/__init__.py

Whitespace-only changes.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from __future__ import absolute_import
2+
3+
import opentracing
4+
import wrapt
5+
6+
from ...log import logger
7+
from ...singletons import agent, async_tracer
8+
from ...util import strip_secrets
9+
10+
11+
try:
12+
import aiohttp
13+
import asyncio
14+
15+
async def stan_request_start(session, trace_config_ctx, params):
16+
try:
17+
parent_span = async_tracer.active_span
18+
19+
# If we're not tracing, just return
20+
if parent_span is None:
21+
return
22+
23+
scope = async_tracer.start_active_span("aiohttp-client", child_of=parent_span)
24+
trace_config_ctx.scope = scope
25+
26+
async_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, params.headers)
27+
28+
parts = str(params.url).split('?')
29+
if len(parts) > 1:
30+
cleaned_qp = strip_secrets(parts[1], agent.secrets_matcher, agent.secrets_list)
31+
scope.span.set_tag("http.params", cleaned_qp)
32+
scope.span.set_tag("http.url", parts[0])
33+
scope.span.set_tag('http.method', params.method)
34+
except:
35+
logger.debug("stan_request_start", exc_info=True)
36+
37+
async def stan_request_end(session, trace_config_ctx, params):
38+
try:
39+
scope = trace_config_ctx.scope
40+
if scope is not None:
41+
scope.span.set_tag('http.status_code', params.response.status)
42+
43+
if 400 <= params.response.status <= 599:
44+
scope.span.set_tag("http.error", params.response.reason)
45+
scope.span.set_tag("error", True)
46+
ec = scope.span.tags.get('ec', 0)
47+
scope.span.set_tag("ec", ec + 1)
48+
49+
scope.close()
50+
except:
51+
logger.debug("stan_request_end", exc_info=True)
52+
53+
async def stan_request_exception(session, trace_config_ctx, params):
54+
try:
55+
scope = trace_config_ctx.scope
56+
if scope is not None:
57+
scope.span.log_exception(params.exception)
58+
scope.span.set_tag("http.error", str(params.exception))
59+
scope.close()
60+
except:
61+
logger.debug("stan_request_exception", exc_info=True)
62+
63+
@wrapt.patch_function_wrapper('aiohttp.client','ClientSession.__init__')
64+
def init_with_instana(wrapped, instance, argv, kwargs):
65+
instana_trace_config = aiohttp.TraceConfig()
66+
instana_trace_config.on_request_start.append(stan_request_start)
67+
instana_trace_config.on_request_end.append(stan_request_end)
68+
instana_trace_config.on_request_exception.append(stan_request_exception)
69+
if 'trace_configs' in kwargs:
70+
kwargs['trace_configs'].append(instana_trace_config)
71+
else:
72+
kwargs['trace_configs'] = [instana_trace_config]
73+
74+
return wrapped(*argv, **kwargs)
75+
76+
logger.debug("Instrumenting aiohttp client")
77+
except ImportError:
78+
pass
79+
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from __future__ import absolute_import
2+
3+
import opentracing
4+
import wrapt
5+
6+
from ...log import logger
7+
from ...singletons import agent, async_tracer
8+
from ...util import strip_secrets
9+
10+
11+
try:
12+
import aiohttp
13+
import asyncio
14+
15+
from aiohttp.web import middleware
16+
17+
@middleware
18+
async def stan_middleware(request, handler):
19+
try:
20+
ctx = async_tracer.extract(opentracing.Format.HTTP_HEADERS, request.headers)
21+
request['scope'] = async_tracer.start_active_span('aiohttp-server', child_of=ctx)
22+
scope = request['scope']
23+
24+
# Query param scrubbing
25+
url = str(request.url)
26+
parts = url.split('?')
27+
if len(parts) > 1:
28+
cleaned_qp = strip_secrets(parts[1], agent.secrets_matcher, agent.secrets_list)
29+
scope.span.set_tag("http.params", cleaned_qp)
30+
31+
scope.span.set_tag("http.url", parts[0])
32+
scope.span.set_tag("http.method", request.method)
33+
34+
# Custom header tracking support
35+
if agent.extra_headers is not None:
36+
for custom_header in agent.extra_headers:
37+
if custom_header in request.headers:
38+
scope.span.set_tag("http.%s" % custom_header, request.headers[custom_header])
39+
40+
response = await handler(request)
41+
42+
if response is not None:
43+
# Mark 500 responses as errored
44+
if 500 <= response.status <= 511:
45+
scope.span.set_tag("error", True)
46+
ec = scope.span.tags.get('ec', 0)
47+
if ec is 0:
48+
scope.span.set_tag("ec", ec + 1)
49+
50+
scope.span.set_tag("http.status_code", response.status)
51+
async_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, response.headers)
52+
53+
return response
54+
except:
55+
logger.debug("aiohttp stan_middleware", exc_info=True)
56+
finally:
57+
if scope is not None:
58+
scope.close()
59+
60+
61+
@wrapt.patch_function_wrapper('aiohttp.web','Application.__init__')
62+
def init_with_instana(wrapped, instance, argv, kwargs):
63+
if "middlewares" in kwargs:
64+
kwargs["middlewares"].append(stan_middleware)
65+
else:
66+
kwargs["middlewares"] = [stan_middleware]
67+
68+
return wrapped(*argv, **kwargs)
69+
70+
logger.debug("Instrumenting aiohttp server")
71+
except ImportError:
72+
pass
73+

instana/recorder.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323

2424

2525
class InstanaRecorder(SpanRecorder):
26-
registered_spans = ("django", "memcache", "mysql", "rabbitmq", "redis",
27-
"rpc-client", "rpc-server", "sqlalchemy", "soap", "urllib3", "wsgi", "log")
28-
http_spans = ("django", "wsgi", "urllib3", "soap")
29-
30-
exit_spans = ("memcache", "mysql", "rabbitmq", "redis", "rpc-client",
31-
"sqlalchemy", "soap", "urllib3", "log")
32-
entry_spans = ("django", "wsgi", "rabbitmq", "rpc-server")
26+
registered_spans = ("aiohttp-client", "aiohttp-server", "django", "log", "memcache", "mysql",
27+
"rabbitmq", "redis", "rpc-client", "rpc-server", "sqlalchemy", "soap",
28+
"urllib3", "wsgi")
29+
http_spans = ("aiohttp-client", "aiohttp-server", "django", "wsgi", "urllib3", "soap")
30+
31+
exit_spans = ("aiohttp-client", "log", "memcache", "mysql", "rabbitmq", "redis", "rpc-client",
32+
"sqlalchemy", "soap", "urllib3")
33+
entry_spans = ("aiohttp-server", "django", "wsgi", "rabbitmq", "rpc-server")
3334

3435
entry_kind = ["entry", "server", "consumer"]
3536
exit_kind = ["exit", "client", "producer"]

instana/span.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,10 @@ def finish(self, finish_time=None):
88
super(InstanaSpan, self).finish(finish_time)
99

1010
def log_exception(self, e):
11-
if hasattr(e, 'message') and len(e.message):
12-
self.log_kv({'message': e.message})
13-
elif hasattr(e, '__str__'):
14-
self.log_kv({'message': e.__str__()})
15-
else:
11+
if hasattr(e, '__str__'):
1612
self.log_kv({'message': str(e)})
13+
elif hasattr(e, 'message') and e.message is not None:
14+
self.log_kv({'message': e.message})
1715

1816
self.set_tag("error", True)
1917
ec = self.tags.get('ec', 0)

0 commit comments

Comments
 (0)