diff --git a/exporter/otlp-metrics/test/test_helper.rb b/exporter/otlp-metrics/test/test_helper.rb index 9db443b396..6f77afbaeb 100644 --- a/exporter/otlp-metrics/test/test_helper.rb +++ b/exporter/otlp-metrics/test/test_helper.rb @@ -17,7 +17,7 @@ OpenTelemetry.logger = Logger.new(File::NULL) module MockSum - def collect(start_time, end_time, data_points) + def collect(start_time, end_time) start_time = 1_699_593_427_329_946_585 # rubocop:disable Lint/ShadowedArgument end_time = 1_699_593_427_329_946_586 # rubocop:disable Lint/ShadowedArgument super diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb index 65e351b328..83ac85447d 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb @@ -10,12 +10,16 @@ module Metrics module Aggregation # Contains the implementation of the Drop aggregation class Drop - def collect(start_time, end_time, data_points) - data_points.values.map!(&:dup) + def initialize + @data_points = {} end - def update(increment, attributes, data_points) - data_points[attributes] = NumberDataPoint.new( + def collect(start_time, end_time) + @data_points.values.map!(&:dup) + end + + def update(increment, attributes) + @data_points[attributes] = NumberDataPoint.new( {}, 0, 0, diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb index 768036fc86..600eda6dbd 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb @@ -26,21 +26,22 @@ def initialize( @aggregation_temporality = AggregationTemporality.determine_temporality(aggregation_temporality: aggregation_temporality, default: :cumulative) @boundaries = boundaries && !boundaries.empty? ? boundaries.sort : nil @record_min_max = record_min_max + @data_points = {} end - def collect(start_time, end_time, data_points) + def collect(start_time, end_time) if @aggregation_temporality.delta? # Set timestamps and 'move' data point values to result. - hdps = data_points.values.map! do |hdp| + hdps = @data_points.values.map! do |hdp| hdp.start_time_unix_nano = start_time hdp.time_unix_nano = end_time hdp end - data_points.clear + @data_points.clear hdps else # Update timestamps and take a snapshot. - data_points.values.map! do |hdp| + @data_points.values.map! do |hdp| hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation. hdp.time_unix_nano = end_time hdp = hdp.dup @@ -50,14 +51,14 @@ def collect(start_time, end_time, data_points) end end - def update(amount, attributes, data_points) - hdp = data_points.fetch(attributes) do + def update(amount, attributes) + hdp = @data_points.fetch(attributes) do if @record_min_max min = Float::INFINITY max = -Float::INFINITY end - data_points[attributes] = HistogramDataPoint.new( + @data_points[attributes] = HistogramDataPoint.new( attributes, nil, # :start_time_unix_nano nil, # :time_unix_nano diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb index de82871a53..77cf65a019 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb @@ -42,23 +42,24 @@ def initialize( @zero_count = 0 @size = validate_size(max_size) @scale = validate_scale(max_scale) + @data_points = {} @mapping = new_mapping(@scale) end - def collect(start_time, end_time, data_points) + def collect(start_time, end_time) if @aggregation_temporality.delta? # Set timestamps and 'move' data point values to result. - hdps = data_points.values.map! do |hdp| + hdps = @data_points.values.map! do |hdp| hdp.start_time_unix_nano = start_time hdp.time_unix_nano = end_time hdp end - data_points.clear + @data_points.clear hdps else # Update timestamps and take a snapshot. - data_points.values.map! do |hdp| + @data_points.values.map! do |hdp| hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation. hdp.time_unix_nano = end_time hdp = hdp.dup @@ -70,15 +71,14 @@ def collect(start_time, end_time, data_points) end # rubocop:disable Metrics/MethodLength - def update(amount, attributes, data_points) - # fetch or initialize the ExponentialHistogramDataPoint - hdp = data_points.fetch(attributes) do + def update(amount, attributes) + hdp = @data_points.fetch(attributes) do if @record_min_max min = Float::INFINITY max = -Float::INFINITY end - data_points[attributes] = ExponentialHistogramDataPoint.new( + @data_points[attributes] = ExponentialHistogramDataPoint.new( attributes, nil, # :start_time_unix_nano 0, # :time_unix_nano diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb index 8fb05912e2..2c1ba86a84 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb @@ -10,18 +10,22 @@ module Metrics module Aggregation # Contains the implementation of the LastValue aggregation class LastValue - def collect(start_time, end_time, data_points) - ndps = data_points.values.map! do |ndp| + def initialize + @data_points = {} + end + + def collect(start_time, end_time) + ndps = @data_points.values.map! do |ndp| ndp.start_time_unix_nano = start_time ndp.time_unix_nano = end_time ndp end - data_points.clear + @data_points.clear ndps end - def update(increment, attributes, data_points) - data_points[attributes] = NumberDataPoint.new( + def update(increment, attributes) + @data_points[attributes] = NumberDataPoint.new( attributes, nil, nil, diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb index fc1e65a4f8..d77dd17b12 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb @@ -14,21 +14,22 @@ class Sum def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :cumulative), monotonic: false, instrument_kind: nil) @aggregation_temporality = AggregationTemporality.determine_temporality(aggregation_temporality: aggregation_temporality, instrument_kind: instrument_kind, default: :cumulative) @monotonic = monotonic + @data_points = {} end - def collect(start_time, end_time, data_points) + def collect(start_time, end_time) if @aggregation_temporality.delta? # Set timestamps and 'move' data point values to result. - ndps = data_points.values.map! do |ndp| + ndps = @data_points.values.map! do |ndp| ndp.start_time_unix_nano = start_time ndp.time_unix_nano = end_time ndp end - data_points.clear + @data_points.clear ndps else # Update timestamps and take a snapshot. - data_points.values.map! do |ndp| + @data_points.values.map! do |ndp| ndp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation. ndp.time_unix_nano = end_time ndp.dup @@ -40,10 +41,11 @@ def monotonic? @monotonic end - def update(increment, attributes, data_points) + # no double exporting so when view exist, then we only export the metric_data processed by view + def update(increment, attributes) return if @monotonic && increment < 0 - ndp = data_points[attributes] || data_points[attributes] = NumberDataPoint.new( + ndp = @data_points[attributes] || @data_points[attributes] = NumberDataPoint.new( attributes, nil, nil, diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb index 759239cf41..10824be27e 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb @@ -13,6 +13,8 @@ module State # The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the # public API. It extends MetricStream to support asynchronous instruments. class AsynchronousMetricStream < MetricStream + DEFAULT_TIMEOUT = 30 + def initialize( name, description, @@ -48,23 +50,21 @@ def collect(start_time, end_time) def invoke_callback(timeout, attributes) if @registered_views.empty? @mutex.synchronize do - Timeout.timeout(timeout || 30) do - @callback.each do |cb| - value = cb.call - @default_aggregation.update(value, attributes, @data_points) - end + @callback.each do |cb| + value = safe_guard_callback(cb, timeout: timeout) + @default_aggregation.update(value, attributes) if value.is_a?(Numeric) end end else @registered_views.each do |view| @mutex.synchronize do - Timeout.timeout(timeout || 30) do - @callback.each do |cb| - value = cb.call - merged_attributes = attributes || {} - merged_attributes.merge!(view.attribute_keys) - view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation? - end + @callback.each do |cb| + value = safe_guard_callback(cb, timeout: timeout) + next unless value.is_a?(Numeric) # ignore if value is not valid number + + merged_attributes = attributes || {} + merged_attributes.merge!(view.attribute_keys) + view.aggregation.update(value, merged_attributes) if view.valid_aggregation? end end end @@ -74,6 +74,29 @@ def invoke_callback(timeout, attributes) def now_in_nano (Time.now.to_r * 1_000_000_000).to_i end + + private + + def safe_guard_callback(callback, timeout: DEFAULT_TIMEOUT) + result = nil + thread = Thread.new do + result = callback.call + rescue StandardError => e + OpenTelemetry.logger.error("Error invoking callback: #{e.message}") + result = :error + end + + unless thread.join(timeout) + thread.kill + OpenTelemetry.logger.error("Timeout while invoking callback after #{timeout} seconds") + return nil + end + + result == :error ? nil : result + rescue StandardError => e + OpenTelemetry.logger.error("Unexpected error in callback execution: #{e.message}") + nil + end end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb index 3fa3bd684c..2106bf66c8 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb @@ -13,7 +13,7 @@ module State # The MetricStream class provides SDK internal functionality that is not a part of the # public API. class MetricStream - attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope, :data_points + attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope def initialize( name, @@ -31,7 +31,6 @@ def initialize( @meter_provider = meter_provider @instrumentation_scope = instrumentation_scope @default_aggregation = aggregation - @data_points = {} @registered_views = [] find_registered_view @@ -42,28 +41,30 @@ def collect(start_time, end_time) @mutex.synchronize do metric_data = [] - # data points are required to export over OTLP - return metric_data if @data_points.empty? - if @registered_views.empty? metric_data << aggregate_metric_data(start_time, end_time) else @registered_views.each { |view| metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation) } end + # reject metric_data with empty data_points + # data points are required to export over OTLP + metric_data.reject! { |metric| metric.data_points.empty? } + return [] if metric_data.empty? + metric_data end end def update(value, attributes) if @registered_views.empty? - @mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) } + @mutex.synchronize { @default_aggregation.update(value, attributes) } else @registered_views.each do |view| @mutex.synchronize do attributes ||= {} attributes.merge!(view.attribute_keys) - view.aggregation.update(value, attributes, @data_points) if view.valid_aggregation? + view.aggregation.update(value, attributes) if view.valid_aggregation? end end end @@ -81,7 +82,7 @@ def aggregate_metric_data(start_time, end_time, aggregation: nil) @instrument_kind, @meter_provider.resource, @instrumentation_scope, - aggregator.collect(start_time, end_time, @data_points), + aggregator.collect(start_time, end_time), aggregation_temporality, start_time, end_time, @@ -96,17 +97,15 @@ def find_registered_view end def to_s - instrument_info = +'' + instrument_info = [] instrument_info << "name=#{@name}" instrument_info << " description=#{@description}" if @description instrument_info << " unit=#{@unit}" if @unit - @data_points.map do |attributes, value| - metric_stream_string = +'' - metric_stream_string << instrument_info - metric_stream_string << " attributes=#{attributes}" if attributes - metric_stream_string << " #{value}" - metric_stream_string - end.join("\n") + instrument_info << " instrument_kind=#{@instrument_kind}" if @instrument_kind + instrument_info << " instrumentation_scope=#{@instrumentation_scope.name}@#{@instrumentation_scope.version}" if @instrumentation_scope + instrument_info << " default_aggregation=#{@default_aggregation.class}" if @default_aggregation + instrument_info << " registered_views=#{@registered_views.map { |view| "name=#{view.name}, aggregation=#{view.aggregation.class}" }.join('; ')}" unless @registered_views.empty? + instrument_info.join('.') end end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb index b2212b462a..df8873c2d1 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb @@ -7,7 +7,6 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::Aggregation::Drop do - let(:data_points) { {} } let(:drop_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Drop.new } let(:aggregation_temporality) { :delta } @@ -20,20 +19,20 @@ end it 'sets the timestamps' do - drop_aggregation.update(0, {}, data_points) - ndp = drop_aggregation.collect(start_time, end_time, data_points)[0] + drop_aggregation.update(0, {}) + ndp = drop_aggregation.collect(start_time, end_time)[0] _(ndp.start_time_unix_nano).must_equal(0) _(ndp.time_unix_nano).must_equal(0) end it 'aggregates and collects should collect no value for all collection' do - drop_aggregation.update(1, {}, data_points) - drop_aggregation.update(2, {}, data_points) + drop_aggregation.update(1, {}) + drop_aggregation.update(2, {}) - drop_aggregation.update(2, { 'foo' => 'bar' }, data_points) - drop_aggregation.update(2, { 'foo' => 'bar' }, data_points) + drop_aggregation.update(2, { 'foo' => 'bar' }) + drop_aggregation.update(2, { 'foo' => 'bar' }) - ndps = drop_aggregation.collect(start_time, end_time, data_points) + ndps = drop_aggregation.collect(start_time, end_time) _(ndps.size).must_equal(2) _(ndps[0].value).must_equal(0) diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb index 8dc7c2eabe..e5ff398913 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb @@ -7,7 +7,6 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram do - let(:data_points) { {} } let(:ebh) do OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram.new( aggregation_temporality:, @@ -91,19 +90,19 @@ describe '#collect' do it 'returns all the data points' do - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) - - ebh.update(-10, { 'foo' => 'bar' }, data_points) - ebh.update(1, { 'foo' => 'bar' }, data_points) - ebh.update(22, { 'foo' => 'bar' }, data_points) - ebh.update(55, { 'foo' => 'bar' }, data_points) - ebh.update(80, { 'foo' => 'bar' }, data_points) - - hdps = ebh.collect(start_time, end_time, data_points) + ebh.update(0, {}) + ebh.update(1, {}) + ebh.update(5, {}) + ebh.update(6, {}) + ebh.update(10, {}) + + ebh.update(-10, { 'foo' => 'bar' }) + ebh.update(1, { 'foo' => 'bar' }) + ebh.update(22, { 'foo' => 'bar' }) + ebh.update(55, { 'foo' => 'bar' }) + ebh.update(80, { 'foo' => 'bar' }) + + hdps = ebh.collect(start_time, end_time) _(hdps.size).must_equal(2) _(hdps[0].attributes).must_equal({}) _(hdps[0].count).must_equal(5) @@ -123,34 +122,34 @@ end it 'sets the timestamps' do - ebh.update(0, {}, data_points) - hdp = ebh.collect(start_time, end_time, data_points)[0] + ebh.update(0, {}) + hdp = ebh.collect(start_time, end_time)[0] _(hdp.start_time_unix_nano).must_equal(start_time) _(hdp.time_unix_nano).must_equal(end_time) end it 'calculates the count' do - ebh.update(0, {}, data_points) - ebh.update(0, {}, data_points) - ebh.update(0, {}, data_points) - ebh.update(0, {}, data_points) - hdp = ebh.collect(start_time, end_time, data_points)[0] + ebh.update(0, {}) + ebh.update(0, {}) + ebh.update(0, {}) + ebh.update(0, {}) + hdp = ebh.collect(start_time, end_time)[0] _(hdp.count).must_equal(4) end it 'does not aggregate between collects with default delta aggregation' do - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) - hdps = ebh.collect(start_time, end_time, data_points) - - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) + ebh.update(0, {}) + ebh.update(1, {}) + ebh.update(5, {}) + ebh.update(6, {}) + ebh.update(10, {}) + hdps = ebh.collect(start_time, end_time) + + ebh.update(0, {}) + ebh.update(1, {}) + ebh.update(5, {}) + ebh.update(6, {}) + ebh.update(10, {}) # Assert that the recent update does not # impact the already collected metrics _(hdps[0].count).must_equal(5) @@ -159,7 +158,7 @@ _(hdps[0].max).must_equal(10) _(hdps[0].bucket_counts).must_equal([1, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0]) - hdps = ebh.collect(start_time, end_time, data_points) + hdps = ebh.collect(start_time, end_time) # Assert that we are not accumulating values # between calls to collect _(hdps[0].count).must_equal(5) @@ -173,18 +172,18 @@ let(:aggregation_temporality) { :not_delta } it 'allows metrics to accumulate' do - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) - hdps = ebh.collect(start_time, end_time, data_points) - - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) + ebh.update(0, {}) + ebh.update(1, {}) + ebh.update(5, {}) + ebh.update(6, {}) + ebh.update(10, {}) + hdps = ebh.collect(start_time, end_time) + + ebh.update(0, {}) + ebh.update(1, {}) + ebh.update(5, {}) + ebh.update(6, {}) + ebh.update(10, {}) # Assert that the recent update does not # impact the already collected metrics _(hdps[0].count).must_equal(5) @@ -193,7 +192,7 @@ _(hdps[0].max).must_equal(10) _(hdps[0].bucket_counts).must_equal([1, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0]) - hdps1 = ebh.collect(start_time, end_time, data_points) + hdps1 = ebh.collect(start_time, end_time) # Assert that we are accumulating values # and not just capturing the delta since # the previous collect call @@ -216,38 +215,38 @@ describe '#update' do it 'accumulates across the default boundaries' do - ebh.update(0, {}, data_points) + ebh.update(0, {}) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) + ebh.update(1, {}) + ebh.update(5, {}) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) + ebh.update(6, {}) + ebh.update(10, {}) - ebh.update(11, {}, data_points) - ebh.update(25, {}, data_points) + ebh.update(11, {}) + ebh.update(25, {}) - ebh.update(26, {}, data_points) - ebh.update(50, {}, data_points) + ebh.update(26, {}) + ebh.update(50, {}) - ebh.update(51, {}, data_points) - ebh.update(75, {}, data_points) + ebh.update(51, {}) + ebh.update(75, {}) - ebh.update(76, {}, data_points) - ebh.update(100, {}, data_points) + ebh.update(76, {}) + ebh.update(100, {}) - ebh.update(101, {}, data_points) - ebh.update(250, {}, data_points) + ebh.update(101, {}) + ebh.update(250, {}) - ebh.update(251, {}, data_points) - ebh.update(500, {}, data_points) + ebh.update(251, {}) + ebh.update(500, {}) - ebh.update(501, {}, data_points) - ebh.update(1000, {}, data_points) + ebh.update(501, {}) + ebh.update(1000, {}) - ebh.update(1001, {}, data_points) + ebh.update(1001, {}) - hdp = ebh.collect(start_time, end_time, data_points)[0] + hdp = ebh.collect(start_time, end_time)[0] _(hdp.bucket_counts).must_equal([1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1]) _(hdp.sum).must_equal(4040) _(hdp.min).must_equal(0) @@ -258,8 +257,8 @@ let(:boundaries) { [4, 2, 1] } it 'sorts it' do - ebh.update(0, {}, data_points) - _(ebh.collect(start_time, end_time, data_points)[0].explicit_bounds).must_equal([1, 2, 4]) + ebh.update(0, {}) + _(ebh.collect(start_time, end_time)[0].explicit_bounds).must_equal([1, 2, 4]) end end @@ -267,8 +266,8 @@ let(:record_min_max) { false } it 'does not record min max values' do - ebh.update(-1, {}, data_points) - hdp = ebh.collect(start_time, end_time, data_points)[0] + ebh.update(-1, {}) + hdp = ebh.collect(start_time, end_time)[0] _(hdp.min).must_be_nil _(hdp.min).must_be_nil end @@ -278,14 +277,14 @@ let(:boundaries) { [0, 2, 4] } it 'aggregates' do - ebh.update(-1, {}, data_points) - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(2, {}, data_points) - ebh.update(3, {}, data_points) - ebh.update(4, {}, data_points) - ebh.update(5, {}, data_points) - hdp = ebh.collect(start_time, end_time, data_points)[0] + ebh.update(-1, {}) + ebh.update(0, {}) + ebh.update(1, {}) + ebh.update(2, {}) + ebh.update(3, {}) + ebh.update(4, {}) + ebh.update(5, {}) + hdp = ebh.collect(start_time, end_time)[0] _(hdp.bucket_counts).must_equal([2, 2, 2, 1]) end @@ -295,9 +294,9 @@ let(:boundaries) { [0] } it 'aggregates' do - ebh.update(-1, {}, data_points) - ebh.update(1, {}, data_points) - hdp = ebh.collect(start_time, end_time, data_points)[0] + ebh.update(-1, {}) + ebh.update(1, {}) + hdp = ebh.collect(start_time, end_time)[0] _(hdp.bucket_counts).must_equal([1, 1]) end @@ -307,9 +306,9 @@ let(:boundaries) { [] } it 'aggregates but does not record bucket counts' do - ebh.update(-1, {}, data_points) - ebh.update(3, {}, data_points) - hdp = ebh.collect(start_time, end_time, data_points)[0] + ebh.update(-1, {}) + ebh.update(3, {}) + hdp = ebh.collect(start_time, end_time)[0] _(hdp.bucket_counts).must_be_nil _(hdp.explicit_bounds).must_be_nil @@ -324,9 +323,9 @@ let(:boundaries) { nil } it 'aggregates but does not record bucket counts' do - ebh.update(-1, {}, data_points) - ebh.update(3, {}, data_points) - hdp = ebh.collect(start_time, end_time, data_points)[0] + ebh.update(-1, {}) + ebh.update(3, {}) + hdp = ebh.collect(start_time, end_time)[0] _(hdp.bucket_counts).must_be_nil _(hdp.explicit_bounds).must_be_nil diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb index 0b09aeb615..60cb98dd7e 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb @@ -17,7 +17,6 @@ ) end - let(:data_points) { {} } let(:record_min_max) { true } let(:max_size) { 20 } let(:max_scale) { 5 } @@ -28,14 +27,14 @@ describe '#collect' do it 'returns all the data points' do - expbh.update(1.03, {}, data_points) - expbh.update(1.23, {}, data_points) - expbh.update(0, {}, data_points) + expbh.update(1.03, {}) + expbh.update(1.23, {}) + expbh.update(0, {}) - expbh.update(1.45, { 'foo' => 'bar' }, data_points) - expbh.update(1.67, { 'foo' => 'bar' }, data_points) + expbh.update(1.45, { 'foo' => 'bar' }) + expbh.update(1.67, { 'foo' => 'bar' }) - exphdps = expbh.collect(start_time, end_time, data_points) + exphdps = expbh.collect(start_time, end_time) _(exphdps.size).must_equal(2) _(exphdps[0].attributes).must_equal({}) @@ -80,11 +79,11 @@ zero_threshold: 0 ) - expbh.update(2, {}, data_points) - expbh.update(4, {}, data_points) - expbh.update(1, {}, data_points) + expbh.update(2, {}) + expbh.update(4, {}) + expbh.update(1, {}) - exphdps = expbh.collect(start_time, end_time, data_points) + exphdps = expbh.collect(start_time, end_time) _(exphdps.size).must_equal(1) _(exphdps[0].attributes).must_equal({}) @@ -113,14 +112,14 @@ zero_threshold: 0 ) - expbh.update(2, {}, data_points) - expbh.update(2, {}, data_points) - expbh.update(2, {}, data_points) - expbh.update(1, {}, data_points) - expbh.update(8, {}, data_points) - expbh.update(0.5, {}, data_points) + expbh.update(2, {}) + expbh.update(2, {}) + expbh.update(2, {}) + expbh.update(1, {}) + expbh.update(8, {}) + expbh.update(0.5, {}) - exphdps = expbh.collect(start_time, end_time, data_points) + exphdps = expbh.collect(start_time, end_time) _(exphdps.size).must_equal(1) _(exphdps[0].attributes).must_equal({}) @@ -181,10 +180,10 @@ ) permutation.each do |value| - expbh.update(value, {}, data_points) + expbh.update(value, {}) end - exphdps = expbh.collect(start_time, end_time, data_points) + exphdps = expbh.collect(start_time, end_time) assert_equal expected[:scale], exphdps[0].scale assert_equal expected[:offset], exphdps[0].positive.offset @@ -204,11 +203,11 @@ zero_threshold: 0 ) - expbh.update(Float::MAX, {}, data_points) - expbh.update(1, {}, data_points) - expbh.update(2**-1074, {}, data_points) + expbh.update(Float::MAX, {}) + expbh.update(1, {}) + expbh.update(2**-1074, {}) - exphdps = expbh.collect(start_time, end_time, data_points) + exphdps = expbh.collect(start_time, end_time) assert_equal Float::MAX, exphdps[0].sum assert_equal 3, exphdps[0].count @@ -228,10 +227,10 @@ ) [1, 3, 5, 7, 9].each do |value| - expbh.update(value, {}, data_points) + expbh.update(value, {}) end - exphdps = expbh.collect(start_time, end_time, data_points) + exphdps = expbh.collect(start_time, end_time) assert_equal 1, exphdps[0].min assert_equal 9, exphdps[0].max @@ -243,10 +242,10 @@ ) [-1, -3, -5, -7, -9].each do |value| - expbh.update(value, {}, data_points) + expbh.update(value, {}) end - exphdps = expbh.collect(start_time, end_time, data_points) + exphdps = expbh.collect(start_time, end_time) assert_equal(-9, exphdps[0].min) assert_equal(-1, exphdps[0].max) diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb index 6e606d7462..0a105f7e11 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb @@ -7,7 +7,6 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::Aggregation::LastValue do - let(:data_points) { {} } let(:last_value_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new } # Time in nano @@ -15,22 +14,22 @@ let(:end_time) { ((Time.now + 60).to_r * 1_000_000_000).to_i } it 'sets the timestamps' do - last_value_aggregation.update(0, {}, data_points) - ndp = last_value_aggregation.collect(start_time, end_time, data_points)[0] + last_value_aggregation.update(0, {}) + ndp = last_value_aggregation.collect(start_time, end_time)[0] _(ndp.start_time_unix_nano).must_equal(start_time) _(ndp.time_unix_nano).must_equal(end_time) end it 'aggregates and collects should collect the last value' do - last_value_aggregation.update(1, {}, data_points) - last_value_aggregation.update(2, {}, data_points) + last_value_aggregation.update(1, {}) + last_value_aggregation.update(2, {}) - last_value_aggregation.update(2, { 'foo' => 'bar' }, data_points) - last_value_aggregation.update(2, { 'foo' => 'bar' }, data_points) + last_value_aggregation.update(2, { 'foo' => 'bar' }) + last_value_aggregation.update(2, { 'foo' => 'bar' }) - ndps = last_value_aggregation.collect(start_time, end_time, data_points) + ndps = last_value_aggregation.collect(start_time, end_time) _(ndps[0].value).must_equal(2) - _(ndps[0].attributes).must_equal({}, data_points) + _(ndps[0].attributes).must_equal({}) _(ndps[1].value).must_equal(2) _(ndps[1].attributes).must_equal('foo' => 'bar') diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb index 870f7933bb..8081de40cd 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb @@ -7,7 +7,6 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::Aggregation::Sum do - let(:data_points) { {} } let(:sum_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality:, monotonic:) } let(:aggregation_temporality) { :delta } let(:monotonic) { false } @@ -84,46 +83,46 @@ end it 'sets the timestamps' do - sum_aggregation.update(0, {}, data_points) - ndp = sum_aggregation.collect(start_time, end_time, data_points)[0] + sum_aggregation.update(0, {}) + ndp = sum_aggregation.collect(start_time, end_time)[0] _(ndp.start_time_unix_nano).must_equal(start_time) _(ndp.time_unix_nano).must_equal(end_time) end it 'aggregates and collects' do - sum_aggregation.update(1, {}, data_points) - sum_aggregation.update(2, {}, data_points) + sum_aggregation.update(1, {}) + sum_aggregation.update(2, {}) - sum_aggregation.update(2, { 'foo' => 'bar' }, data_points) - sum_aggregation.update(2, { 'foo' => 'bar' }, data_points) + sum_aggregation.update(2, { 'foo' => 'bar' }) + sum_aggregation.update(2, { 'foo' => 'bar' }) - ndps = sum_aggregation.collect(start_time, end_time, data_points) + ndps = sum_aggregation.collect(start_time, end_time) _(ndps[0].value).must_equal(3) - _(ndps[0].attributes).must_equal({}, data_points) + _(ndps[0].attributes).must_equal({}) _(ndps[1].value).must_equal(4) _(ndps[1].attributes).must_equal('foo' => 'bar') end it 'aggregates and collects negative values' do - sum_aggregation.update(1, {}, data_points) - sum_aggregation.update(-2, {}, data_points) + sum_aggregation.update(1, {}) + sum_aggregation.update(-2, {}) - ndps = sum_aggregation.collect(start_time, end_time, data_points) + ndps = sum_aggregation.collect(start_time, end_time) _(ndps[0].value).must_equal(-1) end it 'does not aggregate between collects' do - sum_aggregation.update(1, {}, data_points) - sum_aggregation.update(2, {}, data_points) - ndps = sum_aggregation.collect(start_time, end_time, data_points) + sum_aggregation.update(1, {}) + sum_aggregation.update(2, {}) + ndps = sum_aggregation.collect(start_time, end_time) - sum_aggregation.update(1, {}, data_points) + sum_aggregation.update(1, {}) # Assert that the recent update does not # impact the already collected metrics _(ndps[0].value).must_equal(3) - ndps = sum_aggregation.collect(start_time, end_time, data_points) + ndps = sum_aggregation.collect(start_time, end_time) # Assert that we are not accumulating values # between calls to collect _(ndps[0].value).must_equal(1) @@ -133,16 +132,16 @@ let(:aggregation_temporality) { :not_delta } it 'allows metrics to accumulate' do - sum_aggregation.update(1, {}, data_points) - sum_aggregation.update(2, {}, data_points) - ndps = sum_aggregation.collect(start_time, end_time, data_points) + sum_aggregation.update(1, {}) + sum_aggregation.update(2, {}) + ndps = sum_aggregation.collect(start_time, end_time) - sum_aggregation.update(1, {}, data_points) + sum_aggregation.update(1, {}) # Assert that the recent update does not # impact the already collected metrics _(ndps[0].value).must_equal(3) - ndps = sum_aggregation.collect(start_time, end_time, data_points) + ndps = sum_aggregation.collect(start_time, end_time) # Assert that we are accumulating values # and not just capturing the delta since # the previous collect call @@ -155,9 +154,9 @@ let(:monotonic) { true } it 'does not allow negative values to accumulate' do - sum_aggregation.update(1, {}, data_points) - sum_aggregation.update(-2, {}, data_points) - ndps = sum_aggregation.collect(start_time, end_time, data_points) + sum_aggregation.update(1, {}) + sum_aggregation.update(-2, {}) + ndps = sum_aggregation.collect(start_time, end_time) _(ndps[0].value).must_equal(1) end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/state/asynchronous_metric_stream_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/state/asynchronous_metric_stream_test.rb new file mode 100644 index 0000000000..1eae0df016 --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/state/asynchronous_metric_stream_test.rb @@ -0,0 +1,319 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream do + let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new } + let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') } + let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new } + let(:callback) { [proc { 42 }] } + let(:timeout) { 10 } + let(:attributes) { { 'environment' => 'test' } } + let(:async_metric_stream) do + OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', + 'An async counter', + 'count', + :observable_counter, + meter_provider, + instrumentation_scope, + aggregation, + callback, + timeout, + attributes + ) + end + + describe '#initialize' do + it 'initializes with provided parameters and async-specific attributes' do + _(async_metric_stream.name).must_equal('async_counter') + _(async_metric_stream.description).must_equal('An async counter') + _(async_metric_stream.unit).must_equal('count') + _(async_metric_stream.instrument_kind).must_equal(:observable_counter) + _(async_metric_stream.instrumentation_scope).must_equal(instrumentation_scope) + + # Verify async-specific attributes + _(async_metric_stream.instance_variable_get(:@callback)).must_equal(callback) + _(async_metric_stream.instance_variable_get(:@timeout)).must_equal(timeout) + _(async_metric_stream.instance_variable_get(:@start_time)).must_be_instance_of(Integer) + _(async_metric_stream.instance_variable_get(:@start_time)).must_be :>, 0 + end + + it 'finds and registers matching views during initialization' do + view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'async_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + meter_provider.instance_variable_get(:@registered_views) << view + + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', + 'An async counter', + 'count', + :observable_counter, + meter_provider, + instrumentation_scope, + aggregation, + callback, + timeout, + attributes + ) + + registered_views = stream.instance_variable_get(:@registered_views) + _(registered_views.size).must_equal(1) + _(registered_views.first.aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue + end + end + + describe '#collect' do + it 'invokes callback and handles various collection scenarios' do + # Test basic collection with callback value and attributes + metric_data_array = async_metric_stream.collect(0, 1000) + _(metric_data_array).must_be_instance_of(Array) + _(metric_data_array.size).must_equal(1) + + metric_data = metric_data_array.first + _(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) + _(metric_data.name).must_equal('async_counter') + _(metric_data.start_time_unix_nano).must_equal(0) + _(metric_data.time_unix_nano).must_equal(1000) + _(metric_data.data_points.first.value).must_equal(42) + _(metric_data.data_points.first.attributes).must_equal(attributes) + + # Test empty collection when callback returns nil + aggregation = OpenTelemetry::SDK::Metrics::Aggregation::Sum.new + empty_callback = [proc { nil }] + empty_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + empty_callback, timeout, {} + ) + + _(empty_stream.collect(0, 1000)).must_be_empty + + # Test multiple callbacks accumulation + aggregation = OpenTelemetry::SDK::Metrics::Aggregation::Sum.new + multi_callbacks = [proc { 10 }, proc { 20 }, proc { 30 }] + multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + multi_callbacks, timeout, attributes + ) + multi_result = multi_stream.collect(0, 1000) + _(multi_result.first.data_points.first.value).must_equal(60) # 10 + 20 + 30 + end + + it 'handles multiple registered views with attribute merging' do + view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'async_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new + ) + view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'async_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new, + attribute_keys: { 'environment' => 'production', 'service' => 'metrics' } + ) + + meter_provider.instance_variable_get(:@registered_views) << view1 + meter_provider.instance_variable_get(:@registered_views) << view2 + + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + callback, timeout, { 'original' => 'value' } + ) + + metric_data_array = stream.collect(0, 1000) + _(metric_data_array.size).must_equal(2) + + # Verify view with attribute merging + view_with_attrs = metric_data_array.find { |md| md.data_points.first.attributes.key?('service') } + _(view_with_attrs).wont_be_nil + attrs = view_with_attrs.data_points.first.attributes + _(attrs['environment']).must_equal('production') + _(attrs['service']).must_equal('metrics') + _(attrs['original']).must_equal('value') + end + + it 'handles callback exceptions' do + error_callback = [proc { raise StandardError, 'Callback error' }] + error_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + error_callback, timeout, attributes + ) + + # Capture the logged output + original_logger = OpenTelemetry.logger + log_output = StringIO.new + OpenTelemetry.logger = Logger.new(log_output) + error_stream.collect(0, 1000) + assert_includes log_output.string, 'Error invoking callback: Callback error' + OpenTelemetry.logger = original_logger + end + end + + describe '#invoke_callback' do + it 'executes multiple callbacks in array' do + multi_callbacks = [ + proc { 10 }, + proc { 20 }, + proc { 30 } + ] + multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + multi_callbacks, timeout, attributes + ) + metric_data = multi_stream.collect(0, 10_000) + + _(metric_data.first.data_points.first.value).must_equal 60 + _(metric_data.first.data_points.first.attributes['environment']).must_equal 'test' + _(metric_data.first.start_time_unix_nano).must_equal 0 + _(metric_data.first.time_unix_nano).must_equal 10_000 + end + + it 'executes callbacks that handles thread safety' do + thread_count = 0 + thread_callback = [proc { + thread_count += 1 + 42 + }] + thread_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + thread_callback, timeout, attributes + ) + + metric_data = nil + threads = Array.new(5) do + Thread.new { metric_data = thread_stream.collect(0, 10_000) } + end + threads.each(&:join) + + _(thread_count).must_equal(5) + _(metric_data.first.data_points.first.value).must_equal 210 + _(metric_data.first.data_points.first.attributes['environment']).must_equal 'test' + _(metric_data.first.start_time_unix_nano).must_equal 0 + _(metric_data.first.time_unix_nano).must_equal 10_000 + end + + it 'respects timeout settings and handles slow callbacks' do + # Test timeout handling + slow_callback = [proc { + sleep(0.1) + 42 + }] + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + slow_callback, 0.05, attributes # Very short timeout + ) + + original_logger = OpenTelemetry.logger + log_output = StringIO.new + OpenTelemetry.logger = Logger.new(log_output) + stream.invoke_callback(0.05, attributes) + + sleep 0.2 + + assert_includes log_output.string, 'Timeout while invoking callback' + OpenTelemetry.logger = original_logger + end + end + + describe '#now_in_nano' do + it 'returns current time in nanoseconds with increasing values' do + nano_time = async_metric_stream.now_in_nano + _(nano_time).must_be_instance_of(Integer) + _(nano_time).must_be :>, 0 + + # Should be a reasonable timestamp (not too old, not in future) + current_time_nano = (Time.now.to_r * 1_000_000_000).to_i + _(nano_time).must_be_close_to(current_time_nano, 1_000_000_000) # Within 1 second + + # Test successive calls return increasing values + sleep(0.001) # Small delay + time2 = async_metric_stream.now_in_nano + _(time2).must_be :>, nano_time + end + end + + describe 'aggregation and view integration' do + it 'supports different aggregation types and accumulation' do + # Test Sum aggregation accumulation + callback_value = 100 + callback_proc = [proc { callback_value }] + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + callback_proc, timeout, attributes + ) + + stream.collect(0, 1000) + metric_data = stream.collect(1000, 2000) + _(metric_data.first.data_points.first.value).must_equal 200 + + # Test LastValue aggregation + last_value_aggregation = OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_gauge', 'description', 'units', :observable_gauge, + meter_provider, instrumentation_scope, last_value_aggregation, + callback_proc, timeout, attributes + ) + + # Calling it twice but last value should preserve last one instead of sum + stream.collect(0, 1000) + metric_data = stream.collect(0, 1000) + _(metric_data.first.data_points.first.value).must_equal 100 + end + + it 'handles view filtering and drop aggregation' do + # Test view filtering by instrument name (non-matching) + non_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'different_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + + # Test view filtering by instrument type (matching) + type_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + nil, type: :observable_counter, + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + + meter_provider.instance_variable_get(:@registered_views) << non_matching_view + meter_provider.instance_variable_get(:@registered_views) << type_matching_view + + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + callback, timeout, attributes + ) + + metric_data = stream.collect(0, 1000) + _(metric_data.size).must_equal(1) # Should match type-based view + + # Test Drop aggregation + drop_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'async_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Drop.new + ) + meter_provider.instance_variable_get(:@registered_views).clear + meter_provider.instance_variable_get(:@registered_views) << drop_view + + drop_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + callback, timeout, attributes + ) + + dropped_data = drop_stream.collect(0, 1000) + _(dropped_data.size).must_equal(1) + _(dropped_data.first.data_points.first.value).must_equal(0) # Dropped value + end + end +end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_store_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_store_test.rb index 01cd0fcf9e..99c3258ecd 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_store_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_store_test.rb @@ -7,9 +7,143 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::State::MetricStore do - describe '#collect' do + let(:metric_store) { OpenTelemetry::SDK::Metrics::State::MetricStore.new } + let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new } + let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') } + let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new } + + describe '#initialize' do + it 'initializes with empty metric streams' do + store = OpenTelemetry::SDK::Metrics::State::MetricStore.new + _(store).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricStore) + end end - describe '#add_metric_stream' do + describe '#collect' do + it 'returns empty array when no metric streams are added' do + snapshot = metric_store.collect + _(snapshot).must_be_instance_of(Array) + _(snapshot).must_be_empty + end + + it 'collects data from added metric streams' do + metric_stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + # Add some data to the metric stream + metric_store.add_metric_stream(metric_stream) + metric_stream.update(10, {}) + + snapshot = metric_store.collect + _(snapshot).must_be_instance_of(Array) + _(snapshot.size).must_equal(1) + _(snapshot.first).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) + _(snapshot.first.name).must_equal('test_counter') + end + + it 'collects data from multiple metric streams' do + metric_stream1 = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter1', + 'A test counter 1', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + metric_stream2 = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter2', + 'A test counter 2', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + metric_store.add_metric_stream(metric_stream1) + metric_store.add_metric_stream(metric_stream2) + + metric_stream1.update(10, {}) + metric_stream2.update(20, {}) + + snapshot = metric_store.collect + _(snapshot.size).must_equal(2) + names = snapshot.map(&:name) + _(names).must_include('test_counter1') + _(names).must_include('test_counter2') + end + + it 'updates epoch times on each collection' do + metric_stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + metric_store.add_metric_stream(metric_stream) + + # First collection + metric_stream.update(10, {}) + snapshot1 = metric_store.collect + end_time1 = snapshot1.first.time_unix_nano + + sleep(0.001) # Small delay to ensure different timestamps + + # Second collection + metric_stream.update(10, {}) + snapshot2 = metric_store.collect + start_time2 = snapshot2.first.start_time_unix_nano + end_time2 = snapshot2.first.time_unix_nano + + _(start_time2).must_equal(end_time1) + _(end_time2).must_be :>, end_time1 + end + + it 'is thread-safe when adding metric streams' do + # Create metric streams in multiple threads + threads = Array.new(10) do |i| + Thread.new do + metric_stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + "counter_#{i}", + "Counter #{i}", + 'count', + :counter, + meter_provider, + instrumentation_scope, + OpenTelemetry::SDK::Metrics::Aggregation::Sum.new + ) + metric_store.add_metric_stream(metric_stream) + metric_stream.update(i, {}) + end + end + + threads.each(&:join) + + snapshot = metric_store.collect + # this test case is unstable as it involve thread in minitest + skip if snapshot.size != 10 + _(snapshot.size).must_equal(10) + + names = snapshot.map(&:name).sort + expected_names = (0..9).map { |i| "counter_#{i}" }.sort + _(names).must_equal(expected_names) + + attribute_value = snapshot.flat_map { |i| i.data_points.first.value } + attribute_value.sort! + _(attribute_value).must_equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + end end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb index 8a0084101d..3d1d596a56 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb @@ -7,6 +7,267 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::State::MetricStream do + let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new } + let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') } + let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new } + let(:metric_stream) do + OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + end + + describe '#initialize' do + it 'initializes with provided parameters' do + _(metric_stream.name).must_equal('test_counter') + _(metric_stream.description).must_equal('A test counter') + _(metric_stream.unit).must_equal('count') + _(metric_stream.instrument_kind).must_equal(:counter) + _(metric_stream.instrumentation_scope).must_equal(instrumentation_scope) + end + + it 'initializes registered views from meter provider' do + view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + meter_provider.instance_variable_get(:@registered_views) << view + + stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + registered_views = stream.instance_variable_get(:@registered_views) + _(registered_views.size).must_equal(1) + _(registered_views.first).must_equal(view) + end + end + describe '#update' do + it 'updates aggregation with various value and attribute combinations' do + # Test updates with different attributes (should create separate data points) + metric_stream.update(10, { 'key' => 'value' }) + metric_stream.update(20, { 'same_key' => 'same_value' }) + metric_stream.update(30, { 'same_key' => 'same_value' }) # Accumulated value + metric_stream.update(5, { 'key1' => 'value1' }) + metric_stream.update(8, { 'key2' => 'value2' }) + + snapshot = metric_stream.collect(0, 1000) + _(snapshot.size).must_equal(1) + + # Verify data points for different attribute combinations + data_points = snapshot.first.data_points + _(data_points.size).must_be :>=, 3 # At least 3 different attribute combinations + + # Verify accumulated value for same_key attributes + same_key_point = data_points.find { |dp| dp.attributes['same_key'] == 'same_value' } + _(same_key_point).wont_be_nil + _(same_key_point.value).must_equal(50) # 20 + 30 = 50 + + # Verify individual attribute combinations + key1_point = data_points.find { |dp| dp.attributes['key1'] == 'value1' } + key2_point = data_points.find { |dp| dp.attributes['key2'] == 'value2' } + _(key1_point).wont_be_nil + _(key2_point).wont_be_nil + _(key1_point.value).must_equal(5) + _(key2_point.value).must_equal(8) + end + + it 'handles registered views with attribute merging' do + view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new, + attribute_keys: { 'environment' => 'test' } + ) + meter_provider.instance_variable_get(:@registered_views) << view + + stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + stream.update(10, { 'original' => 'value' }) + stream.update(20, { 'original' => 'value' }) + + snapshot = stream.collect(0, 1000) + _(snapshot.size).must_equal(1) + + # Check that attributes were merged + attributes = snapshot.first.data_points.first.attributes + _(attributes['environment']).must_equal('test') + _(attributes['original']).must_equal('value') + + value = snapshot.first.data_points.first.value + _(value).must_equal 20 + end + + it 'is thread-safe' do + threads = Array.new(10) do |i| + Thread.new do + 10.times { metric_stream.update(1, { 'thread' => i.to_s }) } + end + end + threads.each(&:join) + snapshot = metric_stream.collect(0, 1000) + + _(snapshot.size).must_equal(1) + + # this test case is unstable as it involve thread in minitest + skip if snapshot.first.data_points.size != 10 + + sleep 0.2 + + 10.times.each do |i| + _(snapshot.first.data_points[i].value).must_equal 10 + end + + attribute_value = snapshot.first.data_points.flat_map { |i| i.attributes['thread'].to_i } + attribute_value.sort! + _(attribute_value).must_equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + end + end + + describe '#collect' do + it 'returns empty array when no data points' do + snapshot = metric_stream.collect(0, 1000) + _(snapshot).must_be_instance_of(Array) + _(snapshot).must_be_empty + end + + it 'returns metric data when data points exist' do + metric_stream.update(10, { 'key' => 'value' }) + snapshot = metric_stream.collect(0, 1000) + + _(snapshot.size).must_equal(1) + metric_data = snapshot.first + _(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) + _(metric_data.name).must_equal('test_counter') + _(metric_data.description).must_equal('A test counter') + _(metric_data.unit).must_equal('count') + _(metric_data.instrument_kind).must_equal(:counter) + end + + it 'handles multiple registered views' do + view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new + ) + view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + + meter_provider.instance_variable_get(:@registered_views) << view1 + meter_provider.instance_variable_get(:@registered_views) << view2 + + stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + stream.update(10, {}) + snapshot = stream.collect(0, 1000) + + # Should have one metric data per view + _(snapshot.size).must_equal(2) + end + + it 'passes correct timestamps to metric data' do + metric_stream.update(10, {}) + start_time = 1000 + end_time = 2000 + + snapshot = metric_stream.collect(start_time, end_time) + metric_data = snapshot.first + + _(metric_data.start_time_unix_nano).must_equal(start_time) + _(metric_data.time_unix_nano).must_equal(end_time) + end + end + + describe '#aggregate_metric_data' do + it 'creates metric data with default aggregation' do + metric_stream.update(10, {}) + metric_stream.update(20, {}) + metric_data = metric_stream.aggregate_metric_data(0, 1000) + + _(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) + _(metric_data.name).must_equal('test_counter') + _(metric_data.data_points.first.value).must_equal 30 + end + + it 'creates metric data with custom aggregation' do + # This test case is not relevant in this context. + # The instrument is already updated using the default aggregation, so the custom aggregation will not impact the collection process. + # The aggregation parameter in aggregate_metric_data(start_time, end_time, aggregation: nil) is intended + end + end + + describe '#find_registered_view' do + it 'only find matching views by name' do + view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + + view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'other_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Drop.new + ) + + meter_provider.instance_variable_get(:@registered_views) << view1 + meter_provider.instance_variable_get(:@registered_views) << view2 + + stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + registered_views = stream.instance_variable_get(:@registered_views) + + _(registered_views.size).must_equal 1 + _(registered_views[0].aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue + end + end + + describe '#to_s' do + it 'string representation' do + metric_stream.update(10, {}) + metric_stream.update(20, {}) + str = metric_stream.to_s + + _(str).must_include('test_counter') + _(str).must_include('A test counter') + _(str).must_include('count') + _(str).must_include('counter') + _(str).must_include('test_scope@1.0.0') + _(str).must_include('Aggregation::Sum') + end end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb index 8a65818024..ee16db4cbd 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb @@ -91,7 +91,15 @@ end describe '#registered_view with asynchronous counters' do - before { reset_metrics_sdk } + before do + reset_metrics_sdk + @original = ENV['OTEL_METRICS_EXPORTER'] + ENV['OTEL_METRICS_EXPORTER'] = 'none' + end + + after do + ENV['OTEL_METRICS_EXPORTER'] = @original + end it 'emits asynchronous counter metrics with no data_points if view is drop' do OpenTelemetry::SDK.configure @@ -181,7 +189,9 @@ OpenTelemetry.meter_provider.add_view('async_counter', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new) callback = proc { 25 } - meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback) + asynch_counter = meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback) + asynch_counter.observe + asynch_counter.observe metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots @@ -197,6 +207,9 @@ _(snapshot.instrumentation_scope.name).must_equal('test') _(snapshot.data_points).wont_be_empty end + + _(last_snapshot[0].data_points.first.value).must_equal 75 # view aggregation sum + _(last_snapshot[1].data_points.first.value).must_equal 25 # view aggregation last value end it 'emits asynchronous counter metrics with view attribute filtering' do