diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb index 7f4752695f..7b48524cdb 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb @@ -13,6 +13,8 @@ module State # The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the # public API. It extends MetricStream to support asynchronous instruments. class AsynchronousMetricStream < MetricStream + DEFAULT_TIMEOUT = 30 + def initialize( name, description, @@ -48,28 +50,49 @@ def collect(start_time, end_time) def invoke_callback(timeout, attributes) if @registered_views.empty? @mutex.synchronize do - Timeout.timeout(timeout || 30) do - @callback.each do |cb| - value = cb.call - @default_aggregation.update(value, attributes, @data_points) - end + @callback.each do |cb| + value = safe_guard_callback(cb, timeout: timeout) + @default_aggregation.update(value, attributes, @data_points) if value.is_a?(Numeric) end end else @registered_views.each do |view| @mutex.synchronize do - Timeout.timeout(timeout || 30) do - @callback.each do |cb| - value = cb.call - merged_attributes = attributes || {} - merged_attributes.merge!(view.attribute_keys) - view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation? - end + @callback.each do |cb| + value = safe_guard_callback(cb, timeout: timeout) + next unless value.is_a?(Numeric) # ignore if value is not valid number + + merged_attributes = attributes || {} + merged_attributes.merge!(view.attribute_keys) + view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation? end end end end end + + private + + def safe_guard_callback(callback, timeout: DEFAULT_TIMEOUT) + result = nil + thread = Thread.new do + result = callback.call + rescue StandardError => e + OpenTelemetry.logger.error("Error invoking callback: #{e.message}") + result = :error + end + + unless thread.join(timeout) + thread.kill + OpenTelemetry.logger.error("Timeout while invoking callback after #{timeout} seconds") + return nil + end + + result == :error ? nil : result + rescue StandardError => e + OpenTelemetry.logger.error("Unexpected error in callback execution: #{e.message}") + nil + end end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb index 3fa3bd684c..1c7e9a177a 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb @@ -55,6 +55,7 @@ def collect(start_time, end_time) end end + # view will modify the data_point that is not suitable when there are multiple views def update(value, attributes) if @registered_views.empty? @mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) } diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/state/asynchronous_metric_stream_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/state/asynchronous_metric_stream_test.rb new file mode 100644 index 0000000000..3435752f08 --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/state/asynchronous_metric_stream_test.rb @@ -0,0 +1,274 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream do + let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new } + let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') } + let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new } + let(:callback) { [proc { 42 }] } + let(:timeout) { 10 } + let(:attributes) { { 'environment' => 'test' } } + let(:async_metric_stream) do + OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', + 'An async counter', + 'count', + :observable_counter, + meter_provider, + instrumentation_scope, + aggregation, + callback, + timeout, + attributes + ) + end + + describe '#initialize' do + it 'initializes with provided parameters and async-specific attributes' do + _(async_metric_stream.name).must_equal('async_counter') + _(async_metric_stream.description).must_equal('An async counter') + _(async_metric_stream.unit).must_equal('count') + _(async_metric_stream.instrument_kind).must_equal(:observable_counter) + _(async_metric_stream.instrumentation_scope).must_equal(instrumentation_scope) + _(async_metric_stream.data_points).must_be_instance_of(Hash) + _(async_metric_stream.data_points).must_be_empty + + # Verify async-specific attributes + _(async_metric_stream.instance_variable_get(:@callback)).must_equal(callback) + _(async_metric_stream.instance_variable_get(:@timeout)).must_equal(timeout) + _(async_metric_stream.instance_variable_get(:@start_time)).must_be_instance_of(Integer) + _(async_metric_stream.instance_variable_get(:@start_time)).must_be :>, 0 + end + + it 'finds and registers matching views during initialization' do + view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'async_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + meter_provider.instance_variable_get(:@registered_views) << view + + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', + 'An async counter', + 'count', + :observable_counter, + meter_provider, + instrumentation_scope, + aggregation, + callback, + timeout, + attributes + ) + + registered_views = stream.instance_variable_get(:@registered_views) + _(registered_views.size).must_equal(1) + _(registered_views.first.aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue + end + end + + describe '#collect' do + it 'invokes callback and handles various collection scenarios' do + # Test basic collection with callback value and attributes + metric_data_array = async_metric_stream.collect(0, 1000) + _(metric_data_array).must_be_instance_of(Array) + _(metric_data_array.size).must_equal(1) + + metric_data = metric_data_array.first + _(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) + _(metric_data.name).must_equal('async_counter') + _(metric_data.start_time_unix_nano).must_equal(0) + _(metric_data.time_unix_nano).must_equal(1000) + _(metric_data.data_points.first.value).must_equal(42) + _(metric_data.data_points.first.attributes).must_equal(attributes) + + # Test empty collection when callback returns nil + empty_callback = [proc { nil }] + empty_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + empty_callback, timeout, {} + ) + _(empty_stream.collect(0, 1000)).must_be_empty + + # Test multiple callbacks accumulation + multi_callbacks = [proc { 10 }, proc { 20 }, proc { 30 }] + multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + multi_callbacks, timeout, attributes + ) + multi_result = multi_stream.collect(0, 1000) + _(multi_result.first.data_points.first.value).must_equal(60) # 10 + 20 + 30 + end + + it 'handles multiple registered views with attribute merging' do + view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'async_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new + ) + view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'async_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new, + attribute_keys: { 'environment' => 'production', 'service' => 'metrics' } + ) + + meter_provider.instance_variable_get(:@registered_views) << view1 + meter_provider.instance_variable_get(:@registered_views) << view2 + + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + callback, timeout, { 'original' => 'value' } + ) + + metric_data_array = stream.collect(0, 1000) + _(metric_data_array.size).must_equal(2) + + # Verify view with attribute merging + view_with_attrs = metric_data_array.find { |md| md.data_points.first.attributes.key?('service') } + _(view_with_attrs).wont_be_nil + attrs = view_with_attrs.data_points.first.attributes + _(attrs['environment']).must_equal('production') + _(attrs['service']).must_equal('metrics') + _(attrs['original']).must_equal('value') + end + + it 'handles callback exceptions' do + error_callback = [proc { raise StandardError, 'Callback error' }] + error_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + error_callback, timeout, attributes + ) + + # Capture the logged output + original_logger = OpenTelemetry.logger + log_output = StringIO.new + OpenTelemetry.logger = Logger.new(log_output) + error_stream.collect(0, 1000) + assert_includes log_output.string, 'Error invoking callback: Callback error' + OpenTelemetry.logger = original_logger + end + end + + describe '#invoke_callback' do + it 'executes callbacks with timeout and handles thread safety with multiple callback' do + # Test multiple callbacks in array + multi_callbacks = [ + proc { 10 }, + proc { 20 }, + proc { 30 } + ] + multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + multi_callbacks, timeout, attributes + ) + multi_stream.invoke_callback(timeout, attributes) + + # Test thread safety + thread_count = 0 + thread_callback = [proc { + thread_count += 1 + 42 + }] + thread_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + thread_callback, timeout, attributes + ) + + metric_data = nil + threads = Array.new(5) do + # Thread.new { thread_stream.invoke_callback(timeout, attributes) } + Thread.new { metric_data = thread_stream.collect(0, 10_000) } + end + threads.each(&:join) + + _(thread_count).must_equal(5) + _(metric_data.first.data_points.first.value).must_equal 210 + _(metric_data.first.data_points.first.attributes['environment']).must_equal 'test' + _(metric_data.first.start_time_unix_nano).must_equal 0 + _(metric_data.first.time_unix_nano).must_equal 10_000 + end + end + + describe 'aggregation and view integration' do + it 'supports different aggregation types and accumulation' do + # Test Sum aggregation accumulation + callback_value = 100 + callback_proc = [proc { callback_value }] + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + callback_proc, timeout, attributes + ) + + stream.collect(0, 1000) + metric_data = stream.collect(1000, 2000) + _(metric_data.first.data_points.first.value).must_equal 200 + + # Test LastValue aggregation + last_value_aggregation = OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_gauge', 'description', 'units', :observable_gauge, + meter_provider, instrumentation_scope, last_value_aggregation, + callback_proc, timeout, attributes + ) + + # Calling it twice but last value should preserve last one instead of sum + stream.collect(0, 1000) + metric_data = stream.collect(0, 1000) + _(metric_data.first.data_points.first.value).must_equal 100 + end + + it 'handles view filtering and drop aggregation' do + # Test view filtering by instrument name (non-matching) + non_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'different_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + + # Test view filtering by instrument type (matching) + type_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + nil, type: :observable_counter, + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + + meter_provider.instance_variable_get(:@registered_views) << non_matching_view + meter_provider.instance_variable_get(:@registered_views) << type_matching_view + + stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + callback, timeout, attributes + ) + + metric_data = stream.collect(0, 1000) + _(metric_data.size).must_equal(1) # Should match type-based view + + # Test Drop aggregation + drop_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'async_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Drop.new + ) + meter_provider.instance_variable_get(:@registered_views).clear + meter_provider.instance_variable_get(:@registered_views) << drop_view + + drop_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + 'async_counter', 'description', 'unit', :observable_counter, + meter_provider, instrumentation_scope, aggregation, + callback, timeout, attributes + ) + + dropped_data = drop_stream.collect(0, 1000) + _(dropped_data.size).must_equal(1) + _(dropped_data.first.data_points.first.value).must_equal(0) # Dropped value + end + end +end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_store_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_store_test.rb index 01cd0fcf9e..cd25df3db1 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_store_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_store_test.rb @@ -7,9 +7,109 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::State::MetricStore do - describe '#collect' do + let(:metric_store) { OpenTelemetry::SDK::Metrics::State::MetricStore.new } + let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new } + let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') } + let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new } + + describe '#initialize' do + it 'initializes with empty metric streams' do + store = OpenTelemetry::SDK::Metrics::State::MetricStore.new + _(store).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricStore) + end end - describe '#add_metric_stream' do + describe '#collect' do + it 'returns empty array when no metric streams are added' do + snapshot = metric_store.collect + _(snapshot).must_be_instance_of(Array) + _(snapshot).must_be_empty + end + + it 'collects data from added metric streams' do + metric_stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + # Add some data to the metric stream + metric_store.add_metric_stream(metric_stream) + metric_stream.update(10, {}) + + snapshot = metric_store.collect + _(snapshot).must_be_instance_of(Array) + _(snapshot.size).must_equal(1) + _(snapshot.first).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) + _(snapshot.first.name).must_equal('test_counter') + end + + it 'collects data from multiple metric streams' do + metric_stream1 = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter1', + 'A test counter 1', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + metric_stream2 = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter2', + 'A test counter 2', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + metric_store.add_metric_stream(metric_stream1) + metric_store.add_metric_stream(metric_stream2) + + metric_stream1.update(10, {}) + metric_stream2.update(20, {}) + + snapshot = metric_store.collect + _(snapshot.size).must_equal(2) + names = snapshot.map(&:name) + _(names).must_include('test_counter1') + _(names).must_include('test_counter2') + end + + it 'updates epoch times on each collection' do + metric_stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + metric_store.add_metric_stream(metric_stream) + + # First collection + metric_stream.update(10, {}) + snapshot1 = metric_store.collect + end_time1 = snapshot1.first.time_unix_nano + + sleep(0.001) # Small delay to ensure different timestamps + + # Second collection + metric_stream.update(10, {}) + snapshot2 = metric_store.collect + start_time2 = snapshot2.first.start_time_unix_nano + end_time2 = snapshot2.first.time_unix_nano + + _(start_time2).must_equal(end_time1) + _(end_time2).must_be :>, end_time1 + end end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb index 8a0084101d..b7512c74b2 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb @@ -7,6 +7,252 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::State::MetricStream do + let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new } + let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') } + let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new } + let(:metric_stream) do + OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + end + + describe '#initialize' do + it 'initializes with provided parameters' do + _(metric_stream.name).must_equal('test_counter') + _(metric_stream.description).must_equal('A test counter') + _(metric_stream.unit).must_equal('count') + _(metric_stream.instrument_kind).must_equal(:counter) + _(metric_stream.instrumentation_scope).must_equal(instrumentation_scope) + _(metric_stream.data_points).must_be_instance_of(Hash) + _(metric_stream.data_points).must_be_empty + end + + it 'initializes registered views from meter provider' do + view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + meter_provider.instance_variable_get(:@registered_views) << view + + stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + registered_views = stream.instance_variable_get(:@registered_views) + _(registered_views.size).must_equal(1) + _(registered_views.first).must_equal(view) + end + end + describe '#update' do + it 'updates aggregation with various value and attribute combinations' do + # Test updates with different attributes (should create separate data points) + metric_stream.update(10, { 'key' => 'value' }) + metric_stream.update(20, { 'same_key' => 'same_value' }) + metric_stream.update(30, { 'same_key' => 'same_value' }) # Accumulated value + metric_stream.update(5, { 'key1' => 'value1' }) + metric_stream.update(8, { 'key2' => 'value2' }) + + snapshot = metric_stream.collect(0, 1000) + _(snapshot.size).must_equal(1) + + # Verify data points for different attribute combinations + data_points = snapshot.first.data_points + _(data_points.size).must_be :>=, 3 # At least 3 different attribute combinations + + # Verify accumulated value for same_key attributes + same_key_point = data_points.find { |dp| dp.attributes['same_key'] == 'same_value' } + _(same_key_point).wont_be_nil + _(same_key_point.value).must_equal(50) # 20 + 30 = 50 + + # Verify individual attribute combinations + key1_point = data_points.find { |dp| dp.attributes['key1'] == 'value1' } + key2_point = data_points.find { |dp| dp.attributes['key2'] == 'value2' } + _(key1_point).wont_be_nil + _(key2_point).wont_be_nil + _(key1_point.value).must_equal(5) + _(key2_point.value).must_equal(8) + end + + it 'handles registered views with attribute merging' do + view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new, + attribute_keys: { 'environment' => 'test' } + ) + meter_provider.instance_variable_get(:@registered_views) << view + + stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + stream.update(10, { 'original' => 'value' }) + stream.update(20, { 'original' => 'value' }) + + snapshot = stream.collect(0, 1000) + _(snapshot.size).must_equal(1) + + # Check that attributes were merged + attributes = snapshot.first.data_points.first.attributes + _(attributes['environment']).must_equal('test') + _(attributes['original']).must_equal('value') + + value = snapshot.first.data_points.first.value + _(value).must_equal 20 + end + end + + describe '#collect' do + it 'returns empty array when no data points' do + snapshot = metric_stream.collect(0, 1000) + _(snapshot).must_be_instance_of(Array) + _(snapshot).must_be_empty + end + + it 'returns metric data when data points exist' do + metric_stream.update(10, { 'key' => 'value' }) + snapshot = metric_stream.collect(0, 1000) + + _(snapshot.size).must_equal(1) + metric_data = snapshot.first + _(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) + _(metric_data.name).must_equal('test_counter') + _(metric_data.description).must_equal('A test counter') + _(metric_data.unit).must_equal('count') + _(metric_data.instrument_kind).must_equal(:counter) + end + + it 'handles multiple registered views' do + view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new + ) + view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + + meter_provider.instance_variable_get(:@registered_views) << view1 + meter_provider.instance_variable_get(:@registered_views) << view2 + + stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + stream.update(10, {}) + snapshot = stream.collect(0, 1000) + + # Should have one metric data per view + _(snapshot.size).must_equal(2) + end + + it 'passes correct timestamps to metric data' do + metric_stream.update(10, {}) + start_time = 1000 + end_time = 2000 + + snapshot = metric_stream.collect(start_time, end_time) + metric_data = snapshot.first + + _(metric_data.start_time_unix_nano).must_equal(start_time) + _(metric_data.time_unix_nano).must_equal(end_time) + end + end + + describe '#aggregate_metric_data' do + it 'creates metric data with default aggregation' do + metric_stream.update(10, {}) + metric_stream.update(20, {}) + metric_data = metric_stream.aggregate_metric_data(0, 1000) + + _(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) + _(metric_data.name).must_equal('test_counter') + _(metric_data.data_points.first.value).must_equal 30 + end + + it 'creates metric data with custom aggregation' do + # This test case is not relevant in this context. + # The instrument is already updated using the default aggregation, so the custom aggregation will not impact the collection process. + # The aggregation parameter in aggregate_metric_data(start_time, end_time, aggregation: nil) is intended + end + end + + describe '#find_registered_view' do + it 'only find matching views by name' do + view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new + ) + + view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( + 'other_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Drop.new + ) + + meter_provider.instance_variable_get(:@registered_views) << view1 + meter_provider.instance_variable_get(:@registered_views) << view2 + + stream = OpenTelemetry::SDK::Metrics::State::MetricStream.new( + 'test_counter', + 'A test counter', + 'count', + :counter, + meter_provider, + instrumentation_scope, + aggregation + ) + + registered_views = stream.instance_variable_get(:@registered_views) + + _(registered_views.size).must_equal 1 + _(registered_views[0].aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue + end + end + + describe '#to_s' do + it 'returns string representation without data points' do + str = metric_stream.to_s + _(str).must_be_instance_of(String) + _(str).must_be_empty # No data points yet + end + + it 'includes data points in string representation' do + metric_stream.update(10, { 'key1' => 'value1' }) + metric_stream.update(20, { 'key2' => 'value2' }) + str = metric_stream.to_s + + _(str).must_include('test_counter') + _(str).must_include('A test counter') + _(str).must_include('count') + _(str).must_include('key') + _(str).must_include('value') + _(str).must_include('key1') + _(str).must_include('key2') + _(str.lines.size).must_be :>=, 2 + end end end