Skip to content

Commit 74846fc

Browse files
author
Emanuele Palazzetti
committed
[tornado] patch concurrent.futures to propagate Tornado context from the main thread
1 parent 73a3019 commit 74846fc

File tree

6 files changed

+120
-11
lines changed

6 files changed

+120
-11
lines changed

ddtrace/contrib/tornado/futures.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from ddtrace import tracer
2+
from ddtrace.context import Context
3+
4+
5+
def _wrap_submit(func, instance, args, kwargs):
6+
"""
7+
Wrap `Executor` method used to submit a work executed in another
8+
thread. This wrapper ensures that a new `Context` is created and
9+
properly propagated using an intermediate function.
10+
"""
11+
# create a new Context with the right active Span
12+
# TODO: the current implementation doesn't provide the GlobalTracer
13+
# singleton, so we should rely in our top-level import
14+
ctx = Context()
15+
current_ctx = tracer.get_call_context()
16+
ctx._current_span = current_ctx._current_span
17+
18+
# extract the target function that must be executed in
19+
# a new thread and the `target` arguments
20+
fn = args[0]
21+
fn_args = args[1:]
22+
return func(_wrap_execution, ctx, fn, fn_args, kwargs)
23+
24+
def _wrap_execution(ctx, fn, args, kwargs):
25+
"""
26+
Intermediate target function that is executed in a new thread;
27+
it receives the original function with arguments and keyword
28+
arguments, including our tracing `Context`. The current context
29+
provider sets the Active context in a thread local storage
30+
variable because it's outside the asynchronous loop.
31+
"""
32+
tracer.context_provider.activate(ctx)
33+
return fn(*args, **kwargs)

ddtrace/contrib/tornado/patch.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import ddtrace
22
import tornado
3+
import concurrent
34

45
from wrapt import wrap_function_wrapper as _w
56

6-
from . import handlers, application, decorators, template, TracerStackContext
7+
from . import handlers, application, decorators, template, futures, TracerStackContext
78
from ...util import unwrap as _u
89

910

@@ -31,6 +32,12 @@ def patch():
3132
# patch Template system
3233
_w('tornado.template', 'Template.generate', template.generate)
3334

