Skip to content

Commit 5b87eae

Browse files
authored
fix(profiling): accounting of time for gevent tasks [backport #5338 to 1.11] (#5549)
Backport of #5338 to 1.11 This change fixes the accounting of wall and CPU time to gevent tasks by only assigning them to the main Python thread, where they are supposed to run under normal circumstances. This change also ensures that task information does not leak onto any other threads that might be running, most notably the profiler threads themselves. The rough picture of the gevent task modelling on top of Python threads that comes with this PR is the following ![image](https://user-images.githubusercontent.com/20231758/226345502-3c1cb6a3-0b37-47b3-b711-728647ad6ad9.png) This means that the `MainThread` will account for the wall time of all the tasks running within it, as well as the native thread stacks. For example, if the main, gevent-based, application runs a `sleep(2)` in the main thread, and a `sleep(1)` in a secondary thread, the `MainThread` thread would report a total of about 5 seconds of wall time (both of the sleeps, plus the 2 seconds spent in the gevent hub as part of the native thread stack), whereas the `MainThread` _task_ would only account for the 2 seconds of the `sleep(2)`. <img width="1050" alt="Screenshot 2023-03-20 at 15 35 13" src="https://user-images.githubusercontent.com/20231758/226390338-bc47941f-3536-4b95-a50a-da0f37a3acff.png"> <img width="1050" alt="Screenshot 2023-03-20 at 15 35 34" src="https://user-images.githubusercontent.com/20231758/226390409-5091beda-7815-4c1d-a437-0a2a3d9e727e.png"> Some of the existing tests have been adapted to check for the accounting proposed by this PR. Dedicated scenarios will be added to internal correctness check to catch future regressions. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). 
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) are followed. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Author is aware of the performance implications of this PR as reported in the benchmarks PR comment. ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer is aware of, and discussed the performance implications of this PR as reported in the benchmarks PR comment.
1 parent 344242b commit 5b87eae

File tree

7 files changed

+64
-53
lines changed

7 files changed

+64
-53
lines changed

ddtrace/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@
33

44
ModuleWatchdog.install()
55

6-
from ._logger import configure_ddtrace_logger # noqa: E402
6+
# Acquire a reference to the threading module. Some parts of the library (e.g.
7+
# the profiler) might be enabled programmatically and therefore might end up
8+
# getting a reference to the tracee's threading module. By storing a reference
9+
# to the threading module used by ddtrace here, we make it easy for those parts
10+
# to get a reference to the right threading module.
11+
import threading as _threading
12+
13+
from ._logger import configure_ddtrace_logger
714

815

916
# configure ddtrace logger before other modules log

ddtrace/profiling/_threading.pyx

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
11
from __future__ import absolute_import
22

33
import sys
4-
import threading as ddtrace_threading
54
import typing
65
import weakref
76

87
import attr
98
from six.moves import _thread
109

10+
from ddtrace import _threading as ddtrace_threading
1111

12-
cpdef get_thread_name(thread_id):
13-
# Do not force-load the threading module if it's not already loaded
14-
if "threading" not in sys.modules:
15-
return None
1612

17-
import threading
13+
cpdef get_thread_by_id(thread_id):
14+
# Do not force-load the threading module if it's not already loaded
15+
threading = sys.modules.get("threading", ddtrace_threading)
1816

1917
# Look for all threads, including the ones we create
2018
for threading_mod in (threading, ddtrace_threading):
@@ -23,42 +21,34 @@ cpdef get_thread_name(thread_id):
2321
# we fail, it'll just be an anonymous thread because it's either
2422
# starting or dying.
2523
try:
26-
return threading_mod._active[thread_id].name
24+
return threading_mod._active[thread_id]
2725
except KeyError:
2826
try:
29-
return threading_mod._limbo[thread_id].name
27+
return threading_mod._limbo[thread_id]
3028
except KeyError:
3129
pass
3230

3331
return None
3432

3533

36-
cpdef get_thread_native_id(thread_id):
37-
# Do not force-load the threading module if it's not already loaded
38-
if "threading" not in sys.modules:
39-
return None
34+
cpdef get_thread_name(thread_id):
35+
thread = get_thread_by_id(thread_id)
36+
return thread.name if thread is not None else None
4037

41-
import threading
4238

43-
try:
44-
thread_obj = threading._active[thread_id]
45-
except KeyError:
46-
try:
47-
thread_obj = ddtrace_threading._active[thread_id]
48-
except KeyError:
49-
# This should not happen, unless somebody started a thread without
50-
# using the `threading` module.
51-
# In that case, well… just use the thread_id as native_id 🤞
52-
return thread_id
39+
cpdef get_thread_native_id(thread_id):
40+
thread = get_thread_by_id(thread_id)
41+
if thread is None:
42+
return thread_id
5343

5444
try:
5545
# We prioritize using native ids since we expect them to be surely unique for a program. This is less true
5646
# for hashes since they are relative to the memory address which can easily be the same across different
5747
# objects.
58-
return thread_obj.native_id
48+
return thread.native_id
5949
except AttributeError:
6050
# Python < 3.8
61-
return hash(thread_obj)
51+
return hash(thread)
6252

6353

6454
# cython does not play well with mypy

ddtrace/profiling/collector/_task.pyx

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import sys
2+
from types import ModuleType
23
import weakref
34

4-
from ddtrace.internal import compat
55
from ddtrace.vendor.wrapt.importer import when_imported
66

77
from .. import _asyncio
@@ -27,7 +27,7 @@ def install_greenlet_tracer(gevent):
2727

