11""" CPU profiling collector."""
22from __future__ import absolute_import
33
4+ import collections
5+ import logging
46import sys
57import threading
8+ import weakref
69
710from ddtrace import compat
811from ddtrace.profiling import _attr
@@ -12,6 +15,23 @@ from ddtrace.profiling import event
1215from ddtrace.profiling.collector import _traceback
1316from ddtrace.utils import formats
1417from ddtrace.vendor import attr
18+ from ddtrace.vendor import six
19+
20+
_LOG = logging.getLogger(__name__)


# Pick the function used to identify the calling thread when linking spans to threads.
# When gevent is already loaded, ask gevent for the original (unpatched) `get_ident` so that we do not end up with
# whatever replacement gevent may have installed in the `thread`/`_thread` module.
# NOTE(review): presumably needed so the ids match the thread ids seen by the stack collector -- confirm.
if "gevent" in sys.modules:
    try:
        import gevent.monkey
    except ImportError:
        # gevent is loaded but its monkey module cannot be imported: fall back to the regular `get_ident`.
        _LOG.error("gevent loaded but unable to import gevent.monkey")
        from ddtrace.vendor.six.moves._thread import get_ident as _thread_get_ident
    else:
        # The low-level thread module is named `thread` on Python 2 and `_thread` on Python 3.
        _thread_get_ident = gevent.monkey.get_original("thread" if six.PY2 else "_thread", "get_ident")
else:
    from ddtrace.vendor.six.moves._thread import get_ident as _thread_get_ident