35+
# patch Python Futures when an Executor pool is used
36+
# TODO: this may be a generic module and should be moved
37+
# in a separate contributions when we want to support multi-threading
38+
# context propagation
39+
_w('concurrent.futures', 'ThreadPoolExecutor.submit', futures._wrap_submit)
40+
3441
# configure the global tracer
3542
ddtrace.tracer.configure(
3643
context_provider=TracerStackContext,
@@ -53,3 +60,4 @@ def unpatch():
5360
_u(tornado.web.Application, '__init__')
5461
_u(tornado.concurrent, 'run_on_executor')
5562
_u(tornado.template.Template, 'generate')
63+
_u(concurrent.futures.ThreadPoolExecutor, 'submit')

ddtrace/contrib/tornado/stack_context.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from tornado.ioloop import IOLoop
12
from tornado.stack_context import StackContextInconsistentError, _state
23

34
from ...context import Context
@@ -59,20 +60,39 @@ def active(cls):
5960
"""
6061
Return the ``Context`` from the current execution flow. This method can be
6162
used inside a Tornado coroutine to retrieve and use the current tracing context.
63+
If used in a separated Thread, the `_state` thread-local storage is used to
64+
propagate the current Active context from the `MainThread`.
6265
"""
63-
for ctx in reversed(_state.contexts[0]):
64-
if isinstance(ctx, cls) and ctx.active:
65-
return ctx.context
66+
io_loop = getattr(IOLoop._current, 'instance', None)
67+
if io_loop is None:
68+
# if a Tornado loop is not available, it means that this method
69+
# has been called from a synchronous code, so we can rely in a
70+
# thread-local storage
71+
return getattr(_state, '__datadog_context', None)
72+
else:
73+
# we're inside a Tornado loop so the TracerStackContext is used
74+
for ctx in reversed(_state.contexts[0]):
75+
if isinstance(ctx, cls) and ctx.active:
76+
return ctx.context
6677

6778
@classmethod
6879
def activate(cls, ctx):
6980
"""
7081
Set the active ``Context`` for this async execution. If a ``TracerStackContext``
7182
is not found, the context is discarded.
83+
If used in a separated Thread, the `_state` thread-local storage is used to
84+
propagate the current Active context from the `MainThread`.
7285
"""
73-
for stack_ctx in reversed(_state.contexts[0]):
74-
if isinstance(stack_ctx, cls) and stack_ctx.active:
75-
stack_ctx.context = ctx
86+
io_loop = getattr(IOLoop._current, 'instance', None)
87+
if io_loop is None:
88+
# because we're outside of an asynchronous execution, we store
89+
# the current context in a thread-local storage
90+
setattr(_state, '__datadog_context', ctx)
91+
else:
92+
# we're inside a Tornado loop so the TracerStackContext is used
93+
for stack_ctx in reversed(_state.contexts[0]):
94+
if isinstance(stack_ctx, cls) and stack_ctx.active:
95+
stack_ctx.context = ctx
7696

7797

7898
def run_with_trace_context(func, *args, **kwargs):

tests/contrib/tornado/test_executor_decorator.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,36 @@ def test_on_executor_handler(self):
4343
eq_(0, executor_span.error)
4444
ok_(executor_span.duration >= 0.05)
4545

46+
def test_on_executor_submit(self):
47+
# it should propagate the context when a handler uses directly the `executor.submit()`
48+
response = self.fetch('/executor_submit_handler/')
49+
eq_(200, response.code)
50+
51+
traces = self.tracer.writer.pop_traces()
52+
eq_(2, len(traces))
53+
eq_(1, len(traces[0]))
54+
eq_(1, len(traces[1]))
55+
56+
# this trace yields the execution of the thread
57+
request_span = traces[1][0]
58+
eq_('tornado-web', request_span.service)
59+
eq_('tornado.request', request_span.name)
60+
eq_('http', request_span.span_type)
61+
eq_('tests.contrib.tornado.web.app.ExecutorSubmitHandler', request_span.resource)
62+
eq_('GET', request_span.get_tag('http.method'))
63+
eq_('200', request_span.get_tag('http.status_code'))
64+
eq_('/executor_submit_handler/', request_span.get_tag('http.url'))
65+
eq_(0, request_span.error)
66+
ok_(request_span.duration >= 0.05)
67+
68+
# this trace is executed in a different thread
69+
executor_span = traces[0][0]
70+
eq_('tornado-web', executor_span.service)
71+
eq_('tornado.executor.query', executor_span.name)
72+
eq_(executor_span.parent_id, request_span.span_id)
73+
eq_(0, executor_span.error)
74+
ok_(executor_span.duration >= 0.05)
75+
4676
def test_on_delayed_executor_handler(self):
4777
# it should trace a handler that uses @run_on_executor but that doesn't
4878
# wait for its termination

tests/contrib/tornado/utils.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
from ddtrace.contrib.tornado import patch, unpatch
44

5-
from .web import app
6-
from .web.compat import reload_module
5+
from .web import app, compat
76
from ...test_tracer import get_dummy_tracer
87

98

@@ -16,7 +15,8 @@ class TornadoTestCase(AsyncHTTPTestCase):
1615
def get_app(self):
1716
# patch Tornado and reload module app
1817
patch()
19-
reload_module(app)
18+
compat.reload_module(compat)
19+
compat.reload_module(app)
2020
# create a dummy tracer and a Tornado web application
2121
self.tracer = get_dummy_tracer()
2222
settings = self.get_settings()
@@ -34,4 +34,5 @@ def tearDown(self):
3434
super(TornadoTestCase, self).tearDown()
3535
# unpatch Tornado
3636
unpatch()
37-
reload_module(app)
37+
compat.reload_module(compat)
38+
compat.reload_module(app)

tests/contrib/tornado/web/app.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,22 @@ def get(self):
159159
self.write('OK')
160160

161161

162+
class ExecutorSubmitHandler(tornado.web.RequestHandler):
163+
executor = ThreadPoolExecutor(max_workers=3)
164+
165+
def query(self):
166+
tracer = self.settings['datadog_trace']['tracer']
167+
with tracer.trace('tornado.executor.query'):
168+
time.sleep(0.05)
169+
170+
@tornado.gen.coroutine
171+
def get(self):
172+
# run the query in another Executor, without using
173+
# Tornado decorators
174+
yield self.executor.submit(self.query)
175+
self.write('OK')
176+
177+
162178
class ExecutorDelayedHandler(tornado.web.RequestHandler):
163179
# used automatically by the @run_on_executor decorator
164180
executor = ThreadPoolExecutor(max_workers=3)
@@ -296,6 +312,7 @@ def make_app(settings={}):
296312
(r'/template_exception/', TemplateExceptionHandler),
297313
# handlers that spawn new threads
298314
(r'/executor_handler/', ExecutorHandler),
315+
(r'/executor_submit_handler/', ExecutorSubmitHandler),
299316
(r'/executor_delayed_handler/', ExecutorDelayedHandler),
300317
(r'/executor_custom_handler/', ExecutorCustomHandler),
301318
(r'/executor_custom_args_handler/', ExecutorCustomArgsHandler),

0 commit comments

Comments
 (0)