Skip to content

Commit b1d95b4

Browse files
fix: add test case for metric_store and metric_view (#1894)
* test: test case for metric_stream and store * update test case * refine the test case; safeguard the callback * remove typo * replace unsafe timeout.timeout with thread join time * test fix * sleep 0.2 * skip some test for truffleruby * update test case * lint * skip thread related test * remove unstable test due to thread management * lint --------- Co-authored-by: Kayla Reopelle <[email protected]>
1 parent 0e26bcf commit b1d95b4

File tree

5 files changed

+658
-14
lines changed

5 files changed

+658
-14
lines changed

metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ module State
1313
# The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the
1414
# public API. It extends MetricStream to support asynchronous instruments.
1515
class AsynchronousMetricStream < MetricStream
16+
DEFAULT_TIMEOUT = 30
17+
1618
def initialize(
1719
name,
1820
description,
@@ -48,28 +50,49 @@ def collect(start_time, end_time)
4850
def invoke_callback(timeout, attributes)
4951
if @registered_views.empty?
5052
@mutex.synchronize do
51-
Timeout.timeout(timeout || 30) do
52-
@callback.each do |cb|
53-
value = cb.call
54-
@default_aggregation.update(value, attributes, @data_points)
55-
end
53+
@callback.each do |cb|
54+
value = safe_guard_callback(cb, timeout: timeout)
55+
@default_aggregation.update(value, attributes, @data_points) if value.is_a?(Numeric)
5656
end
5757
end
5858
else
5959
@registered_views.each do |view|
6060
@mutex.synchronize do
61-
Timeout.timeout(timeout || 30) do
62-
@callback.each do |cb|
63-
value = cb.call
64-
merged_attributes = attributes || {}
65-
merged_attributes.merge!(view.attribute_keys)
66-
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
67-
end
61+
@callback.each do |cb|
62+
value = safe_guard_callback(cb, timeout: timeout)
63+
next unless value.is_a?(Numeric) # ignore if value is not valid number
64+
65+
merged_attributes = attributes || {}
66+
merged_attributes.merge!(view.attribute_keys)
67+
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
6868
end
6969
end
7070
end
7171
end
7272
end
73+
74+
private
75+
76+
def safe_guard_callback(callback, timeout: DEFAULT_TIMEOUT)
77+
result = nil
78+
thread = Thread.new do
79+
result = callback.call
80+
rescue StandardError => e
81+
OpenTelemetry.logger.error("Error invoking callback: #{e.message}")
82+
result = :error
83+
end
84+
85+
unless thread.join(timeout)
86+
thread.kill
87+
OpenTelemetry.logger.error("Timeout while invoking callback after #{timeout} seconds")
88+
return nil
89+
end
90+
91+
result == :error ? nil : result
92+
rescue StandardError => e
93+
OpenTelemetry.logger.error("Unexpected error in callback execution: #{e.message}")
94+
nil
95+
end
7396
end
7497
end
7598
end

metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def collect(start_time, end_time)
5555
end
5656
end
5757

58+
# view will modify the data_point that is not suitable when there are multiple views
5859
def update(value, attributes)
5960
if @registered_views.empty?
6061
@mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) }
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
require 'test_helper'
8+
9+
describe OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream do
10+
let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new }
11+
let(:instrumentation_scope) { OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.0') }
12+
let(:aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new }
13+
let(:callback) { [proc { 42 }] }
14+
let(:timeout) { 10 }
15+
let(:attributes) { { 'environment' => 'test' } }
16+
let(:async_metric_stream) do
17+
OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
18+
'async_counter',
19+
'An async counter',
20+
'count',
21+
:observable_counter,
22+
meter_provider,
23+
instrumentation_scope,
24+
aggregation,
25+
callback,
26+
timeout,
27+
attributes
28+
)
29+
end
30+
31+
describe '#initialize' do
32+
it 'initializes with provided parameters and async-specific attributes' do
33+
_(async_metric_stream.name).must_equal('async_counter')
34+
_(async_metric_stream.description).must_equal('An async counter')
35+
_(async_metric_stream.unit).must_equal('count')
36+
_(async_metric_stream.instrument_kind).must_equal(:observable_counter)
37+
_(async_metric_stream.instrumentation_scope).must_equal(instrumentation_scope)
38+
_(async_metric_stream.data_points).must_be_instance_of(Hash)
39+
_(async_metric_stream.data_points).must_be_empty
40+
41+
# Verify async-specific attributes
42+
_(async_metric_stream.instance_variable_get(:@callback)).must_equal(callback)
43+
_(async_metric_stream.instance_variable_get(:@timeout)).must_equal(timeout)
44+
_(async_metric_stream.instance_variable_get(:@start_time)).must_be_instance_of(Integer)
45+
_(async_metric_stream.instance_variable_get(:@start_time)).must_be :>, 0
46+
end
47+
48+
it 'finds and registers matching views during initialization' do
49+
view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
50+
'async_counter',
51+
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
52+
)
53+
meter_provider.instance_variable_get(:@registered_views) << view
54+
55+
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
56+
'async_counter',
57+
'An async counter',
58+
'count',
59+
:observable_counter,
60+
meter_provider,
61+
instrumentation_scope,
62+
aggregation,
63+
callback,
64+
timeout,
65+
attributes
66+
)
67+
68+
registered_views = stream.instance_variable_get(:@registered_views)
69+
_(registered_views.size).must_equal(1)
70+
_(registered_views.first.aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue
71+
end
72+
end
73+
74+
describe '#collect' do
75+
it 'invokes callback and handles various collection scenarios' do
76+
# Test basic collection with callback value and attributes
77+
metric_data_array = async_metric_stream.collect(0, 1000)
78+
_(metric_data_array).must_be_instance_of(Array)
79+
_(metric_data_array.size).must_equal(1)
80+
81+
metric_data = metric_data_array.first
82+
_(metric_data).must_be_instance_of(OpenTelemetry::SDK::Metrics::State::MetricData)
83+
_(metric_data.name).must_equal('async_counter')
84+
_(metric_data.start_time_unix_nano).must_equal(0)
85+
_(metric_data.time_unix_nano).must_equal(1000)
86+
_(metric_data.data_points.first.value).must_equal(42)
87+
_(metric_data.data_points.first.attributes).must_equal(attributes)
88+
89+
# Test empty collection when callback returns nil
90+
empty_callback = [proc { nil }]
91+
empty_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
92+
'async_counter', 'description', 'unit', :observable_counter,
93+
meter_provider, instrumentation_scope, aggregation,
94+
empty_callback, timeout, {}
95+
)
96+
_(empty_stream.collect(0, 1000)).must_be_empty
97+
98+
# Test multiple callbacks accumulation
99+
multi_callbacks = [proc { 10 }, proc { 20 }, proc { 30 }]
100+
multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
101+
'async_counter', 'description', 'unit', :observable_counter,
102+
meter_provider, instrumentation_scope, aggregation,
103+
multi_callbacks, timeout, attributes
104+
)
105+
multi_result = multi_stream.collect(0, 1000)
106+
_(multi_result.first.data_points.first.value).must_equal(60) # 10 + 20 + 30
107+
end
108+
109+
it 'handles multiple registered views with attribute merging' do
110+
view1 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
111+
'async_counter',
112+
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new
113+
)
114+
view2 = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
115+
'async_counter',
116+
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new,
117+
attribute_keys: { 'environment' => 'production', 'service' => 'metrics' }
118+
)
119+
120+
meter_provider.instance_variable_get(:@registered_views) << view1
121+
meter_provider.instance_variable_get(:@registered_views) << view2
122+
123+
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
124+
'async_counter', 'description', 'unit', :observable_counter,
125+
meter_provider, instrumentation_scope, aggregation,
126+
callback, timeout, { 'original' => 'value' }
127+
)
128+
129+
metric_data_array = stream.collect(0, 1000)
130+
_(metric_data_array.size).must_equal(2)
131+
132+
# Verify view with attribute merging
133+
view_with_attrs = metric_data_array.find { |md| md.data_points.first.attributes.key?('service') }
134+
_(view_with_attrs).wont_be_nil
135+
attrs = view_with_attrs.data_points.first.attributes
136+
_(attrs['environment']).must_equal('production')
137+
_(attrs['service']).must_equal('metrics')
138+
_(attrs['original']).must_equal('value')
139+
end
140+
141+
it 'handles callback exceptions' do
142+
error_callback = [proc { raise StandardError, 'Callback error' }]
143+
error_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
144+
'async_counter', 'description', 'unit', :observable_counter,
145+
meter_provider, instrumentation_scope, aggregation,
146+
error_callback, timeout, attributes
147+
)
148+
149+
# Capture the logged output
150+
original_logger = OpenTelemetry.logger
151+
log_output = StringIO.new
152+
OpenTelemetry.logger = Logger.new(log_output)
153+
error_stream.collect(0, 1000)
154+
assert_includes log_output.string, 'Error invoking callback: Callback error'
155+
OpenTelemetry.logger = original_logger
156+
end
157+
end
158+
159+
describe '#invoke_callback' do
160+
it 'executes callbacks with timeout and handles thread safety with multiple callback' do
161+
# Test multiple callbacks in array
162+
multi_callbacks = [
163+
proc { 10 },
164+
proc { 20 },
165+
proc { 30 }
166+
]
167+
multi_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
168+
'async_counter', 'description', 'unit', :observable_counter,
169+
meter_provider, instrumentation_scope, aggregation,
170+
multi_callbacks, timeout, attributes
171+
)
172+
multi_stream.invoke_callback(timeout, attributes)
173+
174+
# Test thread safety
175+
thread_count = 0
176+
thread_callback = [proc {
177+
thread_count += 1
178+
42
179+
}]
180+
thread_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
181+
'async_counter', 'description', 'unit', :observable_counter,
182+
meter_provider, instrumentation_scope, aggregation,
183+
thread_callback, timeout, attributes
184+
)
185+
186+
metric_data = nil
187+
threads = Array.new(5) do
188+
# Thread.new { thread_stream.invoke_callback(timeout, attributes) }
189+
Thread.new { metric_data = thread_stream.collect(0, 10_000) }
190+
end
191+
threads.each(&:join)
192+
193+
_(thread_count).must_equal(5)
194+
_(metric_data.first.data_points.first.value).must_equal 210
195+
_(metric_data.first.data_points.first.attributes['environment']).must_equal 'test'
196+
_(metric_data.first.start_time_unix_nano).must_equal 0
197+
_(metric_data.first.time_unix_nano).must_equal 10_000
198+
end
199+
end
200+
201+
describe 'aggregation and view integration' do
202+
it 'supports different aggregation types and accumulation' do
203+
# Test Sum aggregation accumulation
204+
callback_value = 100
205+
callback_proc = [proc { callback_value }]
206+
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
207+
'async_counter', 'description', 'unit', :observable_counter,
208+
meter_provider, instrumentation_scope, aggregation,
209+
callback_proc, timeout, attributes
210+
)
211+
212+
stream.collect(0, 1000)
213+
metric_data = stream.collect(1000, 2000)
214+
_(metric_data.first.data_points.first.value).must_equal 200
215+
216+
# Test LastValue aggregation
217+
last_value_aggregation = OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
218+
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
219+
'async_gauge', 'description', 'units', :observable_gauge,
220+
meter_provider, instrumentation_scope, last_value_aggregation,
221+
callback_proc, timeout, attributes
222+
)
223+
224+
# Calling it twice but last value should preserve last one instead of sum
225+
stream.collect(0, 1000)
226+
metric_data = stream.collect(0, 1000)
227+
_(metric_data.first.data_points.first.value).must_equal 100
228+
end
229+
230+
it 'handles view filtering and drop aggregation' do
231+
# Test view filtering by instrument name (non-matching)
232+
non_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
233+
'different_counter',
234+
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
235+
)
236+
237+
# Test view filtering by instrument type (matching)
238+
type_matching_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
239+
nil, type: :observable_counter,
240+
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
241+
)
242+
243+
meter_provider.instance_variable_get(:@registered_views) << non_matching_view
244+
meter_provider.instance_variable_get(:@registered_views) << type_matching_view
245+
246+
stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
247+
'async_counter', 'description', 'unit', :observable_counter,
248+
meter_provider, instrumentation_scope, aggregation,
249+
callback, timeout, attributes
250+
)
251+
252+
metric_data = stream.collect(0, 1000)
253+
_(metric_data.size).must_equal(1) # Should match type-based view
254+
255+
# Test Drop aggregation
256+
drop_view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
257+
'async_counter',
258+
aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Drop.new
259+
)
260+
meter_provider.instance_variable_get(:@registered_views).clear
261+
meter_provider.instance_variable_get(:@registered_views) << drop_view
262+
263+
drop_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
264+
'async_counter', 'description', 'unit', :observable_counter,
265+
meter_provider, instrumentation_scope, aggregation,
266+
callback, timeout, attributes
267+
)
268+
269+
dropped_data = drop_stream.collect(0, 1000)
270+
_(dropped_data.size).must_equal(1)
271+
_(dropped_data.first.data_points.first.value).must_equal(0) # Dropped value
272+
end
273+
end
274+
end

0 commit comments

Comments
 (0)