Skip to content

Commit 06b744f

Browse files
author
Emanuele Palazzetti
authored
Merge pull request #362 from palazzem/python-futures
[tornado] patch concurrent.futures if available
2 parents c65eb5a + adf5915 commit 06b744f

File tree

11 files changed

+165
-63
lines changed

11 files changed

+165
-63
lines changed

ddtrace/context.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ def span_id(self):
5151
with self._lock:
5252
return self._parent_span_id
5353

54+
@property
55+
def sampled(self):
56+
"""Return current context sampled flag."""
57+
with self._lock:
58+
return self._sampled
59+
5460
@property
5561
def sampling_priority(self):
5662
"""Return current context sampling priority."""

ddtrace/contrib/tornado/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Auto instrumentation is available using the ``patch`` function that **must be called before**
44
importing the tornado library. The following is an example::
55
6-
# patch before importing tornado
6+
# patch before importing tornado and concurrent.futures
77
from ddtrace import tracer, patch
88
patch(tornado=True)
99
@@ -83,10 +83,10 @@ def notify(self):
8383
with require_modules(required_modules) as missing_modules:
8484
if not missing_modules:
8585
from .stack_context import run_with_trace_context, TracerStackContext
86-
from .patch import patch, unpatch
8786

88-
# alias for API compatibility
89-
context_provider = TracerStackContext
87+
context_provider = TracerStackContext()
88+
89+
from .patch import patch, unpatch
9090

