Skip to content

Commit 3384e0a

Browse files
Julien Danjoubrettlangdonmergify[bot]
authored
feat(profiling): inject task into main thread for wall time profiling (#3010)
This injects all running tasks (greenlets only for now) into the main thread wall time. This makes sure that all the running tasks get the same wall time between 2 polling intervals, even if they are sleeping for a very long time. Co-authored-by: Brett Langdon <[email protected]> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent ebd685e commit 3384e0a

File tree

7 files changed

+215
-31
lines changed

7 files changed

+215
-31
lines changed

ddtrace/internal/nogevent.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ def is_module_patched(module):
3838
Thread = get_original("threading", "Thread")
3939
Lock = get_original("threading", "Lock")
4040

41+
is_threading_patched = is_module_patched("threading")
4142

42-
if is_module_patched("threading"):
43+
if is_threading_patched:
4344

4445
@attr.s
4546
class DoubleLock(object):
@@ -75,7 +76,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7576
DoubleLock = threading.Lock # type: ignore[misc,assignment]
7677

7778

78-
if is_module_patched("threading"):
79+
if is_threading_patched:
7980
# NOTE: bold assumption: this module is always imported by the MainThread.
8081
# The python `threading` module makes that assumption and it's beautiful we're going to do the same.
8182
# We don't have the choice as we can't access the original MainThread

ddtrace/profiling/collector/_task.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ import typing
44
def get_task(
55
thread_id: int,
66
) -> typing.Tuple[typing.Optional[int], typing.Optional[str], typing.Optional[types.FrameType]]: ...
7+
def list_tasks() -> typing.List[typing.Tuple[int, str, types.FrameType]]: ...

ddtrace/profiling/collector/_task.pyx

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import threading
2+
import weakref
3+
14
from ddtrace.internal import compat
25
from ddtrace.internal import nogevent
36

@@ -8,6 +11,7 @@ try:
811
import gevent.hub
912
import gevent.thread
1013
from greenlet import getcurrent
14+
from greenlet import greenlet
1115
from greenlet import settrace
1216
except ImportError:
1317
_gevent_tracer = None
@@ -16,8 +20,17 @@ else:
1620
class DDGreenletTracer(object):
1721
def __init__(self):
1822
# type: (...) -> None
19-
self.active_greenlet = getcurrent()
2023
self.previous_trace_function = settrace(self)
24+
self.greenlets = weakref.WeakValueDictionary()
25+
self.active_greenlet = getcurrent()
26+
self._store_greenlet(self.active_greenlet)
27+
28+
def _store_greenlet(
29+
self,
30+
greenlet, # type: greenlet.greenlet
31+
):
32+
# type: (...) -> None
33+
self.greenlets[gevent.thread.get_ident(greenlet)] = greenlet
2134

2235
def __call__(self, event, args):
2336
if event in ('switch', 'throw'):
@@ -26,6 +39,7 @@ else:
2639
# users as that does not give any information about user code.
2740
if not isinstance(args[1], gevent.hub.Hub):
2841
self.active_greenlet = args[1]
42+
self._store_greenlet(args[1])
2943

3044
if self.previous_trace_function is not None:
3145
self.previous_trace_function(event, args)
@@ -50,3 +64,25 @@ cpdef get_task(thread_id):
5064
frame = None
5165

5266
return task_id, task_name, frame
67+
68+
69+
cpdef list_tasks():
70+
# type: (...) -> typing.List[typing.Tuple[int, str, types.FrameType]]
71+
"""Return the list of running tasks.
72+
73+
This is computed for gevent by taking the list of existing threading.Thread objects and removing any real OS
74+
threads that might be running.
75+
76+
:return: [(task_id, task_name, task_frame), ...]"""
77+
# We consider all Thread objects to be greenlets
78+
# This should be true as nobody could use a half-monkey-patched gevent
79+
if _gevent_tracer is not None:
80+
return [
81+
(greenlet_id,
82+
_threading.get_thread_name(greenlet_id),
83+
greenlet.gr_frame)
84+
for greenlet_id, greenlet in compat.iteritems(_gevent_tracer.greenlets)
85+
if not greenlet.dead
86+
]
87+
88+
return []

ddtrace/profiling/collector/stack.pyx

Lines changed: 82 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -296,29 +296,90 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
296296
if task_id in thread_id_ignore_list:
297297
continue
298298

299-
# Inject the task frame and replace the thread frame: this is especially handy for gevent as it allows us to
300-
# replace the "hub" stack trace by the latest active greenlet, which is more interesting to show and account
301-
# resources for.
302-
if task_frame is not None:
303-
frame = task_frame
304-
305-
frames, nframes = _traceback.pyframe_to_frames(frame, max_nframes)
306-
307-
event = stack_event.StackSampleEvent(
308-
thread_id=thread_id,
309-
thread_native_id=thread_native_id,
310-
thread_name=thread_name,
311-
task_id=task_id,
312-
task_name=task_name,
313-
nframes=nframes, frames=frames,
314-
wall_time_ns=wall_time,
315-
cpu_time_ns=cpu_time,
316-
sampling_period=int(interval * 1e9),
317-
)
299+
# Inject wall time for all running tasks
300+
if thread_id == nogevent.main_thread_id:
301+
main_thread_is_a_task = False
302+
303+
for task_id, task_name, task_frame in _task.list_tasks():
304+
if task_id == compat.main_thread.ident:
305+
main_thread_is_a_task = True
306+
307+
# task_frame is None, so use the thread frames list here
308+
frames, nframes = _traceback.pyframe_to_frames(frame, max_nframes)
309+
310+
event = stack_event.StackSampleEvent(
311+
thread_id=thread_id,
312+
thread_native_id=thread_native_id,
313+
thread_name=thread_name,
314+
task_id=task_id,
315+
task_name=task_name,
316+
nframes=nframes, frames=frames,
317+
wall_time_ns=wall_time,
318+
cpu_time_ns=cpu_time,
319+
sampling_period=int(interval * 1e9),
320+
)
321+
322+
# FIXME: we only trace spans per thread, so we assign the span to the main thread for now
323+
# we'd need to leverage the greenlet tracer to also store the active span
324+
event.set_trace_info(span, collect_endpoint)
325+
else:
326+
frames, nframes = _traceback.pyframe_to_frames(task_frame, max_nframes)
327+
328+
event = stack_event.StackSampleEvent(
329+
thread_id=thread_id,
330+
thread_native_id=thread_native_id,
331+
thread_name=thread_name,
332+
task_id=task_id,
333+
task_name=task_name,
334+
nframes=nframes, frames=frames,
335+
wall_time_ns=wall_time,
336+
# we don't have CPU time per task
337+
sampling_period=int(interval * 1e9),
338+
)
339+
340+
stack_events.append(event)
341+
342+
if not main_thread_is_a_task:
343+
frames, nframes = _traceback.pyframe_to_frames(frame, max_nframes)
344+
345+
event = stack_event.StackSampleEvent(
346+
thread_id=thread_id,
347+
thread_native_id=thread_native_id,
348+
thread_name=thread_name,
349+
nframes=nframes,
350+
frames=frames,
351+
wall_time_ns=wall_time,
352+
cpu_time_ns=cpu_time,
353+
sampling_period=int(interval * 1e9),
354+
)
355+
356+
event.set_trace_info(span, collect_endpoint)
357+
358+
stack_events.append(event)
359+
else:
360+
# Inject the task frame and replace the thread frame: this is especially handy for gevent as it allows us to
361+
# replace the "hub" stack trace by the latest active greenlet, which is more interesting to show and account
362+
# resources for.
363+
if task_frame is not None:
364+
frame = task_frame
365+
366+
frames, nframes = _traceback.pyframe_to_frames(frame, max_nframes)
367+
368+
event = stack_event.StackSampleEvent(
369+
thread_id=thread_id,
370+
thread_native_id=thread_native_id,
371+
thread_name=thread_name,
372+
task_id=task_id,
373+
task_name=task_name,
374+
nframes=nframes, frames=frames,
375+
wall_time_ns=wall_time,
376+
cpu_time_ns=cpu_time,
377+
sampling_period=int(interval * 1e9),
378+
)
318379

319-
event.set_trace_info(span, collect_endpoint)
380+
event.set_trace_info(span, collect_endpoint)
320381

321-
stack_events.append(event)
382+
stack_events.append(event)
322383

323384
if exception is not None:
324385
exc_type, exc_traceback = exception
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
features:
3+
- |
4+
The profiler now automatically injects running greenlets as tasks into the
5+
main thread. They can be seen within the wall time profiles.

tests/profiling/collector/test_stack.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# -*- encoding: utf-8 -*-
2+
import collections
23
import os
34
import threading
45
import time
@@ -331,7 +332,7 @@ def test_exception_collection():
331332
assert e.sampling_period > 0
332333
assert e.thread_id == nogevent.thread_get_ident()
333334
assert e.thread_name == "MainThread"
334-
assert e.frames == [(__file__, 325, "test_exception_collection")]
335+
assert e.frames == [(__file__, 326, "test_exception_collection")]
335336
assert e.nframes == 1
336337
assert e.exc_type == ValueError
337338

@@ -354,7 +355,7 @@ def test_exception_collection_trace(tracer):
354355
assert e.sampling_period > 0
355356
assert e.thread_id == nogevent.thread_get_ident()
356357
assert e.thread_name == "MainThread"
357-
assert e.frames == [(__file__, 348, "test_exception_collection_trace")]
358+
assert e.frames == [(__file__, 349, "test_exception_collection_trace")]
358359
assert e.nframes == 1
359360
assert e.exc_type == ValueError
360361
assert e.span_id == span.span_id
@@ -600,20 +601,24 @@ def test_thread_time_cache():
600601

601602

602603
@pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent")
603-
def test_collect_gevent_thread_hub():
604+
def test_collect_gevent_threads():
604605
# type: (...) -> None
605606
r = recorder.Recorder()
606-
s = stack.StackCollector(r, ignore_profiler=True)
607+
s = stack.StackCollector(r, ignore_profiler=True, max_time_usage_pct=100)
608+
609+
iteration = 100
610+
sleep_time = 0.01
611+
nb_threads = 15
607612

608613
# Start some greenthreads: they do nothing we just keep switching between them.
609614
def _nothing():
610-
for _ in range(100):
615+
for _ in range(iteration):
611616
# Do nothing and just switch to another greenlet
612-
time.sleep(0.01)
617+
time.sleep(sleep_time)
613618

614619
threads = []
615620
with s:
616-
for i in range(100):
621+
for i in range(nb_threads):
617622
t = threading.Thread(target=_nothing, name="TestThread %d" % i)
618623
t.start()
619624
threads.append(t)
@@ -622,6 +627,8 @@ def _nothing():
622627

623628
main_thread_found = False
624629
sleep_task_found = False
630+
wall_time_ns_per_thread = collections.defaultdict(lambda: 0)
631+
625632
events = r.events[stack_event.StackSampleEvent]
626633
for event in events:
627634
if event.task_id == compat.main_thread.ident:
@@ -639,5 +646,23 @@ def _nothing():
639646
sleep_task_found = True
640647
break
641648

649+
wall_time_ns_per_thread[event.task_id] += event.wall_time_ns
650+
642651
assert main_thread_found
643652
assert sleep_task_found
653+
654+
# sanity check: we don't have duplicates in thread/task ids.
655+
assert len(wall_time_ns_per_thread) == nb_threads
656+
657+
# In theory there should be only one value in this set, but due to timing, it's possible one task has fewer events, so
658+
# we're not checking the len() of values here.
659+
values = set(wall_time_ns_per_thread.values())
660+
661+
# NOTE(jd): I'm disabling this check because it works only 90% of the time. There are some cases where this test is
662+
# run inside the complete test suite and fails, while it works 100% of the time on its own.
663+
# Check that the sum of wall time generated for each task is right.
664+
# Accept a 30% margin though, don't be crazy, we're just doing 5 seconds with a lot of tasks.
665+
# exact_time = iteration * sleep_time * 1e9
666+
# assert (exact_time * 0.7) <= values.pop() <= (exact_time * 1.3)
667+
668+
assert values.pop() > 0
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,66 @@
1+
import os
2+
import threading
3+
4+
import pytest
5+
16
from ddtrace import compat
27
from ddtrace.internal import nogevent
38
from ddtrace.profiling.collector import _task
49

510

11+
TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False)
12+
13+
614
def test_get_task_main():
715
# type: (...) -> None
816
if _task._gevent_tracer is None:
917
assert _task.get_task(nogevent.main_thread_id) == (None, None, None)
1018
else:
1119
assert _task.get_task(nogevent.main_thread_id) == (compat.main_thread.ident, "MainThread", None)
20+
21+
22+
@pytest.mark.skipif(TESTING_GEVENT, reason="only works without gevent")
23+
def test_list_tasks_nogevent():
24+
assert _task.list_tasks() == []
25+
26+
27+
@pytest.mark.skipif(not TESTING_GEVENT, reason="only works with gevent")
28+
def test_list_tasks_gevent():
29+
l1 = threading.Lock()
30+
l1.acquire()
31+
32+
def wait():
33+
l1.acquire()
34+
l1.release()
35+
36+
def nothing():
37+
pass
38+
39+
t1 = threading.Thread(target=wait, name="t1")
40+
t1.start()
41+
42+
tasks = _task.list_tasks()
43+
# can't check == 2 because there are leftovers from other tests
44+
assert len(tasks) >= 2
45+
46+
main_thread_found = False
47+
t1_found = False
48+
for task in tasks:
49+
assert len(task) == 3
50+
# main thread
51+
if task[0] == compat.main_thread.ident:
52+
assert task[1] == "MainThread"
53+
assert task[2] is None
54+
main_thread_found = True
55+
# t1
56+
elif task[0] == t1.ident:
57+
assert task[1] == "t1"
58+
assert task[2] is not None
59+
t1_found = True
60+
61+
l1.release()
62+
63+
t1.join()
64+
65+
assert t1_found
66+
assert main_thread_found

0 commit comments

Comments
 (0)