Skip to content

Commit 2051e04

Browse files
[core] Collect run-time metrics (#819)
* [metrics] initial implementation - add data structures with tests - add collectors for psutil and platform modules - send metrics to agent statsd * [metrics] add gc generation metrics * [metrics] clean-up * [metrics] add thread worker, additional metrics * [metrics] linting * [metrics] code organization * [metrics] add runtime_id to tracer * [metrics] resolve rebase conflicts * [metrics] linting * [metrics] add runtime-id tag * [metrics] linting * [metrics] linting * Add environment variable for enabling runtime metrics * Environment configuration for dogstatsd * apply brettlinter - thanks brett :) Co-Authored-By: Kyle-Verhoog <[email protected]> * [metrics] remove unnecessary LazyValues * [metrics] in-line psutil method calls * [metrics] use internal logger * [metrics] add reset method, gather services * [metrics] support multiple services properly * [metrics] use base test case * [metrics] handle process forking * [metrics] add runtime metrics tags to spans * Remove LazyValue * Add dependencies for runtime metrics to library * Refactor metrics collectors and add tests * Begin major refactoring of api * Decouple dogstatsd from runtime metrics * Fix constant * Fix flake8 * Separate host/port for trace agent and dogstatsd * Update ddtrace_run tests * Fix integration test * Vendor datadogpy to fix issues with gevent+requests * Revert change to on import * Add license for dogstatsd * Move runtime metrics into internal * Fixes for ddtrace.internal.runtime * Wrap worker flush in try-except to log errors * Flush calls gauge which is a UDP so no need to catch errors * Remove unused datadog and metrics tests * Rename class in repr * Remove collect_fn argument from ValueCollector * Fix flake8 * Remove tags not called for in RFC * Better metric names for cpu * Use 0-1-2 for gc collections * Comments * Fix daemon for threading * Add test on metrics received by dogstatsd * Remove datadog dependency since we have it vendored * Fix cpu metrics * Fix cumulative metrics * Fix reset * Flag check unnecessary * Fix runtime tag names Co-Authored-By: majorgreys <[email protected]> * Only tag root span with runtime info * Use common namespace for gc metric names * Remove unnecessary set check * Wait for tests of metrics received * Fix for constant tags and services * Fix broken config * Fix flake8 * Fix ddtrace-run test for runtime metrics enabled * Update ddtrace/bootstrap/sitecustomize.py Co-Authored-By: majorgreys <[email protected]>
1 parent 5e032ee commit 2051e04

27 files changed

+1682
-124
lines changed

ddtrace/bootstrap/sitecustomize.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ def add_global_tags(tracer):
9898
if priority_sampling:
9999
opts["priority_sampling"] = asbool(priority_sampling)
100100

