Skip to content

Commit cdd0471

Browse files
authored
Asyncio: Carry context along with Tasks (#139)
* Carry context across ensure_future calls * Add test without context * Add support for create_task (with tests) * Add package configurator module * Add section describing configuratior * Use configurator in tests
1 parent 5ed0ac5 commit cdd0471

File tree

7 files changed

+244
-1
lines changed

7 files changed

+244
-1
lines changed

Configuration.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,22 @@ or
2525
instana.service_name = "myservice"
2626
```
2727

28+
## Package Configuration
29+
30+
The Instana package includes a runtime configuration module that manages the configuration of various components.
31+
32+
_Note: as the package evolves, more options will be added here_
33+
34+
```python
35+
from instana.configurator import config
36+
37+
# To enable tracing context propagation across Asyncio ensure_future and create_task calls
38+
# Default is false
39+
config['asyncio_task_context_propagation']['enabled'] = True
40+
41+
```
42+
43+
2844
## Debugging & More Verbosity
2945

3046
Setting `INSTANA_DEV` to a non nil value will enable extra logging output generally useful

instana/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def boot_agent():
6262
if "INSTANA_DISABLE_AUTO_INSTR" not in os.environ:
6363
# Import & initialize instrumentation
6464
if sys.version_info >= (3, 5, 3):
65+
from .instrumentation import asyncio # noqa
6566
from .instrumentation.aiohttp import client # noqa
6667
from .instrumentation.aiohttp import server # noqa
6768
from .instrumentation import asynqp # noqa

instana/configurator.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from __future__ import absolute_import
2+
from collections import defaultdict
3+
4+
# This file contains a config object that will hold configuration options for the package.
5+
# Defaults are set and can be overridden after package load.
6+
7+
8+
# Simple implementation of a nested dictionary.
9+
#
10+
# Same as:
11+
# stan_dictionary = lambda: defaultdict(stan_dictionary)
12+
# but we use the function form because of PEP 8
13+
#
14+
def stan_dictionary():
15+
return defaultdict(stan_dictionary)
16+
17+
18+
# La Protagonista
19+
config = stan_dictionary()
20+
21+
22+
# This option determines if tasks created via asyncio (with ensure_future or create_task) will
23+
# automatically carry existing context into the created task.
24+
config['asyncio_task_context_propagation']['enabled'] = False
25+
26+
27+
28+

instana/instrumentation/asyncio.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from __future__ import absolute_import
2+
3+
import wrapt
4+
5+
from ..log import logger
6+
from ..singletons import async_tracer
7+
from ..configurator import config
8+
9+
try:
10+
import asyncio
11+
12+
@wrapt.patch_function_wrapper('asyncio','ensure_future')
13+
def ensure_future_with_instana(wrapped, instance, argv, kwargs):
14+
if config['asyncio_task_context_propagation']['enabled'] is False:
15+
return wrapped(*argv, **kwargs)
16+
17+
scope = async_tracer.scope_manager.active
18+
task = wrapped(*argv, **kwargs)
19+
20+
if scope is not None:
21+
async_tracer.scope_manager._set_task_scope(scope, task=task)
22+
23+
return task
24+
25+
if hasattr(asyncio, "create_task"):
26+
@wrapt.patch_function_wrapper('asyncio','create_task')
27+
def create_task_with_instana(wrapped, instance, argv, kwargs):
28+
if config['asyncio_task_context_propagation']['enabled'] is False:
29+
return wrapped(*argv, **kwargs)
30+
31+
scope = async_tracer.scope_manager.active
32+
task = wrapped(*argv, **kwargs)
33+
34+
if scope is not None:
35+
async_tracer.scope_manager._set_task_scope(scope, task=task)
36+
37+
return task
38+
39+
logger.debug("Instrumenting asyncio")
40+
except ImportError:
41+
pass

runtests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
command_line = [__file__, '--verbose']
66

77
if (LooseVersion(sys.version) < LooseVersion('3.5.3')):
8-
command_line.extend(['-e', 'asynqp', '-e', 'aiohttp'])
8+
command_line.extend(['-e', 'asynqp', '-e', 'aiohttp', '-e', 'async'])
99

1010
if (LooseVersion(sys.version) >= LooseVersion('3.7.0')):
1111
command_line.extend(['-e', 'sudsjurko'])

tests/test_asyncio.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
from __future__ import absolute_import
2+
3+
import asyncio
4+
import unittest
5+
6+
import aiohttp
7+
8+
from instana.singletons import async_tracer
9+
from instana.configurator import config
10+
11+
from .helpers import testenv
12+
13+
14+
class TestAsyncio(unittest.TestCase):
15+
def setUp(self):
16+
""" Clear all spans before a test run """
17+
self.recorder = async_tracer.recorder
18+
self.recorder.clear_spans()
19+
20+
# New event loop for every test
21+
self.loop = asyncio.new_event_loop()
22+
asyncio.set_event_loop(None)
23+
24+
# Restore default
25+
config['asyncio_task_context_propagation']['enabled'] = False
26+
27+
def tearDown(self):
28+
""" Purge the queue """
29+
pass
30+
31+
async def fetch(self, session, url, headers=None):
32+
try:
33+
async with session.get(url, headers=headers) as response:
34+
return response
35+
except aiohttp.web_exceptions.HTTPException:
36+
pass
37+
38+
def test_ensure_future_with_context(self):
39+
async def run_later(msg="Hello"):
40+
# print("run_later: %s" % async_tracer.active_span.operation_name)
41+
async with aiohttp.ClientSession() as session:
42+
return await self.fetch(session, testenv["wsgi_server"] + "/")
43+
44+
async def test():
45+
with async_tracer.start_active_span('test'):
46+
asyncio.ensure_future(run_later("Hello"))
47+
await asyncio.sleep(0.5)
48+
49+
# Override default task context propagation
50+
config['asyncio_task_context_propagation']['enabled'] = True
51+
52+
self.loop.run_until_complete(test())
53+
54+
spans = self.recorder.queued_spans()
55+
self.assertEqual(3, len(spans))
56+
57+
test_span = spans[0]
58+
wsgi_span = spans[1]
59+
aioclient_span = spans[2]
60+
61+
self.assertEqual(test_span.t, wsgi_span.t)
62+
self.assertEqual(test_span.t, aioclient_span.t)
63+
64+
self.assertEqual(test_span.p, None)
65+
self.assertEqual(wsgi_span.p, aioclient_span.s)
66+
self.assertEqual(aioclient_span.p, test_span.s)
67+
68+
def test_ensure_future_without_context(self):
69+
async def run_later(msg="Hello"):
70+
# print("run_later: %s" % async_tracer.active_span.operation_name)
71+
async with aiohttp.ClientSession() as session:
72+
return await self.fetch(session, testenv["wsgi_server"] + "/")
73+
74+
async def test():
75+
with async_tracer.start_active_span('test'):
76+
asyncio.ensure_future(run_later("Hello"))
77+
await asyncio.sleep(0.5)
78+
79+
self.loop.run_until_complete(test())
80+
81+
spans = self.recorder.queued_spans()
82+
83+
self.assertEqual(2, len(spans))
84+
self.assertEqual("sdk", spans[0].n)
85+
self.assertEqual("wsgi", spans[1].n)
86+
87+
# Without the context propagated, we should get two separate traces
88+
self.assertNotEqual(spans[0].t, spans[1].t)
89+
90+
if hasattr(asyncio, "create_task"):
91+
def test_create_task_with_context(self):
92+
async def run_later(msg="Hello"):
93+
# print("run_later: %s" % async_tracer.active_span.operation_name)
94+
async with aiohttp.ClientSession() as session:
95+
return await self.fetch(session, testenv["wsgi_server"] + "/")
96+
97+
async def test():
98+
with async_tracer.start_active_span('test'):
99+
asyncio.create_task(run_later("Hello"))
100+
await asyncio.sleep(0.5)
101+
102+
# Override default task context propagation
103+
config['asyncio_task_context_propagation']['enabled'] = True
104+
105+
self.loop.run_until_complete(test())
106+
107+
spans = self.recorder.queued_spans()
108+
self.assertEqual(3, len(spans))
109+
110+
test_span = spans[0]
111+
wsgi_span = spans[1]
112+
aioclient_span = spans[2]
113+
114+
self.assertEqual(test_span.t, wsgi_span.t)
115+
self.assertEqual(test_span.t, aioclient_span.t)
116+
117+
self.assertEqual(test_span.p, None)
118+
self.assertEqual(wsgi_span.p, aioclient_span.s)
119+
self.assertEqual(aioclient_span.p, test_span.s)
120+
121+
def test_create_task_without_context(self):
122+
async def run_later(msg="Hello"):
123+
# print("run_later: %s" % async_tracer.active_span.operation_name)
124+
async with aiohttp.ClientSession() as session:
125+
return await self.fetch(session, testenv["wsgi_server"] + "/")
126+
127+
async def test():
128+
with async_tracer.start_active_span('test'):
129+
asyncio.create_task(run_later("Hello"))
130+
await asyncio.sleep(0.5)
131+
132+
self.loop.run_until_complete(test())
133+
134+
spans = self.recorder.queued_spans()
135+
136+
self.assertEqual(2, len(spans))
137+
self.assertEqual("sdk", spans[0].n)
138+
self.assertEqual("wsgi", spans[1].n)
139+
140+
# Without the context propagated, we should get two separate traces
141+
self.assertNotEqual(spans[0].t, spans[1].t)

tests/test_configurator.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from __future__ import absolute_import
2+
3+
import unittest
4+
5+
from instana.configurator import config
6+
7+
8+
class TestRedis(unittest.TestCase):
9+
def setUp(self):
10+
pass
11+
12+
def tearDown(self):
13+
pass
14+
15+
def test_has_default_config(self):
16+
self.assertEqual(config['asyncio_task_context_propagation']['enabled'], False)

0 commit comments

Comments
 (0)