34+
1535
1636# NOTE: Do not use LOG here. This code runs under a real OS thread and is unable to acquire any lock of the `logging`
1737# module without having gevent crashing our dedicated thread.
@@ -112,6 +132,7 @@ class StackBasedEvent(event.SampleEvent):
112132 thread_name = attr.ib(default = None )
113133 frames = attr.ib(default = None )
114134 nframes = attr.ib(default = None )
135+ trace_ids = attr.ib(default = None )
115136
116137
117138@event.event_class
@@ -198,7 +219,7 @@ cdef get_thread_name(thread_id):
198219 return " Anonymous Thread %d " % thread_id
199220
200221
201- cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_time):
222+ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_time, thread_span_links ):
202223 current_exceptions = []
203224
204225 IF PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7 :
@@ -241,6 +262,8 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
241262
242263 running_thread_ids = {t[0 ] for t in running_threads}
243264
265+ thread_span_links.clear_threads(running_thread_ids)
266+
244267 if ignore_profiler:
245268 running_thread_ids -= _periodic.PERIODIC_THREAD_IDS
246269
@@ -250,11 +273,13 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
250273 for tid, frame in running_threads:
251274 if ignore_profiler and tid in _periodic.PERIODIC_THREAD_IDS:
252275 continue
276+ spans = thread_span_links.get_active_leaf_spans_from_thread_id(tid)
253277 frames, nframes = _traceback.pyframe_to_frames(frame, max_nframes)
254278 stack_events.append(
255279 StackSampleEvent(
256280 thread_id = tid,
257281 thread_name = get_thread_name(tid),
282+ trace_ids = set (span.trace_id for span in spans),
258283 nframes = nframes, frames = frames,
259284 wall_time_ns = wall_time,
260285 cpu_time_ns = cpu_time[tid],
@@ -281,6 +306,59 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
281306 return stack_events, exc_events
282307
283308
@attr.s(slots=True, eq=False)
class _ThreadSpanLinks(object):
    """Keep track of the spans started by each thread.

    Spans are stored in one `weakref.WeakSet` per thread id, so this mapping only holds weak references to them.
    """

    _thread_id_to_spans = attr.ib(factory=lambda: collections.defaultdict(weakref.WeakSet), repr=False, init=False)

    # We do not use a lock because:
    # - When adding new items, a given thread cannot be running two `link_span` calls at the same time.
    #   Since the WeakSets are per-thread, there is no chance of creating 2 WeakSets for the same thread.
    # - When reading the span WeakSet of a thread, the set is copied first, which means that if it was ever mutated
    #   during our reading, we wouldn't care.
    #   In practice, here, it won't even mutate during the reading since this is only used by the stack collector,
    #   which already owns the GIL.

    def link_span(self, span):
        """Link a span to its running environment.

        Track threads, tasks, etc.

        :param span: The span to associate with the calling thread.
        """
        self._thread_id_to_spans[_thread_get_ident()].add(span)

    def clear_threads(self, existing_thread_ids):
        """Clear the stored threads based on the list of existing thread ids.

        Any stored thread whose id is *not* part of `existing_thread_ids` has its data deleted.

        :param existing_thread_ids: A set of thread ids to keep.
        """
        # Iterate over a copy of the keys in case the mapping is mutated during our iteration.
        for thread_id in list(self._thread_id_to_spans):
            if thread_id not in existing_thread_ids:
                del self._thread_id_to_spans[thread_id]

    def get_active_leaf_spans_from_thread_id(self, thread_id):
        """Return the latest active spans for a thread.

        In theory this should return a single span, though if multiple children span are active without being finished,
        there can be several spans returned.

        :param thread_id: The thread id.
        :return: A set with the active spans.
        """
        # Use `get` rather than indexing so that an unknown thread id does not create an entry in the defaultdict.
        spans = set(self._thread_id_to_spans.get(thread_id, ()))
        # Iterate over a copy so we can modify the original
        for span in set(spans):
            if not span.finished:
                # The parent of an unfinished span is not a leaf. `discard` is a no-op when the parent is not in
                # the set (e.g. no parent, or a parent tracked for another thread).
                spans.discard(span._parent)

        return {span for span in spans if not span.finished}
360+
361+
284362@ attr.s (slots = True )
285363class StackCollector (collector.PeriodicCollector ):
286364 """ Execution stacks collector."""
@@ -295,8 +373,10 @@ class StackCollector(collector.PeriodicCollector):
295373 max_time_usage_pct = attr.ib(factory = _attr.from_env(" DD_PROFILING_MAX_TIME_USAGE_PCT" , 2 , float ))
296374 nframes = attr.ib(factory = _attr.from_env(" DD_PROFILING_MAX_FRAMES" , 64 , int ))
297375 ignore_profiler = attr.ib(factory = _attr.from_env(" DD_PROFILING_IGNORE_PROFILER" , True , formats.asbool))
376+ tracer = attr.ib(default = None )
298377 _thread_time = attr.ib(init = False , repr = False )
299378 _last_wall_time = attr.ib(init = False , repr = False )
379+ _thread_span_links = attr.ib(factory = _ThreadSpanLinks, init = False , repr = False )
300380
301381 @max_time_usage_pct.validator
302382 def _check_max_time_usage (self , attribute , value ):
@@ -306,8 +386,15 @@ class StackCollector(collector.PeriodicCollector):
306386 def start (self ):
307387 self ._thread_time = ThreadTime()
308388 self ._last_wall_time = compat.monotonic_ns()
389+ if self .tracer is not None :
390+ self .tracer.on_start_span(self ._thread_span_links.link_span)
309391 super (StackCollector, self ).start()
310392
393+ def stop (self ):
394+ super (StackCollector, self ).stop()
395+ if self .tracer is not None :
396+ self .tracer.deregister_on_start_span(self ._thread_span_links.link_span)
397+
311398 def _compute_new_interval (self , used_wall_time_ns ):
312399 interval = (used_wall_time_ns / (self .max_time_usage_pct / 100.0 )) - used_wall_time_ns
313400 return max (interval / 1e9 , self .MIN_INTERVAL_TIME)
@@ -319,7 +406,7 @@ class StackCollector(collector.PeriodicCollector):
319406 self ._last_wall_time = now
320407
321408 all_events = stack_collect(
322- self .ignore_profiler, self ._thread_time, self .nframes, self .interval, wall_time,
409+ self .ignore_profiler, self ._thread_time, self .nframes, self .interval, wall_time, self ._thread_span_links,
323410 )
324411
325412 used_wall_time_ns = compat.monotonic_ns() - now
0 commit comments