Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ module State
# The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the
# public API. It extends MetricStream to support asynchronous instruments.
class AsynchronousMetricStream < MetricStream
DEFAULT_TIMEOUT = 30

def initialize(
name,
description,
Expand Down Expand Up @@ -48,28 +50,49 @@ def collect(start_time, end_time)
def invoke_callback(timeout, attributes)
if @registered_views.empty?
@mutex.synchronize do
Timeout.timeout(timeout || 30) do
@callback.each do |cb|
value = cb.call
@default_aggregation.update(value, attributes, @data_points)
end
@callback.each do |cb|
value = safe_guard_callback(cb, timeout: timeout)
@default_aggregation.update(value, attributes, @data_points) if value.is_a?(Numeric)
end
end
else
@registered_views.each do |view|
@mutex.synchronize do
Timeout.timeout(timeout || 30) do
@callback.each do |cb|
value = cb.call
merged_attributes = attributes || {}
merged_attributes.merge!(view.attribute_keys)
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
end
@callback.each do |cb|
value = safe_guard_callback(cb, timeout: timeout)
next unless value.is_a?(Numeric) # ignore if value is not valid number

merged_attributes = attributes || {}
merged_attributes.merge!(view.attribute_keys)
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
end
end
end
end
end

private

def safe_guard_callback(callback, timeout: DEFAULT_TIMEOUT)
result = nil
thread = Thread.new do
result = callback.call
rescue StandardError => e
OpenTelemetry.logger.error("Error invoking callback: #{e.message}")
result = :error
end

unless thread.join(timeout)
thread.kill
OpenTelemetry.logger.error("Timeout while invoking callback after #{timeout} seconds")
return nil
end

result == :error ? nil : result
rescue StandardError => e
OpenTelemetry.logger.error("Unexpected error in callback execution: #{e.message}")
nil
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def collect(start_time, end_time)
end
end

# view will modify the data_point that is not suitable when there are multiple views
def update(value, attributes)
if @registered_views.empty?
@mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

describe OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream do
let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new }
let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') }
let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new }
let(:callback) { [proc { 42 }] }
let(:timeout) { 10 }
let(:attributes) { { 'environment' => 'test' } }
let(:async_metric_stream) do
OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter',
'An async counter',
'count',
:observable_counter,
meter_provider,
instrumentation_scope,
aggregation,
callback,
timeout,
attributes
)
end

describe '#initialize' do
it 'initializes with provided parameters and async-specific attributes' do
_(async_metric_stream.name).must_equal('async_counter')
_(async_metric_stream.description).must_equal('An async counter')
_(async_metric_stream.unit).must_equal('count')
_(async_metric_stream.instrument_kind).must_equal(:observable_counter)
_(async_metric_stream.instrumentation_scope).must_equal(instrumentation_scope)
_(async_metric_stream.data_points).must_be_instance_of(Hash)
_(async_metric_stream.data_points).must_be_empty

# Verify async-specific attributes
_(async_metric_stream.instance_variable_get(:@callback)).must_equal(callback)
_(async_metric_stream.instance_variable_get(:@timeout)).must_equal(timeout)
_(async_metric_stream.instance_variable_get(:@start_time)).must_be_instance_of(Integer)
_(async_metric_stream.instance_variable_get(:@start_time)).must_be :>, 0
end

it 'finds and registers matching views during initialization' do
view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
'async_counter',
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
)
meter_provider.instance_variable_get(:@registered_views) << view

stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter',
'An async counter',
'count',
:observable_counter,
meter_provider,
instrumentation_scope,
aggregation,
callback,
timeout,
attributes
)

registered_views = stream.instance_variable_get(:@registered_views)
_(registered_views.size).must_equal(1)
_(registered_views.first.aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue
end
end

describe '#collect' do
it 'invokes callback and handles various collection scenarios' do
# Test basic collection with callback value and attributes
metric_data_array = async_metric_stream.collect(0, 1000)
_(metric_data_array).must_be_instance_of(Array)
_(metric_data_array.size).must_equal(1)

metric_data = metric_data_array.first
_(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData)
_(metric_data.name).must_equal('async_counter')
_(metric_data.start_time_unix_nano).must_equal(0)
_(metric_data.time_unix_nano).must_equal(1000)
_(metric_data.data_points.first.value).must_equal(42)
_(metric_data.data_points.first.attributes).must_equal(attributes)

# Test empty collection when callback returns nil
empty_callback = [proc { nil }]
empty_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
empty_callback, timeout, {}
)
_(empty_stream.collect(0, 1000)).must_be_empty

