Skip to content

Commit dba3260

Browse files
Kyle-VerhoogP403n1x87jdbrettlangdon
authored
fix: contextvar usage (#1936)
* refactor trace building Instead of storing a mutable Context object in the contextvar, the active span/context is stored instead. This removes the need to have to clone the context which means that ddtrace will now work out of the box with async frameworks that use contextvars. * fix OT with the new context management * Fix sampling * remove deprecation for context_provider * add current_execution_id function * use execution id to group traces this matches the previous functionality although we'll probably want to change this in the near future * Revert "use execution id to group traces" This reverts commit 1d53a5c. * various updates - formatting - added lock to _Trace - updated tests - docstrings * support setting sampling_priority through context * fix gevent tests * fix asyncio * enable partial flushing by default Since multiple execution traces are no longer handled individually it makes sense to enable partial flushing by default for long running cross-execution traces. * use get * fix aiohttp tests * store traces per tracer instance * formatting * test fixes * fix futures test * fix parenting issue for non active spans * fix flask, asyncio and tornado tests * clean up - remove unrelated formatting - remove new public apis (can be added later) - add back context.sampling_priority * sampling cleanup, backwards compatibility * more cleanup - revert unnecessary changes * revert active() returning None * handle trace-level tags Trace-level tags like origin and sampling priority belong to a trace and it's cumbersome to try to keep on the root span. Context seems like the reasonable place for these things although if feels a bit icky to overload context with it for a couple of reasons: - the context tags have to be shared between spans (could be easy to forget) - it overloads the context to support being a "non-local span" and storing trace level state. Ideally we could split this functionality into two structures... maybe TraceContext and NonLocalSpan...? * fix opentracing + gevent tests * fix span.context active span * rename methods * get span from context if possible * remove useless properties * thread-safe context * Add context test cases * Remove backwards compatibility This is done for several reasons: - The existing functionality is undefined in a lot of cases so it doesn't make sense to support. - Backwards support is not entirely possible since we are no longer in control of the context. One example of this: previously the active span of a context could be accessed from outside the execution that the span was created in via the span.context. This is no longer possible since the execution context is abstracted away. - Adding backwards support for some of the features led to complexity which is largely unnecessary with the above points in mind. * update grpc test sampling priority is actually unnecessary in these traces * Clean up docs * use lock instead of copying * Use attr for context * Add update tags api * Revert context fix, fix tests The context fix isn't required with the new definition of context * Apply suggestions from code review thank u jd and gab Co-authored-by: Gabriele N. Tornetta <[email protected]> Co-authored-by: Julien Danjou <[email protected]> * Doc updates * Fix typehint * get_call_context -> current_trace_context The old API naming no longer made sense as Context is not tied to a call stack. I think that trace context is more clear as to what the context is used for. * update tests * Reinstate get_call_context but deprecate it * Expand on context documentation * Implement local root as span attribute This makes local root lookup constant time and a bit more defined at the cost of another span attribute. * Fix tests * Address code review comments - Doc fixes/updates - Direct return * Handle possibly None context * Inline type hints * delete init context attrs, fix eq, add unit tests * Fix get_call_context and add back tests It should always return a context * Fix context attrs * Move updating active span to context provider Previously the active span updating was being done in the tracer._on_span_finish callback which may not be called from the same execution in which the span was created (eg. when a span is finished in a future's callback). This could result in the wrong span being activated (in the execution in which the span finished). This has a disadvantage however. There is a possibility of a memory leak when the following happens: there are executions A and B. A span is created in A and finished in B. When this happens then a reference to the span in kept in the contextvar for A. It will be removed/updated when the context provider is queried for an active span or if a span is activated. If this doesn't occur then the span (and any spans referenced from the span) will be kept in memory so long as the contextvars context lives. The worst-case here is that a really large trace is created in an execution and the spans are finished in another and no subsequent tracing is done for the rest of the application lifecycle. Then this trace will remain in memory for so long as the executor exists. * Fix docs * Add docs for cross-execution tracing * Move self.enabled shortcut to after warning logic * Doc cleanup * Add comment for why the active span is replaced * Store newly created contexts on a span * Fixup the context documentation Co-authored-by: Gabriele N. Tornetta <[email protected]> Co-authored-by: Julien Danjou <[email protected]> Co-authored-by: Brett Langdon <[email protected]>
1 parent bf9cf67 commit dba3260

35 files changed

+700
-577
lines changed

ddtrace/context.py

Lines changed: 69 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -1,98 +1,102 @@
11
import threading
22
from typing import Optional
33
from typing import TYPE_CHECKING
4+
from typing import Text
5+
6+
import attr
47

58
from .constants import ORIGIN_KEY
69
from .constants import SAMPLING_PRIORITY_KEY
10+
from .internal.compat import NumericType
711
from .internal.logger import get_logger
812
from .utils.deprecation import deprecated
9-
from .utils.formats import asbool
10-
from .utils.formats import get_env
1113

1214

1315
if TYPE_CHECKING:
14-
from ddtrace import Span
16+
from .span import Span
17+
from .span import _MetaDictType
18+
from .span import _MetricDictType
1519

1620
log = get_logger(__name__)
1721

1822

23+
@attr.s(eq=False, slots=True)
1924
class Context(object):
25+
"""Represents the state required to propagate a trace across execution
26+
boundaries.
2027
"""
21-
Context is used to keep track of a hierarchy of spans for the current
22-
execution flow. During each logical execution, the same ``Context`` is
23-
used to represent a single logical trace, even if the trace is built
24-
asynchronously.
25-
26-
A single code execution may use multiple ``Context`` if part of the execution
27-
must not be related to the current tracing. As example, a delayed job may
28-
compose a standalone trace instead of being related to the same trace that
29-
generates the job itself. On the other hand, if it's part of the same
30-
``Context``, it will be related to the original trace.
31-
32-
This data structure is thread-safe.
33-
"""
34-
35-
_partial_flush_enabled = asbool(get_env("tracer", "partial_flush_enabled", default=False))
36-
_partial_flush_min_spans = int(get_env("tracer", "partial_flush_min_spans", default=500)) # type: ignore[arg-type]
37-
38-
def __init__(
39-
self,
40-
trace_id=None, # type: Optional[int]
41-
span_id=None, # type: Optional[int]
42-
sampling_priority=None, # type: Optional[int]
43-
dd_origin=None, # type: Optional[str]
44-
):
45-
# type: (...) -> None
46-
"""
47-
Initialize a new thread-safe ``Context``.
48-
49-
:param int trace_id: trace_id of parent span
50-
:param int span_id: span_id of parent span
51-
"""
52-
self._current_span = None # type: Optional[Span]
53-
self._lock = threading.RLock()
5428

55-
self._parent_trace_id = trace_id
56-
self._parent_span_id = span_id
57-
self._sampling_priority = sampling_priority
58-
self._local_root_span = None # type: Optional[Span]
59-
self.dd_origin = dd_origin
29+
trace_id = attr.ib(default=None, type=Optional[int])
30+
span_id = attr.ib(default=None, type=Optional[int])
31+
_dd_origin = attr.ib(default=None, type=Optional[str], repr=False)
32+
_sampling_priority = attr.ib(default=None, type=Optional[NumericType], repr=False)
33+
_lock = attr.ib(factory=threading.RLock, type=threading.RLock, repr=False)
34+
_meta = attr.ib(factory=dict) # type: _MetaDictType
35+
_metrics = attr.ib(factory=dict) # type: _MetricDictType
36+
37+
def __attrs_post_init__(self):
38+
self.dd_origin = self._dd_origin
39+
self.sampling_priority = self._sampling_priority
40+
del self._dd_origin
41+
del self._sampling_priority
42+
43+
def __eq__(self, other):
44+
with self._lock:
45+
return (
46+
self.trace_id == other.trace_id
47+
and self.span_id == other.span_id
48+
and self._meta == other._meta
49+
and self._metrics == other._metrics
50+
)
6051

61-
@property
62-
def trace_id(self):
63-
"""Return current context trace_id."""
52+
def _with_span(self, span):
53+
# type: (Span) -> Context
54+
"""Return a shallow copy of the context with the given span."""
6455
with self._lock:
65-
return self._parent_trace_id
56+
ctx = self.__class__(trace_id=span.trace_id, span_id=span.span_id)
57+
ctx._lock = self._lock
58+
ctx._meta = self._meta
59+
ctx._metrics = self._metrics
60+
return ctx
6661

67-
@property
68-
def span_id(self):
69-
"""Return current context span_id."""
62+
def _update_tags(self, span):
63+
# type: (Span) -> None
7064
with self._lock:
71-
return self._parent_span_id
65+
span.meta.update(self._meta)
66+
span.metrics.update(self._metrics)
7267

7368
@property
7469
def sampling_priority(self):
75-
"""Return current context sampling priority."""
76-
with self._lock:
77-
return self._sampling_priority
70+
# type: () -> Optional[NumericType]
71+
"""Return the context sampling priority for the trace."""
72+
return self._metrics.get(SAMPLING_PRIORITY_KEY)
7873

7974
@sampling_priority.setter
8075
def sampling_priority(self, value):
81-
# type: (int) -> None
82-
"""Set sampling priority."""
76+
# type: (Optional[NumericType]) -> None
8377
with self._lock:
84-
self._sampling_priority = value
78+
if value is None:
79+
if SAMPLING_PRIORITY_KEY in self._metrics:
80+
del self._metrics[SAMPLING_PRIORITY_KEY]
81+
return
82+
self._metrics[SAMPLING_PRIORITY_KEY] = value
8583

86-
def _clone(self):
87-
# type: () -> Context
84+
@property
85+
def dd_origin(self):
86+
# type: () -> Optional[Text]
87+
"""Get the origin of the trace."""
88+
return self._meta.get(ORIGIN_KEY)
89+
90+
@dd_origin.setter
91+
def dd_origin(self, value):
92+
# type: (Optional[Text]) -> None
93+
"""Set the origin of the trace."""
8894
with self._lock:
89-
new_ctx = Context(
90-
trace_id=self._parent_trace_id,
91-
span_id=self._parent_span_id,
92-
sampling_priority=self._sampling_priority,
93-
)
94-
new_ctx._current_span = self._current_span
95-
return new_ctx
95+
if value is None:
96+
if ORIGIN_KEY in self._meta:
97+
del self._meta[ORIGIN_KEY]
98+
return
99+
self._meta[ORIGIN_KEY] = value
96100

97101
@deprecated("Cloning contexts will no longer be required in 0.50", version="0.50")
98102
def clone(self):
@@ -101,106 +105,4 @@ def clone(self):
101105
Partially clones the current context.
102106
It copies everything EXCEPT the registered and finished spans.
103107
"""
104-
return self._clone()
105-
106-
def _get_current_root_span(self):
107-
# type: () -> Optional[Span]
108-
with self._lock:
109-
return self._local_root_span
110-
111-
@deprecated("Please use tracer.current_root_span() instead", version="0.50")
112-
def get_current_root_span(self):
113-
# type: () -> Optional[Span]
114-
"""
115-
Return the root span of the context or None if it does not exist.
116-
"""
117-
return self._get_current_root_span()
118-
119-
def _get_current_span(self):
120-
# type: () -> Optional[Span]
121-
with self._lock:
122-
return self._current_span
123-
124-
@deprecated("Please use tracer.current_span() instead", version="0.50")
125-
def get_current_span(self):
126-
# type: () -> Optional[Span]
127-
"""
128-
Return the last active span that corresponds to the last inserted
129-
item in the trace list. This cannot be considered as the current active
130-
span in asynchronous environments, because some spans can be closed
131-
earlier while child spans still need to finish their traced execution.
132-
"""
133-
return self._get_current_span()
134-
135-
def _set_current_span(self, span):
136-
# type: (Optional[Span]) -> None
137-
"""
138-
Set current span internally.
139-
140-
Non-safe if not used with a lock. For internal Context usage only.
141-
"""
142-
self._current_span = span
143-
if span:
144-
self._parent_trace_id = span.trace_id
145-
self._parent_span_id = span.span_id
146-
else:
147-
self._parent_span_id = None
148-
149-
def _add_span(self, span):
150-
# type: (Span) -> None
151-
with self._lock:
152-
# Assume the first span added to the context is the local root
153-
if self._local_root_span is None:
154-
self._local_root_span = span
155-
self._set_current_span(span)
156-
span._context = self
157-
158-
@deprecated("Context will no longer support active span management in a later version.", version="0.50")
159-
def add_span(self, span):
160-
# type: (Span) -> None
161-
"""Activates span in the context."""
162-
return self._add_span(span)
163-
164-
def _close_span(self, span):
165-
# type: (Span) -> None
166-
with self._lock:
167-
if span == self._local_root_span:
168-
if self.dd_origin is not None:
169-
span.meta[ORIGIN_KEY] = self.dd_origin
170-
if self._sampling_priority is not None:
171-
span.metrics[SAMPLING_PRIORITY_KEY] = self._sampling_priority
172-
173-
# If a parent exists to the closing span and it is unfinished, then
174-
# activate it next.
175-
if span._parent and not span._parent.finished:
176-
self._set_current_span(span._parent)
177-
# Else if the span is the local root of this context, then clear the
178-
# context so the next trace can be started.
179-
elif span == self._local_root_span:
180-
self._set_current_span(span._parent)
181-
self._local_root_span = None
182-
self._parent_trace_id = None
183-
self._parent_span_id = None
184-
self._sampling_priority = None
185-
# Else the span that is closing is closing after its parent.
186-
# This is most likely an error. To ensure future traces are not
187-
# affected clear out the context and set the current span to
188-
# ``None``.
189-
else:
190-
log.debug(
191-
"span %r closing after its parent %r, this is an error when not using async", span, span._parent
192-
)
193-
self._set_current_span(None)
194-
self._local_root_span = None
195-
self._parent_trace_id = None
196-
self._parent_span_id = None
197-
self._sampling_priority = None
198-
199-
@deprecated(message="Context will no longer support active span management in a later version.", version="0.50")
200-
def close_span(self, span):
201-
# type: (Span) -> None
202-
"""Updates the context after a span has finished.
203-
204-
The next active span becomes `span`'s parent.
205-
"""
206-
return self._close_span(span)
108+
return self

ddtrace/contrib/asyncio/helpers.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import ddtrace
99

10-
from ...context import Context
1110
from .provider import CONTEXT_ATTR
1211
from .wrappers import wrapped_create_task
1312

@@ -29,7 +28,7 @@ def ensure_future(coro_or_future, *, loop=None, tracer=None):
2928
If the current task already has a Context, it will be attached to the new Task so the Trace list will be preserved.
3029
"""
3130
tracer = tracer or ddtrace.tracer
32-
current_ctx = tracer.get_call_context()
31+
current_ctx = tracer.current_trace_context()
3332
task = asyncio.ensure_future(coro_or_future, loop=loop)
3433
set_call_context(task, current_ctx)
3534
return task
@@ -53,12 +52,10 @@ def run_in_executor(loop, executor, func, *args, tracer=None):
5352
5453
"""
5554
tracer = tracer or ddtrace.tracer
56-
ctx = Context()
57-
current_ctx = tracer.get_call_context()
58-
ctx._current_span = current_ctx._current_span
55+
current_ctx = tracer.current_trace_context()
5956

6057
# prepare the future using an executor wrapper
61-
future = loop.run_in_executor(executor, _wrap_executor, func, args, tracer, ctx)
58+
future = loop.run_in_executor(executor, _wrap_executor, func, args, tracer, current_ctx)
6259
return future
6360

6461

ddtrace/contrib/asyncio/patch.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import asyncio
2+
import sys
23

34
from ddtrace.vendor.wrapt import ObjectProxy
45
from ddtrace.vendor.wrapt import wrap_function_wrapper as _w
56

6-
from ...internal.compat import CONTEXTVARS_IS_AVAILABLE
77
from ...utils.wrappers import unwrap as _u
88
from .wrappers import wrapped_create_task
9-
from .wrappers import wrapped_create_task_contextvars
109

1110

1211
def patch():
@@ -17,14 +16,13 @@ def patch():
1716
return
1817
setattr(asyncio, "_datadog_patch", True)
1918

20-
wrapper = wrapped_create_task_contextvars if CONTEXTVARS_IS_AVAILABLE else wrapped_create_task
19+
if sys.version_info < (3, 7, 0):
20+
_w(asyncio.BaseEventLoop, "create_task", wrapped_create_task)
2121

22-
_w(asyncio.BaseEventLoop, "create_task", wrapper)
23-
24-
# also patch event loop if not inheriting the wrapped create_task from BaseEventLoop
25-
loop = asyncio.get_event_loop()
26-
if not isinstance(loop.create_task, ObjectProxy):
27-
_w(loop, "create_task", wrapper)
22+
# also patch event loop if not inheriting the wrapped create_task from BaseEventLoop
23+
loop = asyncio.get_event_loop()
24+
if not isinstance(loop.create_task, ObjectProxy):
25+
_w(loop, "create_task", wrapped_create_task)
2826

2927

3028
def unpatch():
@@ -33,9 +31,10 @@ def unpatch():
3331
if getattr(asyncio, "_datadog_patch", False):
3432
setattr(asyncio, "_datadog_patch", False)
3533

36-
_u(asyncio.BaseEventLoop, "create_task")
34+
if sys.version_info < (3, 7, 0):
35+
_u(asyncio.BaseEventLoop, "create_task")
3736

38-
# also unpatch event loop if not inheriting the already unwrapped create_task from BaseEventLoop
39-
loop = asyncio.get_event_loop()
40-
if isinstance(loop.create_task, ObjectProxy):
41-
_u(loop, "create_task")
37+
# also unpatch event loop if not inheriting the already unwrapped create_task from BaseEventLoop
38+
loop = asyncio.get_event_loop()
39+
if isinstance(loop.create_task, ObjectProxy):
40+
_u(loop, "create_task")

0 commit comments

Comments
 (0)