Skip to content

Commit 652e319

Browse files
robertlaurinfbogsanyahayworth
authored
feat: Metrics SDK Synchronous aggregation (#1346)
* Feat: add sync aggregation * More tests * add more tests * Chore: Apply Francis suggested changes Co-authored-by: Francis Bogsanyi <[email protected]> * fix: syntax error on review suggestion * fix: assert histogram bounds are sorted * fix: remove dead code * chore: update comment for default boundaries * fix: add counter instrument tests * fix: Add histogram test * fix: add up down counter test * fix: remove shutdown check on test exporter * fix: only set min max when enabled * chore: appease the cop * fix: remove unused mutex in sum agg * fix: remove unused flags field * fix: apply Francis code suggestions Co-authored-by: Francis Bogsanyi <[email protected]> * chore: appease the cop * fix: dup data points on collection * chore: appease the cop * fix: histogram min max conditional assignment * fix: Apply Francis feedback Co-authored-by: Francis Bogsanyi <[email protected]> * fix: handle nil boundaries on explicit histogram * fix: empty boundaries to behave same as nil * fix: Francis temporality refactoring Co-authored-by: Francis Bogsanyi <[email protected]> * fix: add temporality tests for sum aggregation * fix: add histogram aggregation tests * fix: rebase fixups * fix: Apply Francis feedback Co-authored-by: Francis Bogsanyi <[email protected]> * fix: Set differing start end times Co-authored-by: Francis Bogsanyi <[email protected]> Co-authored-by: Andrew Hayworth <[email protected]>
1 parent cb883ed commit 652e319

File tree

19 files changed

+700
-86
lines changed

19 files changed

+700
-86
lines changed

metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation.rb

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,18 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7-
require 'opentelemetry/sdk/metrics/aggregation/histogram'
8-
97
module OpenTelemetry
108
module SDK
119
module Metrics
1210
# The Aggregation module contains the OpenTelemetry metrics reference
1311
# aggregation implementations.
1412
module Aggregation
15-
extend self
16-
17-
SUM = ->(v1, v2) { v1 + v2 }
18-
EXPLICIT_BUCKET_HISTOGRAM = ExplicitBucketHistogram.new
1913
end
2014
end
2115
end
2216
end
17+
18+
require 'opentelemetry/sdk/metrics/aggregation/number_data_point'
19+
require 'opentelemetry/sdk/metrics/aggregation/histogram_data_point'
20+
require 'opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram'
21+
require 'opentelemetry/sdk/metrics/aggregation/sum'
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module SDK
9+
module Metrics
10+
module Aggregation
11+
# Contains the implementation of the ExplicitBucketHistogram aggregation
12+
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation
13+
class ExplicitBucketHistogram
14+
DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000].freeze
15+
private_constant :DEFAULT_BOUNDARIES
16+
17+
# The default value for boundaries represents the following buckets:
18+
# (-inf, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0],
19+
# (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
20+
# (500.0, 1000.0], (1000.0, +inf)
21+
def initialize(
22+
aggregation_temporality: :delta,
23+
boundaries: DEFAULT_BOUNDARIES,
24+
record_min_max: true
25+
)
26+
@data_points = {}
27+
@aggregation_temporality = aggregation_temporality
28+
@boundaries = boundaries && !boundaries.empty? ? boundaries.sort : nil
29+
@record_min_max = record_min_max
30+
end
31+
32+
def collect(start_time, end_time)
33+
if @aggregation_temporality == :delta
34+
# Set timestamps and 'move' data point values to result.
35+
hdps = @data_points.values.each do |hdp|
36+
hdp.start_time_unix_nano = start_time
37+
hdp.time_unix_nano = end_time
38+
end
39+
@data_points.clear
40+
hdps
41+
else
42+
# Update timestamps and take a snapshot.
43+
@data_points.values.map! do |hdp|
44+
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
45+
hdp.time_unix_nano = end_time
46+
hdp = hdp.dup
47+
hdp.bucket_counts = hdp.bucket_counts.dup
48+
hdp
49+
end
50+
end
51+
end
52+
53+
def update(amount, attributes)
54+
hdp = @data_points.fetch(attributes) do
55+
if @record_min_max
56+
min = Float::INFINITY
57+
max = -Float::INFINITY
58+
end
59+
60+
@data_points[attributes] = HistogramDataPoint.new(
61+
attributes,
62+
nil, # :start_time_unix_nano
63+
nil, # :time_unix_nano
64+
0, # :count
65+
0, # :sum
66+
empty_bucket_counts, # :bucket_counts
67+
@boundaries, # :explicit_bounds
68+
nil, # :exemplars
69+
min, # :min
70+
max # :max
71+
)
72+
end
73+
74+
if @record_min_max
75+
hdp.max = amount if amount > hdp.max
76+
hdp.min = amount if amount < hdp.min
77+
end
78+
79+
hdp.sum += amount
80+
hdp.count += 1
81+
if @boundaries
82+
bucket_index = @boundaries.bsearch_index { _1 >= amount } || @boundaries.size
83+
hdp.bucket_counts[bucket_index] += 1
84+
end
85+
nil
86+
end
87+
88+
private
89+
90+
def empty_bucket_counts
91+
@boundaries ? Array.new(@boundaries.size + 1, 0) : nil
92+
end
93+
end
94+
end
95+
end
96+
end
97+
end

metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/histogram.rb

Lines changed: 0 additions & 30 deletions
This file was deleted.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module SDK
9+
module Metrics
10+
module Aggregation
11+
HistogramDataPoint = Struct.new(:attributes, # optional Hash{String => String, Numeric, Boolean, Array<String, Numeric, Boolean>}
12+
:start_time_unix_nano, # Integer nanoseconds since Epoch
13+
:time_unix_nano, # Integer nanoseconds since Epoch
14+
:count, # Integer count is the number of values in the population. Must be non-negative.
15+
:sum, # Integer sum of the values in the population. If count is zero then this field then this field must be zero
16+
:bucket_counts, # optional Array[Integer] field contains the count values of histogram for each bucket.
17+
:explicit_bounds, # Array[Float] specifies buckets with explicitly defined bounds for values.
18+
:exemplars, # optional List of exemplars collected from measurements that were used to form the data point
19+
:min, # optional Float min is the minimum value over (start_time, end_time].
20+
:max) # optional Float max is the maximum value over (start_time, end_time].
21+
end
22+
end
23+
end
24+
end
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module SDK
9+
module Metrics
10+
module Aggregation
11+
NumberDataPoint = Struct.new(:attributes, # Hash{String => String, Numeric, Boolean, Array<String, Numeric, Boolean>}
12+
:start_time_unix_nano, # Integer nanoseconds since Epoch
13+
:time_unix_nano, # Integer nanoseconds since Epoch
14+
:value, # Integer
15+
:exemplars) # optional List of exemplars collected from measurements that were used to form the data point
16+
end
17+
end
18+
end
19+
end
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module SDK
9+
module Metrics
10+
module Aggregation
11+
# Contains the implementation of the Sum aggregation
12+
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#sum-aggregation
13+
class Sum
14+
def initialize(aggregation_temporality: :delta)
15+
@aggregation_temporality = aggregation_temporality
16+
@data_points = {}
17+
end
18+
19+
def collect(start_time, end_time)
20+
if @aggregation_temporality == :delta
21+
# Set timestamps and 'move' data point values to result.
22+
ndps = @data_points.values.each do |ndp|
23+
ndp.start_time_unix_nano = start_time
24+
ndp.time_unix_nano = end_time
25+
end
26+
@data_points.clear
27+
ndps
28+
else
29+
# Update timestamps and take a snapshot.
30+
@data_points.values.map! do |ndp|
31+
ndp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
32+
ndp.time_unix_nano = end_time
33+
ndp.dup
34+
end
35+
end
36+
end
37+
38+
def update(increment, attributes)
39+
ndp = @data_points[attributes] || @data_points[attributes] = NumberDataPoint.new(
40+
attributes,
41+
nil,
42+
nil,
43+
0,
44+
nil
45+
)
46+
47+
ndp.value += increment
48+
nil
49+
end
50+
end
51+
end
52+
end
53+
end
54+
end

metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ def pull
2525

2626
def export(metrics)
2727
@mutex.synchronize do
28-
return FAILURE if @stopped
29-
3028
@metric_snapshots << metrics
3129
end
3230
SUCCESS

metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/counter.rb

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ module Metrics
1010
module Instrument
1111
# {Counter} is the SDK implementation of {OpenTelemetry::Metrics::Counter}.
1212
class Counter < OpenTelemetry::SDK::Metrics::Instrument::SynchronousInstrument
13-
DEFAULT_AGGREGATION = OpenTelemetry::SDK::Metrics::Aggregation::SUM
14-
1513
# Returns the instrument kind as a Symbol
1614
#
1715
# @return [Symbol]
@@ -33,16 +31,19 @@ def add(increment, attributes: {})
3331
if increment.negative?
3432
OpenTelemetry.logger.warn("#{@name} received a negative value")
3533
else
36-
update(
37-
OpenTelemetry::Metrics::Measurement.new(increment, attributes),
38-
DEFAULT_AGGREGATION
39-
)
34+
update(increment, attributes)
4035
end
4136
nil
4237
rescue StandardError => e
4338
OpenTelemetry.handle_error(exception: e)
4439
nil
4540
end
41+
42+
private
43+
44+
def default_aggregation
45+
OpenTelemetry::SDK::Metrics::Aggregation::Sum.new
46+
end
4647
end
4748
end
4849
end

metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/histogram.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ module Metrics
1010
module Instrument
1111
# {Histogram} is the SDK implementation of {OpenTelemetry::Metrics::Histogram}.
1212
class Histogram < OpenTelemetry::SDK::Metrics::Instrument::SynchronousInstrument
13-
DEFAULT_AGGREGATION = OpenTelemetry::SDK::Metrics::Aggregation::EXPLICIT_BUCKET_HISTOGRAM
14-
1513
# Returns the instrument kind as a Symbol
1614
#
1715
# @return [Symbol]
@@ -27,12 +25,18 @@ def instrument_kind
2725
# Array values must not contain nil elements and all elements must be of
2826
# the same basic type (string, numeric, boolean).
2927
def record(amount, attributes: nil)
30-
update(OpenTelemetry::Metrics::Measurement.new(amount, attributes))
28+
update(amount, attributes)
3129
nil
3230
rescue StandardError => e
3331
OpenTelemetry.handle_error(exception: e)
3432
nil
3533
end
34+
35+
private
36+
37+
def default_aggregation
38+
OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram.new
39+
end
3640
end
3741
end
3842
end

metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/synchronous_instrument.rb

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,28 @@ def initialize(name, unit, description, instrumentation_scope, meter_provider)
1919
@meter_provider = meter_provider
2020
@metric_streams = []
2121

22-
meter_provider.metric_readers.each do |metric_reader|
23-
register_with_new_metric_store(metric_reader.metric_store)
24-
end
22+
meter_provider.register_synchronous_instrument(self)
2523
end
2624

2725
# @api private
28-
def register_with_new_metric_store(metric_store)
26+
def register_with_new_metric_store(metric_store, aggregation: default_aggregation)
2927
ms = OpenTelemetry::SDK::Metrics::State::MetricStream.new(
3028
@name,
3129
@description,
3230
@unit,
3331
instrument_kind,
3432
@meter_provider,
35-
@instrumentation_scope
33+
@instrumentation_scope,
34+
aggregation
3635
)
3736
@metric_streams << ms
3837
metric_store.add_metric_stream(ms)
3938
end
4039

4140
private
4241

43-
def update(measurement, aggregation)
44-
@metric_streams.each do |ms|
45-
ms.update(measurement, aggregation)
46-
end
42+
def update(value, attributes)
43+
@metric_streams.each { |ms| ms.update(value, attributes) }
4744
end
4845
end
4946
end

0 commit comments

Comments
 (0)