9191
__all__ = [
9292
'patch',

ddtrace/contrib/tornado/application.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from tornado import template
44

5-
from . import decorators, TracerStackContext
5+
from . import decorators, context_provider
66
from .constants import CONFIG_KEY
77

88
from ...ext import AppTypes
@@ -37,7 +37,7 @@ def tracer_config(__init__, app, args, kwargs):
3737
# global tracer while here we can have a different instance (even if
3838
# this is not usual).
3939
tracer.configure(
40-
context_provider=TracerStackContext,
40+
context_provider=context_provider,
4141
wrap_executor=decorators.wrap_executor,
4242
enabled=settings.get('enabled', None),
4343
hostname=settings.get('agent_hostname', None),

ddtrace/contrib/tornado/compat.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from ..util import require_modules
2+
3+
4+
optional_modules = ['concurrent.futures']
5+
6+
with require_modules(optional_modules) as missing_modules:
7+
# detect if concurrent.futures is available as a Python
8+
# stdlib or Python 2.7 backport
9+
futures_available = len(missing_modules) == 0

ddtrace/contrib/tornado/futures.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import ddtrace
2+
3+
4+
def _wrap_submit(func, instance, args, kwargs):
5+
"""
6+
Wrap `Executor` method used to submit a work executed in another
7+
thread. This wrapper ensures that a new `Context` is created and
8+
properly propagated using an intermediate function.
9+
"""
10+
# propagate the same Context in the new thread
11+
current_ctx = ddtrace.tracer.context_provider.active()
12+
13+
# extract the target function that must be executed in
14+
# a new thread and the `target` arguments
15+
fn = args[0]
16+
fn_args = args[1:]
17+
return func(_wrap_execution, current_ctx, fn, fn_args, kwargs)
18+
19+
def _wrap_execution(ctx, fn, args, kwargs):
20+
"""
21+
Intermediate target function that is executed in a new thread;
22+
it receives the original function with arguments and keyword
23+
arguments, including our tracing `Context`. The current context
24+
provider sets the Active context in a thread local storage
25+
variable because it's outside the asynchronous loop.
26+
"""
27+
ddtrace.tracer.context_provider.activate(ctx)
28+
return fn(*args, **kwargs)

ddtrace/contrib/tornado/patch.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from wrapt import wrap_function_wrapper as _w
55

6-
from . import handlers, application, decorators, template, TracerStackContext
6+
from . import handlers, application, decorators, template, futures, compat, context_provider
77
from ...util import unwrap as _u
88

99

@@ -25,15 +25,19 @@ def patch():
2525
_w('tornado.web', 'RequestHandler.on_finish', handlers.on_finish)
2626
_w('tornado.web', 'RequestHandler.log_exception', handlers.log_exception)
2727

28-
# patch Tornado decorators
29-
_w('tornado.concurrent', 'run_on_executor', decorators._run_on_executor)
30-
3128
# patch Template system
3229
_w('tornado.template', 'Template.generate', template.generate)
3330

31+
# patch Python Futures when an Executor pool is used
32+
# TODO: this may be a generic module and should be moved
33+
# in a separate contributions when we want to support multi-threading
34+
# context propagation
35+
if compat.futures_available:
36+
_w('concurrent.futures', 'ThreadPoolExecutor.submit', futures._wrap_submit)
37+
3438
# configure the global tracer
3539
ddtrace.tracer.configure(
36-
context_provider=TracerStackContext,
40+
context_provider=context_provider,
3741
wrap_executor=decorators.wrap_executor,
3842
)
3943

@@ -53,3 +57,6 @@ def unpatch():
5357
_u(tornado.web.Application, '__init__')
5458
_u(tornado.concurrent, 'run_on_executor')
5559
_u(tornado.template.Template, 'generate')
60+
61+
if compat.futures_available:
62+
_u('concurrent.futures.ThreadPoolExecutor', 'submit')

ddtrace/contrib/tornado/stack_context.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
from tornado.ioloop import IOLoop
12
from tornado.stack_context import StackContextInconsistentError, _state
23

34
from ...context import Context
5+
from ...provider import DefaultContextProvider
46

57

6-
class TracerStackContext(object):
8+
class TracerStackContext(DefaultContextProvider):
79
"""
810
A context manager that manages ``Context`` instances in a thread-local state.
911
It must be used everytime a Tornado's handler or coroutine is used within a
@@ -18,8 +20,9 @@ class TracerStackContext(object):
1820
https://github.com/tornadoweb/tornado/issues/1063
1921
"""
2022
def __init__(self):
21-
self.active = True
22-
self.context = Context()
23+
super(TracerStackContext, self).__init__()
24+
self._active = True
25+
self._context = Context()
2326

2427
def enter(self):
2528
"""
@@ -52,27 +55,45 @@ def __exit__(self, type, value, traceback):
5255
self.new_contexts = None
5356

5457
def deactivate(self):
55-
self.active = False
58+
self._active = False
5659

57-
@classmethod
58-
def active(cls):
60+
def active(self):
5961
"""
6062
Return the ``Context`` from the current execution flow. This method can be
6163
used inside a Tornado coroutine to retrieve and use the current tracing context.
64+
If used in a separated Thread, the `_state` thread-local storage is used to
65+
propagate the current Active context from the `MainThread`.
6266
"""
63-
for ctx in reversed(_state.contexts[0]):
64-
if isinstance(ctx, cls) and ctx.active:
65-
return ctx.context
66-
67-
@classmethod
68-
def activate(cls, ctx):
67+
io_loop = getattr(IOLoop._current, 'instance', None)
68+
if io_loop is None:
69+
# if a Tornado loop is not available, it means that this method
70+
# has been called from a synchronous code, so we can rely in a
71+
# thread-local storage
72+
return self._local.get()
73+
else:
74+
# we're inside a Tornado loop so the TracerStackContext is used
75+
for stack in reversed(_state.contexts[0]):
76+
if isinstance(stack, self.__class__) and stack._active:
77+
return stack._context
78+
79+
def activate(self, ctx):
6980
"""
7081
Set the active ``Context`` for this async execution. If a ``TracerStackContext``
7182
is not found, the context is discarded.
83+
If used in a separated Thread, the `_state` thread-local storage is used to
84+
propagate the current Active context from the `MainThread`.
7285
"""
73-
for stack_ctx in reversed(_state.contexts[0]):
74-
if isinstance(stack_ctx, cls) and stack_ctx.active:
75-
stack_ctx.context = ctx
86+
io_loop = getattr(IOLoop._current, 'instance', None)
87+
if io_loop is None:
88+
# because we're outside of an asynchronous execution, we store
89+
# the current context in a thread-local storage
90+
self._local.set(ctx)
91+
else:
92+
# we're inside a Tornado loop so the TracerStackContext is used
93+
for stack_ctx in reversed(_state.contexts[0]):
94+
if isinstance(stack_ctx, self.__class__) and stack_ctx._active:
95+
stack_ctx._context = ctx
96+
return ctx
7697

7798

7899
def run_with_trace_context(func, *args, **kwargs):

tests/contrib/tornado/test_executor_decorator.py

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import unittest
33

44
from nose.tools import eq_, ok_
5+
from ddtrace.contrib.tornado.compat import futures_available
56

67
from tornado import version_info
78

@@ -19,12 +20,11 @@ def test_on_executor_handler(self):
1920
eq_(200, response.code)
2021

2122
traces = self.tracer.writer.pop_traces()
22-
eq_(2, len(traces))
23-
eq_(1, len(traces[0]))
24-
eq_(1, len(traces[1]))
23+
eq_(1, len(traces))
24+
eq_(2, len(traces[0]))
2525

2626
# this trace yields the execution of the thread
27-
request_span = traces[1][0]
27+
request_span = traces[0][0]
2828
eq_('tornado-web', request_span.service)
2929
eq_('tornado.request', request_span.name)
3030
eq_('http', request_span.span_type)
@@ -36,46 +36,39 @@ def test_on_executor_handler(self):
3636
ok_(request_span.duration >= 0.05)
3737

3838
# this trace is executed in a different thread
39-
executor_span = traces[0][0]
39+
executor_span = traces[0][1]
4040
eq_('tornado-web', executor_span.service)
4141
eq_('tornado.executor.with', executor_span.name)
4242
eq_(executor_span.parent_id, request_span.span_id)
4343
eq_(0, executor_span.error)
4444
ok_(executor_span.duration >= 0.05)
4545

46-
def test_on_delayed_executor_handler(self):
47-
# it should trace a handler that uses @run_on_executor but that doesn't
48-
# wait for its termination
49-
response = self.fetch('/executor_delayed_handler/')
46+
@unittest.skipUnless(futures_available, 'Futures must be available to test direct submit')
47+
def test_on_executor_submit(self):
48+
# it should propagate the context when a handler uses directly the `executor.submit()`
49+
response = self.fetch('/executor_submit_handler/')
5050
eq_(200, response.code)
5151

52-
# timeout for the background thread execution
53-
time.sleep(0.1)
54-
5552
traces = self.tracer.writer.pop_traces()
56-
eq_(2, len(traces))
57-
eq_(1, len(traces[0]))
58-
eq_(1, len(traces[1]))
59-
60-
# order the `traces` list to have deterministic results
61-
# (required only for this special use case)
62-
traces.sort(key=lambda x: x[0].name, reverse=True)
53+
eq_(1, len(traces))
54+
eq_(2, len(traces[0]))
6355

6456
# this trace yields the execution of the thread
6557
request_span = traces[0][0]
6658
eq_('tornado-web', request_span.service)
6759
eq_('tornado.request', request_span.name)
6860
eq_('http', request_span.span_type)
69-
eq_('tests.contrib.tornado.web.app.ExecutorDelayedHandler', request_span.resource)
61+
eq_('tests.contrib.tornado.web.app.ExecutorSubmitHandler', request_span.resource)
7062
eq_('GET', request_span.get_tag('http.method'))
7163
eq_('200', request_span.get_tag('http.status_code'))
72-
eq_('/executor_delayed_handler/', request_span.get_tag('http.url'))
64+
eq_('/executor_submit_handler/', request_span.get_tag('http.url'))
7365
eq_(0, request_span.error)
66+
ok_(request_span.duration >= 0.05)
7467

7568
# this trace is executed in a different thread
76-
executor_span = traces[1][0]
69+
executor_span = traces[0][1]
7770
eq_('tornado-web', executor_span.service)
78-
eq_('tornado.executor.with', executor_span.name)
71+
eq_('tornado.executor.query', executor_span.name)
7972
eq_(executor_span.parent_id, request_span.span_id)
8073
eq_(0, executor_span.error)
8174
ok_(executor_span.duration >= 0.05)
@@ -86,12 +79,11 @@ def test_on_executor_exception_handler(self):
8679
eq_(500, response.code)
8780

8881
traces = self.tracer.writer.pop_traces()
89-
eq_(2, len(traces))
90-
eq_(1, len(traces[0]))
91-
eq_(1, len(traces[1]))
82+
eq_(1, len(traces))
83+
eq_(2, len(traces[0]))
9284

9385
# this trace yields the execution of the thread
94-
request_span = traces[1][0]
86+
request_span = traces[0][0]
9587
eq_('tornado-web', request_span.service)
9688
eq_('tornado.request', request_span.name)
9789
eq_('http', request_span.span_type)
@@ -104,7 +96,7 @@ def test_on_executor_exception_handler(self):
10496
ok_('Exception: Ouch!' in request_span.get_tag('error.stack'))
10597

10698
# this trace is executed in a different thread
107-
executor_span = traces[0][0]
99+
executor_span = traces[0][1]
108100
eq_('tornado-web', executor_span.service)
109101
eq_('tornado.executor.with', executor_span.name)
110102
eq_(executor_span.parent_id, request_span.span_id)
@@ -123,12 +115,11 @@ def test_on_executor_custom_kwarg(self):
123115
eq_(200, response.code)
124116

125117
traces = self.tracer.writer.pop_traces()
126-
eq_(2, len(traces))
127-
eq_(1, len(traces[0]))
128-
eq_(1, len(traces[1]))
118+
eq_(1, len(traces))
119+
eq_(2, len(traces[0]))
129120

130121
# this trace yields the execution of the thread
131-
request_span = traces[1][0]
122+
request_span = traces[0][0]
132123
eq_('tornado-web', request_span.service)
133124
eq_('tornado.request', request_span.name)
134125
eq_('http', request_span.span_type)
@@ -140,7 +131,7 @@ def test_on_executor_custom_kwarg(self):
140131
ok_(request_span.duration >= 0.05)
141132

142133
# this trace is executed in a different thread
143-
executor_span = traces[0][0]
134+
executor_span = traces[0][1]
144135
eq_('tornado-web', executor_span.service)
145136
eq_('tornado.executor.with', executor_span.name)
146137
eq_(executor_span.parent_id, request_span.span_id)

tests/contrib/tornado/test_safety.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class TestAppSafety(TornadoTestCase):
4848
"""
4949
Ensure that the application patch has the proper safety guards.
5050
"""
51+
5152
def test_trace_unpatch(self):
5253
# the application must not be traced if unpatch() is called
5354
patch()
@@ -108,6 +109,27 @@ def test_arbitrary_resource_404(self):
108109
eq_('tornado.web.ErrorHandler', request_span.resource)
109110
eq_('/does_not_exist/', request_span.get_tag('http.url'))
110111

112+
@gen_test
113+
def test_futures_without_context(self):
114+
# ensures that if futures propagation is available, an empty
115+
# context doesn't crash the system
116+
from .web.compat import ThreadPoolExecutor
117+
118+
def job():
119+
with self.tracer.trace('job'):
120+
return 42
121+
122+
executor = ThreadPoolExecutor(max_workers=3)
123+
yield executor.submit(job)
124+
125+
traces = self.tracer.writer.pop_traces()
126+
eq_(1, len(traces))
127+
eq_(1, len(traces[0]))
128+
129+
# this trace yields the execution of the thread
130+
span = traces[0][0]
131+
eq_('job', span.name)
132+
111133

112134
class TestCustomAppSafety(TornadoTestCase):
113135
"""

0 commit comments

Comments
 (0)