2828
class DDGreenletTracer(object):
2929
def __init__(self, gevent):
30-
# type: (...) -> None
30+
# type: (ModuleType) -> None
3131
self.gevent = gevent
3232

3333
self.previous_trace_function = settrace(self)
@@ -112,16 +112,22 @@ cpdef list_tasks(thread_id):
112112

113113
tasks = []
114114

115-
# We consider all Thread objects to be greenlet
116-
# This should be true as nobody could use a half-monkey-patched gevent
117115
if _gevent_tracer is not None:
118-
tasks.extend([
119-
(greenlet_id,
120-
_threading.get_thread_name(greenlet_id),
121-
greenlet.gr_frame)
122-
for greenlet_id, greenlet in list(compat.iteritems(_gevent_tracer.greenlets))
123-
if not greenlet.dead
124-
])
116+
if type(_threading.get_thread_by_id(thread_id)).__name__.endswith("_MainThread"):
117+
# Under normal circumstances, the Hub is running in the main thread.
118+
# Python will only ever have a single instance of a _MainThread
119+
# class, so if we find it we attribute all the greenlets to it.
120+
tasks.extend(
121+
[
122+
(
123+
greenlet_id,
124+
_threading.get_thread_name(greenlet_id),
125+
greenlet.gr_frame
126+
)
127+
for greenlet_id, greenlet in dict(_gevent_tracer.greenlets).items()
128+
if not greenlet.dead
129+
]
130+
)
125131

126132
policy = _asyncio.get_event_loop_policy()
127133
if isinstance(policy, _asyncio.DdtraceProfilerEventLoopPolicy):

ddtrace/profiling/collector/stack.pyx

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
from __future__ import absolute_import
33

44
import sys
5-
import threading as ddtrace_threading # this is ddtrace's internal copy of the module, not the application's copy
65
import typing
76

87
import attr
98
import six
109

10+
from ddtrace import _threading as ddtrace_threading
1111
from ddtrace import context
1212
from ddtrace import span as ddspan
1313
from ddtrace.internal import compat
@@ -309,7 +309,11 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
309309
exc_events = []
310310

311311
for thread_id, thread_native_id, thread_name, thread_pyframes, exception, span, cpu_time in running_threads:
312-
thread_task_id, thread_task_name, thread_task_frame = _task.get_task(thread_id)
312+
if thread_name is None:
313+
# A Python thread with no name is likely still initialising so we
314+
# ignore it to avoid reporting potentially misleading data.
315+
# Effectively we would be discarding a negligible number of samples.
316+
continue
313317

314318
tasks = _task.list_tasks(thread_id)
315319

@@ -320,9 +324,6 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
320324
if task_pyframes is None:
321325
continue
322326

323-
if task_id in thread_id_ignore_list:
324-
continue
325-
326327
frames, nframes = _traceback.pyframe_to_frames(task_pyframes, max_nframes)
327328
if nframes:
328329
stack_events.append(
@@ -344,8 +345,8 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
344345
thread_id=thread_id,
345346
thread_native_id=thread_native_id,
346347
thread_name=thread_name,
347-
task_id=thread_task_id,
348-
task_name=thread_task_name,
348+
task_id=None,
349+
task_name=None,
349350
nframes=nframes,
350351
frames=frames,
351352
wall_time_ns=wall_time,
@@ -362,8 +363,8 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
362363
thread_id=thread_id,
363364
thread_name=thread_name,
364365
thread_native_id=thread_native_id,
365-
task_id=thread_task_id,
366-
task_name=thread_task_name,
366+
task_id=None,
367+
task_name=None,
367368
nframes=nframes,
368369
frames=frames,
369370
sampling_period=int(interval * 1e9),
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
fixes:
3+
- |
4+
profiling: Corrects accounting of wall and CPU time for gevent tasks within
5+
the main Python thread.

tests/profiling/collector/test_stack.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,15 +215,16 @@ def _dofib():
215215
for t in threads:
216216
t.join()
217217

218+
expected_task_ids = {thread.ident for thread in threads}
218219
for event in r.events[stack_event.StackSampleEvent]:
219-
if event.thread_name is None and event.task_id in {thread.ident for thread in threads}:
220+
if event.task_id in expected_task_ids:
220221
assert event.task_name.startswith("TestThread ")
221222
# This test is not uber-reliable as it has timing issue, therefore
222223
# if we find one of our TestThread with the correct info, we're
223224
# happy enough to stop here.
224225
break
225226
else:
226-
pytest.fail("No gevent thread found")
227+
pytest.fail("No gevent threads found")
227228

228229

229230
def test_max_time_usage():

tests/profiling/simple_program_gevent.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
from gevent import monkey
22

3+
# Import from ddtrace before monkey patching to ensure that we grab all the
4+
# necessary references to the unpatched modules.
5+
from ddtrace.profiling import bootstrap
6+
import ddtrace.profiling.auto # noqa
7+
from ddtrace.profiling.collector import stack_event
8+
39

410
monkey.patch_all()
511

612
import threading
713
import time
814

9-
from ddtrace.profiling import bootstrap
10-
import ddtrace.profiling.auto
11-
from ddtrace.profiling.collector import stack_event
12-
1315

1416
def fibonacci(n):
1517
if n == 0:
@@ -20,7 +22,6 @@ def fibonacci(n):
2022
return fibonacci(n - 1) + fibonacci(n - 2)
2123

2224

23-
# When not using our special PeriodicThread based on real threads, there's 0 event captured.
2425
i = 1
2526
for _ in range(50):
2627
if len(bootstrap.profiler._profiler._recorder.events[stack_event.StackSampleEvent]) >= 10:

0 commit comments

Comments (0)