Skip to content

Commit f03da8f

Browse files
committed
[core] initial support for partial flushes (#668)
* [core] partial flush prototype * [core] partial flush when >= min spans required * [core] add tests for partial flushes * [core] use span._finished = True instead of span.finish() for tests * [core] add test for partial flush with remaining opened spans * Update tests/test_context.py * [core] change context config to 'tracer' * [core] move partial flush settings to class level * [core] remove usage of config * [tests] remove unused import
1 parent e5c21e9 commit f03da8f

File tree

2 files changed

+201
-1
lines changed

2 files changed

+201
-1
lines changed

ddtrace/context.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import threading
33

44
from .constants import SAMPLING_PRIORITY_KEY
5-
5+
from .utils.formats import asbool, get_env
66

77
log = logging.getLogger(__name__)
88

@@ -22,6 +22,9 @@ class Context(object):
2222
2323
This data structure is thread-safe.
2424
"""
25+
_partial_flush_enabled = asbool(get_env('tracer', 'partial_flush_enabled', 'false'))
26+
_partial_flush_min_spans = int(get_env('tracer', 'partial_flush_min_spans', 500))
27+
2528
def __init__(self, trace_id=None, span_id=None, sampled=True, sampling_priority=None):
2629
"""
2730
Initialize a new thread-safe ``Context``.
@@ -190,6 +193,31 @@ def get(self):
190193
self._sampling_priority = None
191194
self._sampled = True
192195
return trace, sampled
196+
197+
elif self._partial_flush_enabled and self._finished_spans >= self._partial_flush_min_spans:
198+
# partial flush when enabled and we have more than the minimal required spans
199+
trace = self._trace
200+
sampled = self._sampled
201+
sampling_priority = self._sampling_priority
202+
# attach the sampling priority to the context root span
203+
if sampled and sampling_priority is not None and trace:
204+
trace[0].set_metric(SAMPLING_PRIORITY_KEY, sampling_priority)
205+
206+
# Any open spans will remain as `self._trace`
207+
# Any finished spans will get returned to be flushed
208+
opened_spans = []
209+
closed_spans = []
210+
for span in trace:
211+
if span._finished:
212+
closed_spans.append(span)
213+
else:
214+
opened_spans.append(span)
215+
216+
# Update trace spans and stats
217+
self._trace = opened_spans
218+
self._finished_spans = 0
219+
220+
return closed_spans, sampled
193221
else:
194222
return None, None
195223

tests/test_context.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import contextlib
12
import mock
23
import threading
34

@@ -15,6 +16,20 @@ class TestTracingContext(TestCase):
1516
Tests related to the ``Context`` class that hosts the trace for the
1617
current execution flow.
1718
"""
19+
@contextlib.contextmanager
20+
def override_partial_flush(self, ctx, enabled, min_spans):
21+
original_enabled = ctx._partial_flush_enabled
22+
original_min_spans = ctx._partial_flush_min_spans
23+
24+
ctx._partial_flush_enabled = enabled
25+
ctx._partial_flush_min_spans = min_spans
26+
27+
try:
28+
yield
29+
finally:
30+
ctx._partial_flush_enabled = original_enabled
31+
ctx._partial_flush_min_spans = original_min_spans
32+
1833
def test_add_span(self):
1934
# it should add multiple spans
2035
ctx = Context()
@@ -101,6 +116,163 @@ def test_get_trace_empty(self):
101116
ok_(trace is None)
102117
ok_(sampled is None)
103118

119+
def test_partial_flush(self):
120+
"""
121+
When calling `Context.get`
122+
When partial flushing is enabled
123+
When we have just enough finished spans to flush
124+
We return the finished spans
125+
"""
126+
tracer = get_dummy_tracer()
127+
ctx = Context()
128+
129+
# Create a root span with 5 children, all of the children are finished, the root is not
130+
root = Span(tracer=tracer, name='root')
131+
ctx.add_span(root)
132+
for i in range(5):
133+
child = Span(tracer=tracer, name='child_{}'.format(i), trace_id=root.trace_id, parent_id=root.span_id)
134+
child._parent = root
135+
child._finished = True
136+
ctx.add_span(child)
137+
ctx.close_span(child)
138+
139+
with self.override_partial_flush(ctx, enabled=True, min_spans=5):
140+
trace, sampled = ctx.get()
141+
142+
self.assertIsNotNone(trace)
143+
self.assertIsNotNone(sampled)
144+
145+
self.assertEqual(len(trace), 5)
146+
self.assertEqual(
147+
set(['child_0', 'child_1', 'child_2', 'child_3', 'child_4']),
148+
set([span.name for span in trace])
149+
)
150+
151+
# Ensure we clear/reset internal stats as expected
152+
self.assertEqual(ctx._finished_spans, 0)
153+
self.assertEqual(ctx._trace, [root])
154+
with self.override_partial_flush(ctx, enabled=True, min_spans=5):
155+
trace, sampled = ctx.get()
156+
self.assertIsNone(trace)
157+
self.assertIsNone(sampled)
158+
159+
def test_partial_flush_too_many(self):
160+
"""
161+
When calling `Context.get`
162+
When partial flushing is enabled
163+
When we have more than the minimum number of spans needed to flush
164+
We return the finished spans
165+
"""
166+
tracer = get_dummy_tracer()
167+
ctx = Context()
168+
169+
# Create a root span with 5 children, all of the children are finished, the root is not
170+
root = Span(tracer=tracer, name='root')
171+
ctx.add_span(root)
172+
for i in range(5):
173+
child = Span(tracer=tracer, name='child_{}'.format(i), trace_id=root.trace_id, parent_id=root.span_id)
174+
child._parent = root
175+
child._finished = True
176+
ctx.add_span(child)
177+
ctx.close_span(child)
178+
179+
with self.override_partial_flush(ctx, enabled=True, min_spans=1):
180+
trace, sampled = ctx.get()
181+
182+
self.assertIsNotNone(trace)
183+
self.assertIsNotNone(sampled)
184+
185+
self.assertEqual(len(trace), 5)
186+
self.assertEqual(
187+
set(['child_0', 'child_1', 'child_2', 'child_3', 'child_4']),
188+
set([span.name for span in trace])
189+
)
190+
191+
# Ensure we clear/reset internal stats as expected
192+
self.assertEqual(ctx._finished_spans, 0)
193+
self.assertEqual(ctx._trace, [root])
194+
with self.override_partial_flush(ctx, enabled=True, min_spans=5):
195+
trace, sampled = ctx.get()
196+
self.assertIsNone(trace)
197+
self.assertIsNone(sampled)
198+
199+
def test_partial_flush_too_few(self):
200+
"""
201+
When calling `Context.get`
202+
When partial flushing is enabled
203+
When we do not have enough finished spans to flush
204+
We return no spans
205+
"""
206+
tracer = get_dummy_tracer()
207+
ctx = Context()
208+
209+
# Create a root span with 5 children, all of the children are finished, the root is not
210+
root = Span(tracer=tracer, name='root')
211+
ctx.add_span(root)
212+
for i in range(5):
213+
child = Span(tracer=tracer, name='child_{}'.format(i), trace_id=root.trace_id, parent_id=root.span_id)
214+
child._parent = root
215+
child._finished = True
216+
ctx.add_span(child)
217+
ctx.close_span(child)
218+
219+
# Test with having 1 too few spans for partial flush
220+
with self.override_partial_flush(ctx, enabled=True, min_spans=6):
221+
trace, sampled = ctx.get()
222+
223+
self.assertIsNone(trace)
224+
self.assertIsNone(sampled)
225+
226+
self.assertEqual(len(ctx._trace), 6)
227+
self.assertEqual(ctx._finished_spans, 5)
228+
self.assertEqual(
229+
set(['root', 'child_0', 'child_1', 'child_2', 'child_3', 'child_4']),
230+
set([span.name for span in ctx._trace])
231+
)
232+
233+
def test_partial_flush_remaining(self):
234+
"""
235+
When calling `Context.get`
236+
When partial flushing is enabled
237+
When we have some unfinished spans
238+
We keep the unfinished spans around
239+
"""
240+
tracer = get_dummy_tracer()
241+
ctx = Context()
242+
243+
# Create a root span with 5 children, all of the children are finished, the root is not
244+
root = Span(tracer=tracer, name='root')
245+
ctx.add_span(root)
246+
for i in range(10):
247+
child = Span(tracer=tracer, name='child_{}'.format(i), trace_id=root.trace_id, parent_id=root.span_id)
248+
child._parent = root
249+
ctx.add_span(child)
250+
251+
# CLose the first 5 only
252+
if i < 5:
253+
child._finished = True
254+
ctx.close_span(child)
255+
256+
with self.override_partial_flush(ctx, enabled=True, min_spans=5):
257+
trace, sampled = ctx.get()
258+
259+
# Assert partially flushed spans
260+
self.assertTrue(len(trace), 5)
261+
self.assertIsNotNone(sampled)
262+
self.assertEqual(
263+
set(['child_0', 'child_1', 'child_2', 'child_3', 'child_4']),
264+
set([span.name for span in trace])
265+
)
266+
267+
# Assert remaining unclosed spans
268+
self.assertEqual(len(ctx._trace), 6)
269+
self.assertEqual(ctx._finished_spans, 0)
270+
self.assertEqual(
271+
set(['root', 'child_5', 'child_6', 'child_7', 'child_8', 'child_9']),
272+
set([span.name for span in ctx._trace]),
273+
)
274+
275+
104276
def test_finished(self):
105277
# a Context is finished if all spans inside are finished
106278
ctx = Context()

0 commit comments

Comments
 (0)