[AMLII-2019] Max samples per context for Histogram, Distribution and Timing metrics (Experimental Feature) #863
Changes from 74 commits
Changes to the DogStatsd client constructor and reporting path:

@@ -160,6 +160,7 @@ def __init__(
         telemetry_port=None,  # type: Union[str, int]
         telemetry_socket_path=None,  # type: Text
         max_buffer_len=0,  # type: int
+        max_metric_samples_per_context=0,  # type: int
         container_id=None,  # type: Optional[Text]
         origin_detection_enabled=True,  # type: bool
         socket_timeout=0,  # type: Optional[float]
@@ -236,9 +237,14 @@ def __init__(
         it overrides the default value.
         :type flush_interval: float

-        :disable_aggregation: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client
+        :disable_aggregation: If true, metrics (Count, Gauge, Set) are no longer aggregated by the client
         :type disable_aggregation: bool

+        :max_metric_samples_per_context: Sets the maximum amount of samples for Histogram, Distribution
+        and Timings metrics (default 0). This feature should be used alongside aggregation. This feature
+        is experimental.
+        :type max_metric_samples_per_context: int
+
         :disable_buffering: If set, metrics are no longered buffered by the client and
         all data is sent synchronously to the server
         :type disable_buffering: bool
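For orientation, a client opting into the new cap might look like the sketch below. It is illustrative only: the host/port values and metric name are made up, the import path is assumed to be the usual `datadog.dogstatsd` one, and only `max_metric_samples_per_context` comes from this diff.

```python
from datadog.dogstatsd import DogStatsd  # assumed import path for the client class

# Hypothetical opt-in: keep at most 10 samples per Histogram/Distribution/Timing
# context per flush. Client-side aggregation must stay enabled for the cap to apply.
statsd = DogStatsd(
    host="localhost",                     # assumed local agent
    port=8125,
    disable_aggregation=False,            # keep aggregation on (see docstring above)
    max_metric_samples_per_context=10,    # experimental cap introduced by this PR
)

statsd.histogram("example.request.duration", 120)  # now routed through the aggregator
```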
@@ -450,7 +456,7 @@ def __init__(
         self._flush_interval = flush_interval
         self._flush_thread = None
         self._flush_thread_stop = threading.Event()
-        self.aggregator = Aggregator()
+        self.aggregator = Aggregator(max_metric_samples_per_context)
         # Indicates if the process is about to fork, so we shouldn't start any new threads yet.
         self._forking = False
@@ -643,10 +649,11 @@ def disable_aggregation(self):
             self._stop_flush_thread()
             log.debug("Statsd aggregation is disabled")

-    def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
+    def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, max_samples_per_context=0):
         with self._config_lock:
             if not self._disable_aggregation:
                 return
+            self.aggregator.set_max_samples_per_context(max_samples_per_context)
             self._disable_aggregation = False
             self._flush_interval = flush_interval
             if self._disable_buffering:
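A client that previously disabled aggregation can also opt in at runtime through the extended signature above; a minimal sketch (interval and cap values are illustrative):

```python
# Re-enable aggregation with a 0.5 s flush interval and cap each
# histogram/distribution/timing context at 16 stored samples per flush.
statsd.enable_aggregation(flush_interval=0.5, max_samples_per_context=16)

# Passing max_samples_per_context=0 (the default) leaves sampling uncapped,
# which matches the previous behaviour.
```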
@@ -826,6 +833,10 @@ def flush_aggregated_metrics(self):
         for m in metrics:
             self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp)

+        sampled_metrics = self.aggregator.flush_aggregated_sampled_metrics()
+        for m in sampled_metrics:
+            self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp, False)
+
     def gauge(
         self,
         metric,  # type: Text
@@ -960,7 +971,12 @@ def histogram(
         >>> statsd.histogram("uploaded.file.size", 1445)
         >>> statsd.histogram("album.photo.count", 26, tags=["gender:female"])
         """
-        self._report(metric, "h", value, tags, sample_rate)
+        if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
+            print("Aggregated histogram")
+            self.aggregator.histogram(metric, value, tags, sample_rate)
+        else:
+            print("Regular histogram")
+            self._report(metric, "h", value, tags, sample_rate)

     def distribution(
         self,
@@ -975,7 +991,10 @@ def distribution(
         >>> statsd.distribution("uploaded.file.size", 1445)
         >>> statsd.distribution("album.photo.count", 26, tags=["gender:female"])
         """
-        self._report(metric, "d", value, tags, sample_rate)
+        if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
+            self.aggregator.distribution(metric, value, tags, sample_rate)
+        else:
+            self._report(metric, "d", value, tags, sample_rate)

     def timing(
         self,
@@ -989,7 +1008,10 @@ def timing(

        >>> statsd.timing("query.response.time", 1234)
        """
-        self._report(metric, "ms", value, tags, sample_rate)
+        if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
+            self.aggregator.timing(metric, value, tags, sample_rate)
+        else:
+            self._report(metric, "ms", value, tags, sample_rate)

     def timed(self, metric=None, tags=None, sample_rate=None, use_ms=None):
         """
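All three metric types now take the same branch, so a quick way to exercise the new path end to end is a sketch like the one below (it assumes a local agent on the default port; with a short flush interval the background flush thread may also pick the samples up before the manual call):

```python
from datadog.dogstatsd import DogStatsd  # assumed import path

statsd = DogStatsd(disable_aggregation=False, max_metric_samples_per_context=2)

statsd.timing("query.response.time", 1234)   # held by the aggregator, not sent immediately
statsd.timing("query.response.time", 2468)
statsd.timing("query.response.time", 3702)   # a third sample may displace a stored one

statsd.flush_aggregated_metrics()            # the capped samples are reported here
```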
@@ -1093,7 +1115,7 @@ def _serialize_metric(
             ("|T" + text(timestamp)) if timestamp > 0 else "",
         )

-    def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
+    def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0, sampling=True):
         """
         Create a metric packet and send it.
@@ -1109,11 +1131,12 @@ def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
         if self._telemetry:
             self.metrics_count += 1

-        if sample_rate is None:
-            sample_rate = self.default_sample_rate
+        if sampling:
+            if sample_rate is None:
+                sample_rate = self.default_sample_rate

-        if sample_rate != 1 and random() > sample_rate:
-            return
+            if sample_rate != 1 and random() > sample_rate:
+                return

         # timestamps (protocol v1.3) only allowed on gauges and counts
         allows_timestamp = metric_type == MetricType.GAUGE or metric_type == MetricType.COUNT

A static-analysis review comment on the `random()` call flagged it as a 🔴 Code Vulnerability ("do not use random"): make sure to use values that are actually random.
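The new `sampling` flag matters because values flushed from the capped aggregator were already rate-sampled by `should_sample()` when they were recorded; applying `sample_rate` again here would drop them a second time while the payload still advertises the original rate. A back-of-the-envelope illustration (hypothetical numbers):

```python
# With rate=0.5, sampling once at submission keeps ~50% of points, and the
# serialized |@0.5 rate lets the backend scale counts back up. Sampling again
# at flush time would keep only ~25%, so the advertised rate would no longer
# describe the data and counts would be under-reported by half.
rate = 0.5
submitted = 1000
kept_once = submitted * rate     # ~500 samples survive should_sample()
kept_twice = kept_once * rate    # ~250 samples if _report() re-sampled them
print(kept_once, kept_twice)
```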
New file introduced by this PR (59 added lines):

@@ -0,0 +1,59 @@
+import random
+from datadog.dogstatsd.metric_types import MetricType
+from datadog.dogstatsd.metrics import MetricAggregator
+
+
+class MaxSampleMetric(object):
+    def __init__(self, name, tags, metric_type, specified_rate=1.0, max_metric_samples=0):
+        self.name = name
+        self.tags = tags
+        self.metric_type = metric_type
+        self.max_metric_samples = max_metric_samples
+        self.specified_rate = specified_rate
+        self.data = []
+        self.stored_metric_samples = 0
+        self.total_metric_samples = 0
+
+    def sample(self, value):
+        self.data.append(value)
+        self.stored_metric_samples += 1
+        self.total_metric_samples += 1
+
+    def maybe_keep_sample(self, value):
+        if self.max_metric_samples > 0:
+            self.total_metric_samples += 1
+            if self.stored_metric_samples < self.max_metric_samples:
+                self.data.append(value)
+                self.stored_metric_samples += 1
+            else:
+                i = random.randint(0, self.total_metric_samples - 1)
+                if i < self.max_metric_samples:
+                    self.data[i] = value
+        else:
+            self.sample(value)
+
+    def skip_sample(self):
+        self.total_metric_samples += 1
+
+    def flush(self):
+        return [
+            MetricAggregator(self.name, self.tags, self.specified_rate, self.metric_type, value)
+            for value in self.data
+        ]
+
+
+class HistogramMetric(MaxSampleMetric):
+    def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
+        super(HistogramMetric, self).__init__(name, tags, MetricType.HISTOGRAM, rate, max_metric_samples)
+
+
+class DistributionMetric(MaxSampleMetric):
+    def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
+        super(DistributionMetric, self).__init__(name, tags, MetricType.DISTRIBUTION, rate, max_metric_samples)
+
+
+class TimingMetric(MaxSampleMetric):
+    def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
+        super(TimingMetric, self).__init__(name, tags, MetricType.TIMING, rate, max_metric_samples)
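`maybe_keep_sample()` is essentially reservoir sampling: once `max_metric_samples` values are stored, each later value replaces a random stored slot with probability `max_metric_samples / total_metric_samples`, which keeps the retained set roughly uniform over everything observed. A standalone sketch of the same idea, independent of the classes above:

```python
import random

def reservoir_sample(stream, k):
    """Keep a uniform sample of at most k items from an iterable, mirroring the
    replacement rule used by MaxSampleMetric.maybe_keep_sample()."""
    reservoir = []
    for seen, value in enumerate(stream, start=1):
        if len(reservoir) < k:
            reservoir.append(value)           # buffer not full yet: always keep
        else:
            i = random.randint(0, seen - 1)   # pick a slot among everything seen so far
            if i < k:
                reservoir[i] = value          # replace one of the stored samples
    return reservoir

# e.g. keep at most 5 of 1000 observed latencies
print(reservoir_sample(range(1000), 5))
```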
New file introduced by this PR (40 added lines):

@@ -0,0 +1,40 @@
+from threading import Lock
+import random
+
+
+class MaxSampleMetricContexts:
+    def __init__(self, max_sample_metric_type):
+        self.lock = Lock()
+        self.values = {}
+        self.max_sample_metric_type = max_sample_metric_type
+
+    def flush(self):
+        metrics = []
+        """Flush the metrics and reset the stored values."""
+        with self.lock:
+            copiedValues = self.values.copy()
+            self.values.clear()
+            self.values = {}
+        for _, metric in copiedValues.items():
+            metrics.append(metric.flush())
+        return metrics
+
+    def sample(self, name, value, tags, rate, context_key, max_samples_per_context):
+        """Sample a metric and store it if it meets the criteria."""
+        keeping_sample = self.should_sample(rate)
+        with self.lock:
+            if context_key not in self.values:
+                # Create a new metric if it doesn't exist
+                self.values[context_key] = self.max_sample_metric_type(name, tags, rate, max_samples_per_context)
+            metric = self.values[context_key]
+            if keeping_sample:
+                metric.maybe_keep_sample(value)
+            else:
+                metric.skip_sample()
+
+    def should_sample(self, rate):
+        """Determine if a sample should be kept based on the specified rate."""
+        if rate >= 1:
+            return True
+        return random.random() < rate
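To show how the two new pieces are meant to fit together, the sketch below drives a `MaxSampleMetricContexts` directly. The module paths and the context-key format are assumptions for illustration; in the client the key is built by the `Aggregator`, whose changes are not part of this excerpt.

```python
from datadog.dogstatsd.max_sample_metric import HistogramMetric                    # assumed module path
from datadog.dogstatsd.max_sample_metric_context import MaxSampleMetricContexts    # assumed module path

contexts = MaxSampleMetricContexts(HistogramMetric)

# A plain "name:tags" string stands in for the real context key here.
key = "request.duration:env:dev"
for latency in [12, 180, 95, 41, 260]:
    contexts.sample("request.duration", latency, ["env:dev"], 1.0, key, 3)

# flush() returns, per context, the stored samples wrapped as MetricAggregator rows.
for per_context in contexts.flush():
    for m in per_context:
        print(m.name, m.value, m.tags)
```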
Changes to the MetricType constants:

@@ -2,3 +2,6 @@ class MetricType:
     COUNT = "c"
     GAUGE = "g"
     SET = "s"
+    HISTOGRAM = "h"
+    DISTRIBUTION = "d"
+    TIMING = "ms"