-
Notifications
You must be signed in to change notification settings - Fork 272
fix: add test case for metric_store and metric_view #1894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 17 commits
0099074
b08392b
54acd67
9ed4a75
673ca91
6ce71b6
a2a87c5
6024825
2162041
51a42ed
a4440a5
5114da8
15b5a5e
ddbecd5
e99a5cd
24216fe
edf066d
d4878ea
9b86e3f
a557f2f
0848123
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,6 +13,8 @@ module State | |||||||||||||||
# The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the | ||||||||||||||||
# public API. It extends MetricStream to support asynchronous instruments. | ||||||||||||||||
class AsynchronousMetricStream < MetricStream | ||||||||||||||||
DEFAULT_TIMEOUT = 30 | ||||||||||||||||
|
||||||||||||||||
def initialize( | ||||||||||||||||
name, | ||||||||||||||||
description, | ||||||||||||||||
|
@@ -48,28 +50,53 @@ def collect(start_time, end_time) | |||||||||||||||
def invoke_callback(timeout, attributes) | ||||||||||||||||
if @registered_views.empty? | ||||||||||||||||
@mutex.synchronize do | ||||||||||||||||
Timeout.timeout(timeout || 30) do | ||||||||||||||||
@callback.each do |cb| | ||||||||||||||||
value = cb.call | ||||||||||||||||
@default_aggregation.update(value, attributes, @data_points) | ||||||||||||||||
end | ||||||||||||||||
@callback.each do |cb| | ||||||||||||||||
value = safe_guard_callback(cb, timeout: timeout) | ||||||||||||||||
@default_aggregation.update(value, attributes, @data_points) if value.is_a?(Numeric) | ||||||||||||||||
end | ||||||||||||||||
end | ||||||||||||||||
else | ||||||||||||||||
@registered_views.each do |view| | ||||||||||||||||
@mutex.synchronize do | ||||||||||||||||
Timeout.timeout(timeout || 30) do | ||||||||||||||||
kaylareopelle marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
@callback.each do |cb| | ||||||||||||||||
value = cb.call | ||||||||||||||||
merged_attributes = attributes || {} | ||||||||||||||||
merged_attributes.merge!(view.attribute_keys) | ||||||||||||||||
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation? | ||||||||||||||||
end | ||||||||||||||||
@callback.each do |cb| | ||||||||||||||||
value = safe_guard_callback(cb, timeout: timeout) | ||||||||||||||||
next unless value.is_a?(Numeric) # ignore if value is not valid number | ||||||||||||||||
|
||||||||||||||||
merged_attributes = attributes || {} | ||||||||||||||||
merged_attributes.merge!(view.attribute_keys) | ||||||||||||||||
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation? | ||||||||||||||||
end | ||||||||||||||||
end | ||||||||||||||||
end | ||||||||||||||||
end | ||||||||||||||||
end | ||||||||||||||||
|
||||||||||||||||
def now_in_nano | ||||||||||||||||
(Time.now.to_r * 1_000_000_000).to_i | ||||||||||||||||
end | ||||||||||||||||
|
# Converts the provided timestamp to nanosecond integer | |
# | |
# @param timestamp [Time] the timestamp to convert, defaults to Time.now | |
# @return [Integer] | |
def time_in_nanoseconds(timestamp = Time.now) | |
(timestamp.to_r * 1_000_000_000).to_i | |
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,316 @@ | ||
# frozen_string_literal: true | ||
|
||
# Copyright The OpenTelemetry Authors | ||
# | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
require 'test_helper' | ||
|
||
describe OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream do | ||
let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new } | ||
let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') } | ||
let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new } | ||
let(:callback) { [proc { 42 }] } | ||
let(:timeout) { 10 } | ||
let(:attributes) { { 'environment' => 'test' } } | ||
let(:async_metric_stream) do | ||
OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', | ||
'An async counter', | ||
'count', | ||
:observable_counter, | ||
meter_provider, | ||
instrumentation_scope, | ||
aggregation, | ||
callback, | ||
timeout, | ||
attributes | ||
) | ||
end | ||
|
||
describe '#initialize' do | ||
it 'initializes with provided parameters and async-specific attributes' do | ||
_(async_metric_stream.name).must_equal('async_counter') | ||
_(async_metric_stream.description).must_equal('An async counter') | ||
_(async_metric_stream.unit).must_equal('count') | ||
_(async_metric_stream.instrument_kind).must_equal(:observable_counter) | ||
_(async_metric_stream.instrumentation_scope).must_equal(instrumentation_scope) | ||
_(async_metric_stream.data_points).must_be_instance_of(Hash) | ||
_(async_metric_stream.data_points).must_be_empty | ||
|
||
# Verify async-specific attributes | ||
_(async_metric_stream.instance_variable_get(:@callback)).must_equal(callback) | ||
_(async_metric_stream.instance_variable_get(:@timeout)).must_equal(timeout) | ||
_(async_metric_stream.instance_variable_get(:@start_time)).must_be_instance_of(Integer) | ||
_(async_metric_stream.instance_variable_get(:@start_time)).must_be :>, 0 | ||
end | ||
|
||
it 'finds and registers matching views during initialization' do | ||
view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( | ||
'async_counter', | ||
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new | ||
) | ||
meter_provider.instance_variable_get(:@registered_views) << view | ||
|
||
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', | ||
'An async counter', | ||
'count', | ||
:observable_counter, | ||
meter_provider, | ||
instrumentation_scope, | ||
aggregation, | ||
callback, | ||
timeout, | ||
attributes | ||
) | ||
|
||
registered_views = stream.instance_variable_get(:@registered_views) | ||
_(registered_views.size).must_equal(1) | ||
_(registered_views.first.aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue | ||
end | ||
end | ||
|
||
describe '#collect' do | ||
it 'invokes callback and handles various collection scenarios' do | ||
# Test basic collection with callback value and attributes | ||
metric_data_array = async_metric_stream.collect(0, 1000) | ||
_(metric_data_array).must_be_instance_of(Array) | ||
_(metric_data_array.size).must_equal(1) | ||
|
||
metric_data = metric_data_array.first | ||
_(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData) | ||
_(metric_data.name).must_equal('async_counter') | ||
_(metric_data.start_time_unix_nano).must_equal(0) | ||
_(metric_data.time_unix_nano).must_equal(1000) | ||
_(metric_data.data_points.first.value).must_equal(42) | ||
_(metric_data.data_points.first.attributes).must_equal(attributes) | ||
|
||
# Test empty collection when callback returns nil | ||
empty_callback = [proc { nil }] | ||
empty_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
empty_callback, timeout, {} | ||
) | ||
_(empty_stream.collect(0, 1000)).must_be_empty | ||
|
||
# Test multiple callbacks accumulation | ||
multi_callbacks = [proc { 10 }, proc { 20 }, proc { 30 }] | ||
multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
multi_callbacks, timeout, attributes | ||
) | ||
multi_result = multi_stream.collect(0, 1000) | ||
_(multi_result.first.data_points.first.value).must_equal(60) # 10 + 20 + 30 | ||
end | ||
|
||
it 'handles multiple registered views with attribute merging' do | ||
view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( | ||
'async_counter', | ||
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new | ||
) | ||
view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( | ||
'async_counter', | ||
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new, | ||
attribute_keys: { 'environment' => 'production', 'service' => 'metrics' } | ||
) | ||
|
||
meter_provider.instance_variable_get(:@registered_views) << view1 | ||
meter_provider.instance_variable_get(:@registered_views) << view2 | ||
|
||
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
callback, timeout, { 'original' => 'value' } | ||
) | ||
|
||
metric_data_array = stream.collect(0, 1000) | ||
_(metric_data_array.size).must_equal(2) | ||
|
||
# Verify view with attribute merging | ||
view_with_attrs = metric_data_array.find { |md| md.data_points.first.attributes.key?('service') } | ||
_(view_with_attrs).wont_be_nil | ||
attrs = view_with_attrs.data_points.first.attributes | ||
_(attrs['environment']).must_equal('production') | ||
_(attrs['service']).must_equal('metrics') | ||
_(attrs['original']).must_equal('value') | ||
end | ||
|
||
it 'handles callback exceptions' do | ||
error_callback = [proc { raise StandardError, 'Callback error' }] | ||
error_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
error_callback, timeout, attributes | ||
) | ||
|
||
# Capture the logged output | ||
original_logger = OpenTelemetry.logger | ||
log_output = StringIO.new | ||
OpenTelemetry.logger = Logger.new(log_output) | ||
error_stream.collect(0, 1000) | ||
assert_includes log_output.string, 'Error invoking callback: Callback error' | ||
OpenTelemetry.logger = original_logger | ||
end | ||
end | ||
|
||
describe '#invoke_callback' do | ||
it 'executes callbacks with timeout and handles thread safety with multiple callback' do | ||
# Test multiple callbacks in array | ||
multi_callbacks = [ | ||
proc { 10 }, | ||
proc { 20 }, | ||
proc { 30 } | ||
] | ||
multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
multi_callbacks, timeout, attributes | ||
) | ||
multi_stream.invoke_callback(timeout, attributes) | ||
|
||
# Test thread safety | ||
thread_count = 0 | ||
thread_callback = [proc { | ||
thread_count += 1 | ||
42 | ||
}] | ||
thread_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
thread_callback, timeout, attributes | ||
) | ||
|
||
metric_data = nil | ||
threads = Array.new(5) do | ||
# Thread.new { thread_stream.invoke_callback(timeout, attributes) } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still need this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. |
||
Thread.new { metric_data = thread_stream.collect(0, 10_000) } | ||
end | ||
threads.each(&:join) | ||
|
||
_(thread_count).must_equal(5) | ||
_(metric_data.first.data_points.first.value).must_equal 210 | ||
_(metric_data.first.data_points.first.attributes['environment']).must_equal 'test' | ||
_(metric_data.first.start_time_unix_nano).must_equal 0 | ||
_(metric_data.first.time_unix_nano).must_equal 10_000 | ||
end | ||
|
||
it 'respects timeout settings and handles slow callbacks' do | ||
skip 'Threading test unstable on TruffleRuby and JRuby' if %w[truffleruby jruby].include?(RUBY_ENGINE) | ||
|
||
# Test timeout handling | ||
slow_callback = [proc { | ||
sleep(0.1) | ||
42 | ||
}] | ||
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
slow_callback, 0.05, attributes # Very short timeout | ||
) | ||
|
||
original_logger = OpenTelemetry.logger | ||
log_output = StringIO.new | ||
OpenTelemetry.logger = Logger.new(log_output) | ||
stream.invoke_callback(0.05, attributes) | ||
|
||
sleep 0.2 | ||
|
||
assert_includes log_output.string, 'Timeout while invoking callback' | ||
OpenTelemetry.logger = original_logger | ||
end | ||
end | ||
|
||
describe '#now_in_nano' do | ||
it 'returns current time in nanoseconds with increasing values' do | ||
nano_time = async_metric_stream.now_in_nano | ||
_(nano_time).must_be_instance_of(Integer) | ||
_(nano_time).must_be :>, 0 | ||
|
||
# Should be a reasonable timestamp (not too old, not in future) | ||
current_time_nano = (Time.now.to_r * 1_000_000_000).to_i | ||
_(nano_time).must_be_close_to(current_time_nano, 1_000_000_000) # Within 1 second | ||
|
||
# Test successive calls return increasing values | ||
sleep(0.001) # Small delay | ||
time2 = async_metric_stream.now_in_nano | ||
_(time2).must_be :>, nano_time | ||
end | ||
end | ||
|
||
|
||
describe 'aggregation and view integration' do | ||
it 'supports different aggregation types and accumulation' do | ||
# Test Sum aggregation accumulation | ||
callback_value = 100 | ||
callback_proc = [proc { callback_value }] | ||
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
callback_proc, timeout, attributes | ||
) | ||
|
||
stream.collect(0, 1000) | ||
metric_data = stream.collect(1000, 2000) | ||
_(metric_data.first.data_points.first.value).must_equal 200 | ||
|
||
# Test LastValue aggregation | ||
last_value_aggregation = OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new | ||
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_gauge', 'description', 'units', :observable_gauge, | ||
meter_provider, instrumentation_scope, last_value_aggregation, | ||
callback_proc, timeout, attributes | ||
) | ||
|
||
# Calling it twice but last value should preserve last one instead of sum | ||
stream.collect(0, 1000) | ||
metric_data = stream.collect(0, 1000) | ||
_(metric_data.first.data_points.first.value).must_equal 100 | ||
end | ||
|
||
it 'handles view filtering and drop aggregation' do | ||
# Test view filtering by instrument name (non-matching) | ||
non_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( | ||
'different_counter', | ||
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new | ||
) | ||
|
||
# Test view filtering by instrument type (matching) | ||
type_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( | ||
nil, type: :observable_counter, | ||
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new | ||
) | ||
|
||
meter_provider.instance_variable_get(:@registered_views) << non_matching_view | ||
meter_provider.instance_variable_get(:@registered_views) << type_matching_view | ||
|
||
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
callback, timeout, attributes | ||
) | ||
|
||
metric_data = stream.collect(0, 1000) | ||
_(metric_data.size).must_equal(1) # Should match type-based view | ||
|
||
# Test Drop aggregation | ||
drop_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new( | ||
'async_counter', | ||
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Drop.new | ||
) | ||
meter_provider.instance_variable_get(:@registered_views).clear | ||
meter_provider.instance_variable_get(:@registered_views) << drop_view | ||
|
||
drop_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( | ||
'async_counter', 'description', 'unit', :observable_counter, | ||
meter_provider, instrumentation_scope, aggregation, | ||
callback, timeout, attributes | ||
) | ||
|
||
dropped_data = drop_stream.collect(0, 1000) | ||
_(dropped_data.size).must_equal(1) | ||
_(dropped_data.first.data_points.first.value).must_equal(0) # Dropped value | ||
end | ||
end | ||
end |
Uh oh!
There was an error while loading. Please reload this page.