Commit 87a70bb
Drop negative measurements on record (#389)
Prevents sets of measurements from being recorded to view data objects if any measurement in the set is negative, prevents MeasurementMaps from being recorded if they have ever contained a negative-valued measurement, and removes zero- and negative-valued bucket boundaries from DistributionAggregations.
1 parent 19f89e1 commit 87a70bb
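
The user-visible effect of the MeasurementMap change, as a minimal sketch. The measure and view names are invented for illustration, and the surrounding setup assumes the usual opencensus-python stats API (stats_module.Stats(), view_manager, stats_recorder); only the behavior of measure_float_put() and record() shown in the comments comes from this commit.

# Hypothetical example: measure/view names and bucket bounds are made up,
# and the setup assumes the standard opencensus-python stats API.
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
from opencensus.stats import stats as stats_module
from opencensus.stats import view as view_module
from opencensus.tags import tag_map as tag_map_module

stats = stats_module.Stats()
latency_ms = measure_module.MeasureFloat("task_latency", "task latency", "ms")
latency_view = view_module.View(
    "task_latency_distribution", "distribution of task latencies", [],
    latency_ms, aggregation_module.DistributionAggregation([100.0, 200.0, 400.0]))
stats.view_manager.register_view(latency_view)

mmap = stats.stats_recorder.new_measurement_map()
mmap.measure_float_put(latency_ms, -5.0)  # logs "Cannot record negative values"
mmap.record(tag_map_module.TagMap())      # finds the negative value, records nothing

mmap.measure_float_put(latency_ms, 120.0)
mmap.record(tag_map_module.TagMap())      # still refused: this map once held a negative value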

11 files changed: 247 additions & 70 deletions

opencensus/stats/aggregation.py

Lines changed: 25 additions & 5 deletions
@@ -12,10 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from opencensus.stats import bucket_boundaries
 from opencensus.stats import aggregation_data
 
 
+logger = logging.getLogger(__name__)
+
+
 class Type(object):
     """ The type of aggregation function used on a View.
 
@@ -123,11 +128,26 @@ class DistributionAggregation(BaseAggregation):
     :param aggregation_type: represents the type of this aggregation
 
     """
-    def __init__(
-            self,
-            boundaries=None,
-            distribution=None,
-            aggregation_type=Type.DISTRIBUTION):
+
+    def __init__(self,
+                 boundaries=None,
+                 distribution=None,
+                 aggregation_type=Type.DISTRIBUTION):
+        if boundaries:
+            if not all(boundaries[ii] < boundaries[ii + 1]
+                       for ii in range(len(boundaries) - 1)):
+                raise ValueError("bounds must be sorted in increasing order")
+            for ii, bb in enumerate(boundaries):
+                if bb > 0:
+                    break
+            else:
+                ii += 1
+            if ii:
+                logger.warning("Dropping {} negative bucket boundaries, the "
+                               "values must be strictly > 0"
+                               .format(ii))
+                boundaries = boundaries[ii:]
+
         super(DistributionAggregation, self).__init__(
             buckets=boundaries, aggregation_type=aggregation_type)
         self._boundaries = bucket_boundaries.BucketBoundaries(boundaries)
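
A short sketch of the new DistributionAggregation behavior, mirroring the unit tests added below in tests/unit/stats/test_aggregation.py: non-positive boundaries are dropped with a warning, and boundaries that are not strictly increasing now raise ValueError.

from opencensus.stats import aggregation as aggregation_module

# Non-positive boundaries are dropped (with a logged warning); only 1 and 2 remain.
da = aggregation_module.DistributionAggregation([-2, -1, 0, 1, 2])
assert da.boundaries.boundaries == [1, 2]

# Boundaries must be sorted in strictly increasing order.
try:
    aggregation_module.DistributionAggregation([1, 3, 2])
except ValueError:
    pass  # "bounds must be sorted in increasing order"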

opencensus/stats/aggregation_data.py

Lines changed: 13 additions & 3 deletions
@@ -12,9 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from opencensus.stats import bucket_boundaries
 
 
+logger = logging.getLogger(__name__)
+
+
 class BaseAggregationData(object):
     """Aggregation Data represents an aggregated value from a collection
 
@@ -126,12 +131,17 @@ def __init__(self,
         self._sum_of_sqd_deviations = sum_of_sqd_deviations
         if bounds is None:
             bounds = []
+        else:
+            assert bounds == list(sorted(set(bounds)))
 
         if counts_per_bucket is None:
             counts_per_bucket = [0 for ii in range(len(bounds) + 1)]
-        elif len(counts_per_bucket) != len(bounds) + 1:
-            raise ValueError("counts_per_bucket length does not match bounds "
-                             "length")
+        else:
+            assert all(cc >= 0 for cc in counts_per_bucket)
+            assert len(counts_per_bucket) == len(bounds) + 1
+
+        assert bounds == sorted(bounds)
+        assert all(bb > 0 for bb in bounds)
 
         self._counts_per_bucket = counts_per_bucket
         self._bounds = bucket_boundaries.BucketBoundaries(

opencensus/stats/exporters/prometheus_exporter.py

Lines changed: 6 additions & 24 deletions
@@ -164,31 +164,13 @@ def to_metric(self, desc, view):
                 labels=labels)
         elif isinstance(agg_data,
                         aggregation_data_module.DistributionAggregationData):
+
+            assert(agg_data.bounds == sorted(agg_data.bounds))
             points = {}
-            # Histograms are cumulative in Prometheus.
-            # 1. Sort buckets in ascending order but, retain
-            # their indices for reverse lookup later on.
-            # TODO: If there is a guarantee that distribution elements
-            # are always sorted, then skip the sorting.
-            indices_map = {}
-            buckets = []
-            i = 0
-            for boundarie in view.aggregation.boundaries.boundaries:
-                if boundarie not in indices_map \
-                        or indices_map == {}:  # pragma: NO COVER
-                    indices_map[str(boundarie)] = i
-                    buckets.append(str(boundarie))
-                    i += 1
-
-            buckets.sort()
-
-            # 2. Now that the buckets are sorted by magnitude
-            # we can create cumulative indicesmap them back by reverse index
             cum_count = 0
-            for bucket in buckets:
-                i = indices_map[bucket]
-                cum_count += int(agg_data.counts_per_bucket[i])
-                points[bucket] = cum_count
+            for ii, bound in enumerate(agg_data.bounds):
+                cum_count += agg_data.counts_per_bucket[ii]
+                points[str(bound)] = cum_count
             labels = desc['labels'] if points is None else None
             return HistogramMetricFamily(name=desc['name'],
                                          documentation=desc['documentation'],
@@ -217,7 +199,7 @@ def to_metric(self, desc, view):
                             % type(agg_data))
 
     def collect(self):  # pragma: NO COVER
-        """ Collect fetches the statistics from OpenCensus
+        """Collect fetches the statistics from OpenCensus
         and delivers them as Prometheus Metrics.
         Collect is invoked everytime a prometheus.Gatherer is run
         for example when the HTTP endpoint is invoked by Prometheus.
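
Because the DistributionAggregation constructor now guarantees sorted, strictly positive bounds, the exporter can build the cumulative Prometheus bucket counts in a single pass instead of sorting and reverse-mapping indices. A standalone illustration of that arithmetic, with made-up bounds and counts:

# Prometheus histogram points are cumulative: each bound maps to the count of
# all observations at or below it. The numbers here are hypothetical.
bounds = [1.0, 5.0, 10.0]
counts_per_bucket = [3, 4, 2, 1]  # last entry is the overflow bucket (> 10.0)

points = {}
cum_count = 0
for ii, bound in enumerate(bounds):
    cum_count += counts_per_bucket[ii]
    points[str(bound)] = cum_count

assert points == {'1.0': 3, '5.0': 7, '10.0': 9}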

opencensus/stats/measure_to_view_map.py

Lines changed: 6 additions & 3 deletions
@@ -12,10 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from opencensus.stats.view_data import ViewData
 from collections import defaultdict
-import logging
 import copy
+import logging
+
+from opencensus.stats import view_data as view_data_module
 
 
 class MeasureToViewMap(object):
@@ -91,10 +92,12 @@ def register_view(self, view, timestamp):
         if registered_measure is None:
             self._registered_measures[measure.name] = measure
         self._measure_to_view_data_list_map[view.measure.name].append(
-            ViewData(view=view, start_time=timestamp, end_time=timestamp))
+            view_data_module.ViewData(view=view, start_time=timestamp,
+                                      end_time=timestamp))
 
     def record(self, tags, measurement_map, timestamp, attachments=None):
         """records stats with a set of tags"""
+        assert all(vv >= 0 for vv in measurement_map.values())
         for measure, value in measurement_map.items():
             if measure != self._registered_measures.get(measure.name):
                 return

opencensus/stats/measurement_map.py

Lines changed: 32 additions & 1 deletion
@@ -13,9 +13,14 @@
 # limitations under the License.
 
 from datetime import datetime
+import logging
+
 from opencensus.tags import execution_context
 
 
+logger = logging.getLogger(__name__)
+
+
 class MeasurementMap(object):
     """Measurement Map is a map from Measures to measured values
     to be recorded at the same time
@@ -33,6 +38,10 @@ def __init__(self, measure_to_view_map, attachments=None):
         self._measurement_map = {}
         self._measure_to_view_map = measure_to_view_map
         self._attachments = attachments
+        # If the user tries to record a negative value for any measurement,
+        # refuse to record all measurements from this map. Recording negative
+        # measurements will become an error in a later release.
+        self._invalid = False
 
     @property
     def measurement_map(self):
@@ -51,10 +60,16 @@ def attachments(self):
 
     def measure_int_put(self, measure, value):
         """associates the measure of type Int with the given value"""
+        if value < 0:
+            # Should be an error in a later release.
+            logger.warning("Cannot record negative values")
         self._measurement_map[measure] = value
 
     def measure_float_put(self, measure, value):
         """associates the measure of type Float with the given value"""
+        if value < 0:
+            # Should be an error in a later release.
+            logger.warning("Cannot record negative values")
         self._measurement_map[measure] = value
 
     def measure_put_attachment(self, key, value):
@@ -75,11 +90,27 @@ def measure_put_attachment(self, key, value):
 
         self._attachments[key] = value
 
-    def record(self, tag_map_tags=execution_context.get_current_tag_map()):
+    def record(self, tag_map_tags=None):
         """records all the measures at the same time with a tag_map.
         tag_map could either be explicitly passed to the method, or implicitly
         read from current execution context.
         """
+        if tag_map_tags is None:
+            tag_map_tags = execution_context.get_current_tag_map()
+        if self._invalid:
+            logger.warning("Measurement map has included negative value "
+                           "measurements, refusing to record")
+            return
+        for measure, value in self.measurement_map.items():
+            if value < 0:
+                self._invalid = True
+                logger.warning("Dropping values, value to record must be "
+                               "non-negative")
+                logger.info("Measure '{}' has negative value ({}), refusing "
+                            "to record measurements from {}"
+                            .format(measure.name, value, self))
+                return
+
         self.measure_to_view_map.record(
             tags=tag_map_tags,
             measurement_map=self.measurement_map,

tests/unit/stats/exporter/test_prometheus_stats.py

Lines changed: 3 additions & 3 deletions
@@ -42,7 +42,7 @@
 
 VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size_test2"
 VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation(
-    [0.0, 16.0 * MiB, 256.0 * MiB])
+    [16.0 * MiB, 256.0 * MiB])
 VIDEO_SIZE_VIEW = view_module.View(
     VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY],
     VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION)
@@ -189,7 +189,7 @@ def test_collector_to_metric_histogram(self):
         self.assertEqual(desc['name'], metric.name)
         self.assertEqual(desc['documentation'], metric.documentation)
         self.assertEqual('histogram', metric.type)
-        self.assertEqual(5, len(metric.samples))
+        self.assertEqual(4, len(metric.samples))
 
     def test_collector_to_metric_invalid_dist(self):
         agg = mock.Mock()
@@ -232,7 +232,7 @@ def test_collector_collect(self):
         self.assertEqual(desc['name'], metric.name)
         self.assertEqual(desc['documentation'], metric.documentation)
         self.assertEqual('histogram', metric.type)
-        self.assertEqual(5, len(metric.samples))
+        self.assertEqual(4, len(metric.samples))
 
 
 class TestPrometheusStatsExporter(unittest.TestCase):

tests/unit/stats/exporter/test_stackdriver_stats.py

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@
 
 VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size_test2"
 VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation(
-    [0.0, 16.0 * MiB, 256.0 * MiB])
+    [16.0 * MiB, 256.0 * MiB])
 VIDEO_SIZE_VIEW = view_module.View(
     VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY],
     VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION)

tests/unit/stats/test_aggregation.py

Lines changed: 21 additions & 4 deletions
@@ -100,14 +100,14 @@ def test_constructor_defaults(self):
             distribution_aggregation.aggregation_type)
 
     def test_constructor_explicit(self):
-        boundaries = ["test"]
-        distribution = {1: "test"}
+        boundaries = [1, 2]
+        distribution = [0, 1, 2]
         distribution_aggregation = aggregation_module.DistributionAggregation(
             boundaries=boundaries, distribution=distribution)
 
-        self.assertEqual(["test"],
+        self.assertEqual([1, 2],
                          distribution_aggregation.boundaries.boundaries)
-        self.assertEqual({1: "test"}, distribution_aggregation.distribution)
+        self.assertEqual([0, 1, 2], distribution_aggregation.distribution)
         self.assertEqual(aggregation_module.Type.DISTRIBUTION,
                          distribution_aggregation.aggregation_type)
 
@@ -122,3 +122,20 @@ def test_min_max(self):
 
         self.assertEqual(da.aggregation_data.min, -10)
         self.assertEqual(da.aggregation_data.max, 10)
+
+    def test_init_bad_boundaries(self):
+        """Check that boundaries must be sorted and unique."""
+        with self.assertRaises(ValueError):
+            aggregation_module.DistributionAggregation([1, 3, 2])
+        with self.assertRaises(ValueError):
+            aggregation_module.DistributionAggregation([1, 1, 2])
+
+    def test_init_negative_boundaries(self):
+        """Check that non-positive boundaries are dropped."""
+        da = aggregation_module.DistributionAggregation([-2, -1, 0, 1, 2])
+        self.assertEqual(da.boundaries.boundaries, [1, 2])
+        self.assertEqual(da.aggregation_data.bounds, [1, 2])
+
+        da2 = aggregation_module.DistributionAggregation([-2, -1])
+        self.assertEqual(da2.boundaries.boundaries, [])
+        self.assertEqual(da2.aggregation_data.bounds, [])
