Skip to content

Commit aa2a252

Browse files
mergify[bot]jd
andauthored
refactor(profiling): split lock profiling in two parts (#3268) (#3421)
This splits the lock profiling code to be able to profile any type of lock. (cherry picked from commit 2325f93) Co-authored-by: Julien Danjou <[email protected]>
1 parent 6c8249e commit aa2a252

File tree

9 files changed

+312
-269
lines changed

9 files changed

+312
-269
lines changed
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
from __future__ import absolute_import
2+
3+
import abc
4+
import os.path
5+
import sys
6+
import typing
7+
8+
import attr
9+
10+
from ddtrace.internal import compat
11+
from ddtrace.internal import nogevent
12+
from ddtrace.internal.utils import attr as attr_utils
13+
from ddtrace.internal.utils import formats
14+
from ddtrace.profiling import _threading
15+
from ddtrace.profiling import collector
16+
from ddtrace.profiling import event
17+
from ddtrace.profiling.collector import _task
18+
from ddtrace.profiling.collector import _traceback
19+
from ddtrace.vendor import wrapt
20+
21+
22+
@event.event_class
23+
class LockEventBase(event.StackBasedEvent):
24+
"""Base Lock event."""
25+
26+
lock_name = attr.ib(default="<unknown lock name>", type=str)
27+
sampling_pct = attr.ib(default=0, type=int)
28+
29+
30+
@event.event_class
31+
class LockAcquireEvent(LockEventBase):
32+
"""A lock has been acquired."""
33+
34+
wait_time_ns = attr.ib(default=0, type=int)
35+
36+
37+
@event.event_class
38+
class LockReleaseEvent(LockEventBase):
39+
"""A lock has been released."""
40+
41+
locked_for_ns = attr.ib(default=0, type=int)
42+
43+
44+
def _current_thread():
45+
# type: (...) -> typing.Tuple[int, str]
46+
thread_id = nogevent.thread_get_ident()
47+
return thread_id, _threading.get_thread_name(thread_id)
48+
49+
50+
# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will
51+
# appear in the stack trace and we need to hide it.
52+
if os.environ.get("WRAPT_DISABLE_EXTENSIONS"):
53+
WRAPT_C_EXT = False
54+
else:
55+
try:
56+
import ddtrace.vendor.wrapt._wrappers as _w # noqa: F401
57+
except ImportError:
58+
WRAPT_C_EXT = False
59+
else:
60+
WRAPT_C_EXT = True
61+
del _w
62+
63+
64+
class _ProfiledLock(wrapt.ObjectProxy):
65+
66+
ACQUIRE_EVENT_CLASS = LockAcquireEvent
67+
RELEASE_EVENT_CLASS = LockReleaseEvent
68+
69+
def __init__(self, wrapped, recorder, tracer, max_nframes, capture_sampler, endpoint_collection_enabled):
70+
wrapt.ObjectProxy.__init__(self, wrapped)
71+
self._self_recorder = recorder
72+
self._self_tracer = tracer
73+
self._self_max_nframes = max_nframes
74+
self._self_capture_sampler = capture_sampler
75+
self._self_endpoint_collection_enabled = endpoint_collection_enabled
76+
frame = sys._getframe(2 if WRAPT_C_EXT else 3)
77+
code = frame.f_code
78+
self._self_name = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno)
79+
80+
def acquire(self, *args, **kwargs):
81+
if not self._self_capture_sampler.capture():
82+
return self.__wrapped__.acquire(*args, **kwargs)
83+
84+
start = compat.monotonic_ns()
85+
try:
86+
return self.__wrapped__.acquire(*args, **kwargs)
87+
finally:
88+
try:
89+
end = self._self_acquired_at = compat.monotonic_ns()
90+
thread_id, thread_name = _current_thread()
91+
task_id, task_name, task_frame = _task.get_task(thread_id)
92+
93+
if task_frame is None:
94+
frame = sys._getframe(1)
95+
else:
96+
frame = task_frame
97+
98+
frames, nframes = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
99+
100+
event = self.ACQUIRE_EVENT_CLASS(
101+
lock_name=self._self_name,
102+
frames=frames,
103+
nframes=nframes,
104+
thread_id=thread_id,
105+
thread_name=thread_name,
106+
task_id=task_id,
107+
task_name=task_name,
108+
wait_time_ns=end - start,
109+
sampling_pct=self._self_capture_sampler.capture_pct,
110+
)
111+
112+
if self._self_tracer is not None:
113+
event.set_trace_info(self._self_tracer.current_span(), self._self_endpoint_collection_enabled)
114+
115+
self._self_recorder.push_event(event)
116+
except Exception:
117+
pass
118+
119+
def release(
120+
self,
121+
*args, # type: typing.Any
122+
**kwargs # type: typing.Any
123+
):
124+
# type: (...) -> None
125+
try:
126+
return self.__wrapped__.release(*args, **kwargs)
127+
finally:
128+
try:
129+
if hasattr(self, "_self_acquired_at"):
130+
try:
131+
end = compat.monotonic_ns()
132+
thread_id, thread_name = _current_thread()
133+
task_id, task_name, task_frame = _task.get_task(thread_id)
134+
135+
if task_frame is None:
136+
frame = sys._getframe(1)
137+
else:
138+
frame = task_frame
139+
140+
frames, nframes = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
141+
142+
event = self.RELEASE_EVENT_CLASS( # type: ignore[call-arg]
143+
lock_name=self._self_name,
144+
frames=frames,
145+
nframes=nframes,
146+
thread_id=thread_id,
147+
thread_name=thread_name,
148+
task_id=task_id,
149+
task_name=task_name,
150+
locked_for_ns=end - self._self_acquired_at,
151+
sampling_pct=self._self_capture_sampler.capture_pct,
152+
)
153+
154+
if self._self_tracer is not None:
155+
event.set_trace_info(
156+
self._self_tracer.current_span(), self._self_endpoint_collection_enabled
157+
)
158+
159+
self._self_recorder.push_event(event)
160+
finally:
161+
del self._self_acquired_at
162+
except Exception:
163+
pass
164+
165+
acquire_lock = acquire
166+
167+
168+
class FunctionWrapper(wrapt.FunctionWrapper):
169+
# Override the __get__ method: whatever happens, _allocate_lock is always considered by Python like a "static"
170+
# method, even when used as a class attribute. Python never tried to "bind" it to a method, because it sees it is a
171+
# builtin function. Override default wrapt behavior here that tries to detect bound method.
172+
def __get__(self, instance, owner=None):
173+
return self
174+
175+
176+
@attr.s
177+
class LockCollector(collector.CaptureSamplerCollector):
178+
"""Record lock usage."""
179+
180+
nframes = attr.ib(factory=attr_utils.from_env("DD_PROFILING_MAX_FRAMES", 64, int))
181+
endpoint_collection_enabled = attr.ib(
182+
factory=attr_utils.from_env("DD_PROFILING_ENDPOINT_COLLECTION_ENABLED", True, formats.asbool)
183+
)
184+
tracer = attr.ib(default=None)
185+
186+
_original = attr.ib(init=False, repr=False, type=typing.Any, cmp=False)
187+
188+
@abc.abstractmethod
189+
def _get_original(self):
190+
# type: (...) -> typing.Any
191+
pass
192+
193+
@abc.abstractmethod
194+
def _set_original(
195+
self, value # type: typing.Any
196+
):
197+
# type: (...) -> None
198+
pass
199+
200+
def _start_service(self): # type: ignore[override]
201+
# type: (...) -> None
202+
"""Start collecting lock usage."""
203+
self.patch()
204+
super(LockCollector, self)._start_service()
205+
206+
def _stop_service(self): # type: ignore[override]
207+
# type: (...) -> None
208+
"""Stop collecting lock usage."""
209+
super(LockCollector, self)._stop_service()
210+
self.unpatch()
211+
212+
def patch(self):
213+
# type: (...) -> None
214+
"""Patch the module for tracking lock allocation."""
215+
# We only patch the lock from the `threading` module.
216+
# Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile.
217+
self.original = self._get_original()
218+
219+
def _allocate_lock(wrapped, instance, args, kwargs):
220+
lock = wrapped(*args, **kwargs)
221+
return self.PROFILED_LOCK_CLASS(
222+
lock, self.recorder, self.tracer, self.nframes, self._capture_sampler, self.endpoint_collection_enabled
223+
)
224+
225+
self._set_original(FunctionWrapper(self.original, _allocate_lock))
226+
227+
def unpatch(self):
228+
# type: (...) -> None
229+
"""Unpatch the threading module for tracking lock allocation."""
230+
self._set_original(self.original)

0 commit comments

Comments
 (0)