101+
opts['collect_metrics'] = asbool(get_env('runtime_metrics', 'enabled'))
102+
101103
if opts:
102104
tracer.configure(**opts)
103105

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from .runtime_metrics import (
2+
RuntimeTags,
3+
RuntimeMetrics,
4+
RuntimeWorker,
5+
)
6+
7+
8+
__all__ = [
9+
'RuntimeTags',
10+
'RuntimeMetrics',
11+
'RuntimeWorker',
12+
]
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import importlib
2+
3+
from ..logger import get_logger
4+
5+
log = get_logger(__name__)
6+
7+
8+
class ValueCollector(object):
9+
"""A basic state machine useful for collecting, caching and updating data
10+
obtained from different Python modules.
11+
12+
The two primary use-cases are
13+
1) data loaded once (like tagging information)
14+
2) periodically updating data sources (like thread count)
15+
16+
Functionality is provided for requiring and importing modules which may or
17+
may not be installed.
18+
"""
19+
enabled = True
20+
periodic = False
21+
required_modules = []
22+
value = None
23+
value_loaded = False
24+
25+
def __init__(self, enabled=None, periodic=None, required_modules=None):
26+
self.enabled = self.enabled if enabled is None else enabled
27+
self.periodic = self.periodic if periodic is None else periodic
28+
self.required_modules = self.required_modules if required_modules is None else required_modules
29+
30+
self._modules_successfully_loaded = False
31+
self.modules = self._load_modules()
32+
if self._modules_successfully_loaded:
33+
self._on_modules_load()
34+
35+
def _on_modules_load(self):
36+
"""Hook triggered after all required_modules have been successfully loaded.
37+
"""
38+
39+
def _load_modules(self):
40+
modules = {}
41+
try:
42+
for module in self.required_modules:
43+
modules[module] = importlib.import_module(module)
44+
self._modules_successfully_loaded = True
45+
except ImportError:
46+
# DEV: disable collector if we cannot load any of the required modules
47+
self.enabled = False
48+
log.warn('Could not import module "{}" for {}. Disabling collector.'.format(module, self))
49+
return None
50+
return modules
51+
52+
def collect(self, keys=None):
53+
"""Returns metrics as collected by `collect_fn`.
54+
55+
:param keys: The keys of the metrics to collect.
56+
"""
57+
if not self.enabled:
58+
return self.value
59+
60+
keys = keys or set()
61+
62+
if not self.periodic and self.value_loaded:
63+
return self.value
64+
65+
# call underlying collect function and filter out keys not requested
66+
self.value = self.collect_fn(keys)
67+
68+
# filter values for keys
69+
if len(keys) > 0 and isinstance(self.value, list):
70+
self.value = [
71+
(k, v)
72+
for (k, v) in self.value
73+
if k in keys
74+
]
75+
76+
self.value_loaded = True
77+
return self.value
78+
79+
def __repr__(self):
80+
return '<{}(enabled={},periodic={},required_modules={})>'.format(
81+
self.__class__.__name__,
82+
self.enabled,
83+
self.periodic,
84+
self.required_modules,
85+
)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
GC_COUNT_GEN0 = 'runtime.python.gc.count.gen0'
2+
GC_COUNT_GEN1 = 'runtime.python.gc.count.gen1'
3+
GC_COUNT_GEN2 = 'runtime.python.gc.count.gen2'
4+
5+
THREAD_COUNT = 'runtime.python.thread_count'
6+
MEM_RSS = 'runtime.python.mem.rss'
7+
CPU_TIME_SYS = 'runtime.python.cpu.time.sys'
8+
CPU_TIME_USER = 'runtime.python.cpu.time.user'
9+
CPU_PERCENT = 'runtime.python.cpu.percent'
10+
CTX_SWITCH_VOLUNTARY = 'runtime.python.cpu.ctx_switch.voluntary'
11+
CTX_SWITCH_INVOLUNTARY = 'runtime.python.cpu.ctx_switch.involuntary'
12+
13+
GC_RUNTIME_METRICS = set([
14+
GC_COUNT_GEN0,
15+
GC_COUNT_GEN1,
16+
GC_COUNT_GEN2,
17+
])
18+
19+
PSUTIL_RUNTIME_METRICS = set([
20+
THREAD_COUNT,
21+
MEM_RSS,
22+
CTX_SWITCH_VOLUNTARY,
23+
CTX_SWITCH_INVOLUNTARY,
24+
CPU_TIME_SYS,
25+
CPU_TIME_USER,
26+
CPU_PERCENT,
27+
])
28+
29+
DEFAULT_RUNTIME_METRICS = GC_RUNTIME_METRICS | PSUTIL_RUNTIME_METRICS
30+
31+
RUNTIME_ID = 'runtime-id'
32+
SERVICE = 'service'
33+
LANG_INTERPRETER = 'lang_interpreter'
34+
LANG_VERSION = 'lang_version'
35+
36+
TRACER_TAGS = set([
37+
RUNTIME_ID,
38+
SERVICE,
39+
])
40+
41+
PLATFORM_TAGS = set([
42+
LANG_INTERPRETER,
43+
LANG_VERSION
44+
])
45+
46+
DEFAULT_RUNTIME_TAGS = TRACER_TAGS
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import os
2+
3+
from .collector import ValueCollector
4+
from .constants import (
5+
GC_COUNT_GEN0,
6+
GC_COUNT_GEN1,
7+
GC_COUNT_GEN2,
8+
THREAD_COUNT,
9+
MEM_RSS,
10+
CTX_SWITCH_VOLUNTARY,
11+
CTX_SWITCH_INVOLUNTARY,
12+
CPU_TIME_SYS,
13+
CPU_TIME_USER,
14+
CPU_PERCENT,
15+
)
16+
17+
18+
class RuntimeMetricCollector(ValueCollector):
19+
value = []
20+
periodic = True
21+
22+
23+
class GCRuntimeMetricCollector(RuntimeMetricCollector):
24+
""" Collector for garbage collection generational counts
25+
26+
More information at https://docs.python.org/3/library/gc.html
27+
"""
28+
required_modules = ['gc']
29+
30+
def collect_fn(self, keys):
31+
gc = self.modules.get('gc')
32+
33+
counts = gc.get_count()
34+
metrics = [
35+
(GC_COUNT_GEN0, counts[0]),
36+
(GC_COUNT_GEN1, counts[1]),
37+
(GC_COUNT_GEN2, counts[2]),
38+
]
39+
40+
return metrics
41+
42+
43+
class PSUtilRuntimeMetricCollector(RuntimeMetricCollector):
44+
"""Collector for psutil metrics.
45+
46+
Performs batched operations via proc.oneshot() to optimize the calls.
47+
See https://psutil.readthedocs.io/en/latest/#psutil.Process.oneshot
48+
for more information.
49+
"""
50+
required_modules = ['psutil']
51+
stored_value = dict(
52+
CPU_TIME_SYS_TOTAL=0,
53+
CPU_TIME_USER_TOTAL=0,
54+
CTX_SWITCH_VOLUNTARY_TOTAL=0,
55+
CTX_SWITCH_INVOLUNTARY_TOTAL=0,
56+
)
57+
58+
def _on_modules_load(self):
59+
self.proc = self.modules['psutil'].Process(os.getpid())
60+
61+
def collect_fn(self, keys):
62+
with self.proc.oneshot():
63+
# only return time deltas
64+
# TODO[tahir]: better abstraction for metrics based on last value
65+
cpu_time_sys_total = self.proc.cpu_times().system
66+
cpu_time_user_total = self.proc.cpu_times().user
67+
cpu_time_sys = cpu_time_sys_total - self.stored_value['CPU_TIME_SYS_TOTAL']
68+
cpu_time_user = cpu_time_user_total - self.stored_value['CPU_TIME_USER_TOTAL']
69+
70+
ctx_switch_voluntary_total = self.proc.num_ctx_switches().voluntary
71+
ctx_switch_involuntary_total = self.proc.num_ctx_switches().involuntary
72+
ctx_switch_voluntary = ctx_switch_voluntary_total - self.stored_value['CTX_SWITCH_VOLUNTARY_TOTAL']
73+
ctx_switch_involuntary = ctx_switch_involuntary_total - self.stored_value['CTX_SWITCH_INVOLUNTARY_TOTAL']
74+
75+
self.stored_value = dict(
76+
CPU_TIME_SYS_TOTAL=cpu_time_sys_total,
77+
CPU_TIME_USER_TOTAL=cpu_time_user_total,
78+
CTX_SWITCH_VOLUNTARY_TOTAL=ctx_switch_voluntary_total,
79+
CTX_SWITCH_INVOLUNTARY_TOTAL=ctx_switch_involuntary_total,
80+
)
81+
82+
metrics = [
83+
(THREAD_COUNT, self.proc.num_threads()),
84+
(MEM_RSS, self.proc.memory_info().rss),
85+
(CTX_SWITCH_VOLUNTARY, ctx_switch_voluntary),
86+
(CTX_SWITCH_INVOLUNTARY, ctx_switch_involuntary),
87+
(CPU_TIME_SYS, cpu_time_sys),
88+
(CPU_TIME_USER, cpu_time_user),
89+
(CPU_PERCENT, self.proc.cpu_percent()),
90+
]
91+
92+
return metrics
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import threading
2+
import time
3+
import itertools
4+
5+
from ..logger import get_logger
6+
from .constants import (
7+
DEFAULT_RUNTIME_METRICS,
8+
DEFAULT_RUNTIME_TAGS,
9+
)
10+
from .metric_collectors import (
11+
GCRuntimeMetricCollector,
12+
PSUtilRuntimeMetricCollector,
13+
)
14+
from .tag_collectors import (
15+
TracerTagCollector,
16+
)
17+
18+
log = get_logger(__name__)
19+
20+
21+
class RuntimeCollectorsIterable(object):
22+
def __init__(self, enabled=None):
23+
self._enabled = enabled or self.ENABLED
24+
# Initialize the collectors.
25+
self._collectors = [c() for c in self.COLLECTORS]
26+
27+
def __iter__(self):
28+
collected = (
29+
collector.collect(self._enabled)
30+
for collector in self._collectors
31+
)
32+
return itertools.chain.from_iterable(collected)
33+
34+
def __repr__(self):
35+
return '{}(enabled={})'.format(
36+
self.__class__.__name__,
37+
self._enabled,
38+
)
39+
40+
41+
class RuntimeTags(RuntimeCollectorsIterable):
42+
ENABLED = DEFAULT_RUNTIME_TAGS
43+
COLLECTORS = [
44+
TracerTagCollector,
45+
]
46+
47+
48+
class RuntimeMetrics(RuntimeCollectorsIterable):
49+
ENABLED = DEFAULT_RUNTIME_METRICS
50+
COLLECTORS = [
51+
GCRuntimeMetricCollector,
52+
PSUtilRuntimeMetricCollector,
53+
]
54+
55+
56+
class RuntimeWorker(object):
57+
""" Worker thread for collecting and writing runtime metrics to a DogStatsd
58+
client.
59+
"""
60+
61+
FLUSH_INTERVAL = 10
62+
63+
def __init__(self, statsd_client, flush_interval=None):
64+
self._stay_alive = None
65+
self._thread = None
66+
self._flush_interval = flush_interval or self.FLUSH_INTERVAL
67+
self._statsd_client = statsd_client
68+
self._runtime_metrics = RuntimeMetrics()
69+
70+
def _target(self):
71+
while self._stay_alive:
72+
self.flush()
73+
time.sleep(self._flush_interval)
74+
75+
def start(self):
76+
if not self._thread:
77+
log.debug("Starting {}".format(self))
78+
self._stay_alive = True
79+
self._thread = threading.Thread(target=self._target)
80+
self._thread.setDaemon(True)
81+
self._thread.start()
82+
83+
def stop(self):
84+
if self._thread and self._stay_alive:
85+
log.debug("Stopping {}".format(self))
86+
self._stay_alive = False
87+
88+
def _write_metric(self, key, value):
89+
log.debug('Writing metric {}:{}'.format(key, value))
90+
self._statsd_client.gauge(key, value)
91+
92+
def flush(self):
93+
if not self._statsd_client:
94+
log.warn('Attempted flush with uninitialized or failed statsd client')
95+
return
96+
97+
for key, value in self._runtime_metrics:
98+
self._write_metric(key, value)
99+
100+
def reset(self):
101+
self._runtime_metrics = RuntimeMetrics()
102+
103+
def __repr__(self):
104+
return '{}(runtime_metrics={})'.format(
105+
self.__class__.__name__,
106+
self._runtime_metrics,
107+
)

0 commit comments

Comments
 (0)