# Test multiple callbacks accumulation
multi_callbacks = [proc { 10 }, proc { 20 }, proc { 30 }]
multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
multi_callbacks, timeout, attributes
)
multi_result = multi_stream.collect(0, 1000)
_(multi_result.first.data_points.first.value).must_equal(60) # 10 + 20 + 30
end

it 'handles multiple registered views with attribute merging' do
view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
'async_counter',
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new
)
view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
'async_counter',
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new,
attribute_keys: { 'environment' => 'production', 'service' => 'metrics' }
)

meter_provider.instance_variable_get(:@registered_views) << view1
meter_provider.instance_variable_get(:@registered_views) << view2

stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
callback, timeout, { 'original' => 'value' }
)

metric_data_array = stream.collect(0, 1000)
_(metric_data_array.size).must_equal(2)

# Verify view with attribute merging
view_with_attrs = metric_data_array.find { |md| md.data_points.first.attributes.key?('service') }
_(view_with_attrs).wont_be_nil
attrs = view_with_attrs.data_points.first.attributes
_(attrs['environment']).must_equal('production')
_(attrs['service']).must_equal('metrics')
_(attrs['original']).must_equal('value')
end

it 'handles callback exceptions' do
error_callback = [proc { raise StandardError, 'Callback error' }]
error_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
error_callback, timeout, attributes
)

# Capture the logged output
original_logger = OpenTelemetry.logger
log_output = StringIO.new
OpenTelemetry.logger = Logger.new(log_output)
error_stream.collect(0, 1000)
assert_includes log_output.string, 'Error invoking callback: Callback error'
OpenTelemetry.logger = original_logger
end
end

describe '#invoke_callback' do
it 'executes callbacks with timeout and handles thread safety with multiple callback' do
# Test multiple callbacks in array
multi_callbacks = [
proc { 10 },
proc { 20 },
proc { 30 }
]
multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
multi_callbacks, timeout, attributes
)
multi_stream.invoke_callback(timeout, attributes)

# Test thread safety
thread_count = 0
thread_callback = [proc {
thread_count += 1
42
}]
thread_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
thread_callback, timeout, attributes
)

metric_data = nil
threads = Array.new(5) do
# Thread.new { thread_stream.invoke_callback(timeout, attributes) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

Thread.new { metric_data = thread_stream.collect(0, 10_000) }
end
threads.each(&:join)

_(thread_count).must_equal(5)
_(metric_data.first.data_points.first.value).must_equal 210
_(metric_data.first.data_points.first.attributes['environment']).must_equal 'test'
_(metric_data.first.start_time_unix_nano).must_equal 0
_(metric_data.first.time_unix_nano).must_equal 10_000
end
end

describe 'aggregation and view integration' do
it 'supports different aggregation types and accumulation' do
# Test Sum aggregation accumulation
callback_value = 100
callback_proc = [proc { callback_value }]
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
callback_proc, timeout, attributes
)

stream.collect(0, 1000)
metric_data = stream.collect(1000, 2000)
_(metric_data.first.data_points.first.value).must_equal 200

# Test LastValue aggregation
last_value_aggregation = OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_gauge', 'description', 'units', :observable_gauge,
meter_provider, instrumentation_scope, last_value_aggregation,
callback_proc, timeout, attributes
)

# Calling it twice but last value should preserve last one instead of sum
stream.collect(0, 1000)
metric_data = stream.collect(0, 1000)
_(metric_data.first.data_points.first.value).must_equal 100
end

it 'handles view filtering and drop aggregation' do
# Test view filtering by instrument name (non-matching)
non_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
'different_counter',
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
)

# Test view filtering by instrument type (matching)
type_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
nil, type: :observable_counter,
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
)

meter_provider.instance_variable_get(:@registered_views) << non_matching_view
meter_provider.instance_variable_get(:@registered_views) << type_matching_view

stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
callback, timeout, attributes
)

metric_data = stream.collect(0, 1000)
_(metric_data.size).must_equal(1) # Should match type-based view

# Test Drop aggregation
drop_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
'async_counter',
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Drop.new
)
meter_provider.instance_variable_get(:@registered_views).clear
meter_provider.instance_variable_get(:@registered_views) << drop_view

drop_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
'async_counter', 'description', 'unit', :observable_counter,
meter_provider, instrumentation_scope, aggregation,
callback, timeout, attributes
)

dropped_data = drop_stream.collect(0, 1000)
_(dropped_data.size).must_equal(1)
_(dropped_data.first.data_points.first.value).must_equal(0) # Dropped value
end
end
end
Loading