Skip to content

Commit 8ba003b

Browse files
feat: add view for asych instrument (#1887)
* feat: view for asych instrument * refactor * Apply suggestions from code review Co-authored-by: Kayla Reopelle <[email protected]> --------- Co-authored-by: Kayla Reopelle <[email protected]>
1 parent d067071 commit 8ba003b

File tree

3 files changed

+175
-57
lines changed

3 files changed

+175
-57
lines changed

metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb

Lines changed: 29 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ module Metrics
1010
module State
1111
# @api private
1212
#
13-
# The MetricStream class provides SDK internal functionality that is not a part of the
14-
# public API.
15-
class AsynchronousMetricStream
16-
attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope, :data_points
17-
13+
# The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the
14+
# public API. It extends MetricStream to support asynchronous instruments.
15+
class AsynchronousMetricStream < MetricStream
1816
def initialize(
1917
name,
2018
description,
@@ -27,20 +25,14 @@ def initialize(
2725
timeout,
2826
attributes
2927
)
30-
@name = name
31-
@description = description
32-
@unit = unit
33-
@instrument_kind = instrument_kind
34-
@meter_provider = meter_provider
35-
@instrumentation_scope = instrumentation_scope
36-
@aggregation = aggregation
28+
# Call parent constructor with common parameters
29+
super(name, description, unit, instrument_kind, meter_provider, instrumentation_scope, aggregation)
30+
31+
# Initialize asynchronous-specific attributes
3732
@callback = callback
3833
@start_time = now_in_nano
3934
@timeout = timeout
4035
@attributes = attributes
41-
@data_points = {}
42-
43-
@mutex = Mutex.new
4436
end
4537

4638
# When collect, if there are asynchronous SDK Instruments involved, their callback functions will be triggered.
@@ -49,47 +41,36 @@ def initialize(
4941
def collect(start_time, end_time)
5042
invoke_callback(@timeout, @attributes)
5143

52-
@mutex.synchronize do
53-
MetricData.new(
54-
@name,
55-
@description,
56-
@unit,
57-
@instrument_kind,
58-
@meter_provider.resource,
59-
@instrumentation_scope,
60-
@aggregation.collect(start_time, end_time, @data_points),
61-
@aggregation.aggregation_temporality,
62-
start_time,
63-
end_time
64-
)
65-
end
44+
# Call parent collect method for the core collection logic
45+
super(start_time, end_time)
6646
end
6747

6848
def invoke_callback(timeout, attributes)
69-
@mutex.synchronize do
70-
Timeout.timeout(timeout || 30) do
71-
@callback.each do |cb|
72-
value = cb.call
73-
@aggregation.update(value, attributes, @data_points)
49+
if @registered_views.empty?
50+
@mutex.synchronize do
51+
Timeout.timeout(timeout || 30) do
52+
@callback.each do |cb|
53+
value = cb.call
54+
@default_aggregation.update(value, attributes, @data_points)
55+
end
56+
end
57+
end
58+
else
59+
@registered_views.each do |view|
60+
@mutex.synchronize do
61+
Timeout.timeout(timeout || 30) do
62+
@callback.each do |cb|
63+
value = cb.call
64+
merged_attributes = attributes || {}
65+
merged_attributes.merge!(view.attribute_keys)
66+
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
67+
end
68+
end
7469
end
7570
end
7671
end
7772
end
7873

79-
def to_s
80-
instrument_info = +''
81-
instrument_info << "name=#{@name}"
82-
instrument_info << " description=#{@description}" if @description
83-
instrument_info << " unit=#{@unit}" if @unit
84-
@data_points.map do |attributes, value|
85-
metric_stream_string = +''
86-
metric_stream_string << instrument_info
87-
metric_stream_string << " attributes=#{attributes}" if attributes
88-
metric_stream_string << " #{value}"
89-
metric_stream_string
90-
end.join("\n")
91-
end
92-
9374
def now_in_nano
9475
(Time.now.to_r * 1_000_000_000).to_i
9576
end

metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
metric_exporter.pull
2424
last_snapshot = metric_exporter.metric_snapshots
2525

26-
# puts "last_snapshot.inspect: #{last_snapshot.inspect}"
2726
_(last_snapshot[0].name).must_equal('counter')
2827
_(last_snapshot[0].unit).must_equal('smidgen')
2928
_(last_snapshot[0].description).must_equal('a small amount of something')
@@ -42,7 +41,6 @@
4241
metric_exporter.pull
4342
last_snapshot = metric_exporter.metric_snapshots
4443

45-
# puts "last_snapshot.inspect: #{last_snapshot.inspect}"
4644
_(last_snapshot[0].name).must_equal('counter')
4745
_(last_snapshot[0].unit).must_equal('smidgen')
4846
_(last_snapshot[0].description).must_equal('a small amount of something')
@@ -103,11 +101,7 @@
103101
metric_exporter.pull
104102
last_snapshot = metric_exporter.metric_snapshots
105103

106-
_(last_snapshot[0].name).must_equal('counter')
107-
_(last_snapshot[0].unit).must_equal('smidgen')
108-
_(last_snapshot[0].description).must_equal('a small amount of something')
109-
_(last_snapshot[0].instrumentation_scope.name).must_equal('test')
110-
_(last_snapshot[0].data_points.size).must_equal 0
104+
_(last_snapshot.size).must_equal 0
111105
end
112106

113107
it 'creation of instruments with more than one callabck' do
@@ -134,7 +128,7 @@
134128

135129
metric_exporter.pull
136130
last_snapshot = metric_exporter.metric_snapshots
137-
_(last_snapshot[0].data_points.size).must_equal 0
131+
_(last_snapshot.size).must_equal 0
138132
end
139133

140134
it 'creation of instruments with invalid argument result no callback' do
@@ -144,6 +138,6 @@
144138

145139
metric_exporter.pull
146140
last_snapshot = metric_exporter.metric_snapshots
147-
_(last_snapshot[0].data_points.size).must_equal 0
141+
_(last_snapshot.size).must_equal 0
148142
end
149143
end

metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,149 @@
9090
end
9191
end
9292

93+
describe '#registered_view with asynchronous counters' do
94+
before { reset_metrics_sdk }
95+
96+
it 'emits asynchronous counter metrics with no data_points if view is drop' do
97+
OpenTelemetry::SDK.configure
98+
99+
metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
100+
OpenTelemetry.meter_provider.add_metric_reader(metric_exporter)
101+
102+
meter = OpenTelemetry.meter_provider.meter('test')
103+
OpenTelemetry.meter_provider.add_view('async_counter', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::Drop.new)
104+
105+
callback = proc { 42 }
106+
meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback)
107+
108+
metric_exporter.pull
109+
last_snapshot = metric_exporter.metric_snapshots
110+
111+
_(last_snapshot).wont_be_empty
112+
_(last_snapshot[0].name).must_equal('async_counter')
113+
_(last_snapshot[0].unit).must_equal('smidgen')
114+
_(last_snapshot[0].description).must_equal('an async counter')
115+
_(last_snapshot[0].instrumentation_scope.name).must_equal('test')
116+
117+
_(last_snapshot[0].data_points[0].value).must_equal 0
118+
_(last_snapshot[0].data_points[0].start_time_unix_nano).must_equal 0
119+
_(last_snapshot[0].data_points[0].time_unix_nano).must_equal 0
120+
end
121+
122+
it 'emits asynchronous counter metrics with only last value in data_points if view is last_value' do
123+
OpenTelemetry::SDK.configure
124+
125+
metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
126+
OpenTelemetry.meter_provider.add_metric_reader(metric_exporter)
127+
128+
meter = OpenTelemetry.meter_provider.meter('test')
129+
OpenTelemetry.meter_provider.add_view('async_counter', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new)
130+
131+
# Create a callback that returns different values each time it's called
132+
call_count = 0
133+
callback = proc do
134+
call_count += 1
135+
final_count = call_count * 10
136+
final_count
137+
end
138+
139+
meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback)
140+
141+
# Trigger multiple collections to simulate multiple callback invocations
142+
3.times { metric_exporter.pull }
143+
last_snapshot = metric_exporter.metric_snapshots
144+
145+
# Reason that use 3rd from last_snapshot, because in_memory_metrics_pull exporter
146+
# will store each collected metrics into its own data store unit (special case for the type of exporter)
147+
_(last_snapshot[2].data_points).wont_be_empty
148+
_(last_snapshot[2].data_points[0].value).must_equal 30
149+
end
150+
151+
it 'emits asynchronous counter metrics with sum of values if view is drop but not matching to instrument' do
152+
OpenTelemetry::SDK.configure
153+
154+
metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
155+
OpenTelemetry.meter_provider.add_metric_reader(metric_exporter)
156+
157+
meter = OpenTelemetry.meter_provider.meter('test')
158+
# View name doesn't match the instrument name
159+
OpenTelemetry.meter_provider.add_view('retnuoc_cnysa', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::Drop.new)
160+
161+
callback = proc { 15 }
162+
meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback)
163+
164+
metric_exporter.pull
165+
last_snapshot = metric_exporter.metric_snapshots
166+
167+
_(last_snapshot[0].data_points).wont_be_empty
168+
# Since view doesn't match, it should use default aggregation (sum for counters)
169+
_(last_snapshot[0].data_points[0].value).must_equal 15
170+
end
171+
172+
it 'emits asynchronous counter metrics with multiple registered views' do
173+
OpenTelemetry::SDK.configure
174+
175+
metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
176+
OpenTelemetry.meter_provider.add_metric_reader(metric_exporter)
177+
178+
meter = OpenTelemetry.meter_provider.meter('test')
179+
# Add multiple views for the same instrument
180+
OpenTelemetry.meter_provider.add_view('async_counter', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::Sum.new)
181+
OpenTelemetry.meter_provider.add_view('async_counter', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new)
182+
183+
callback = proc { 25 }
184+
meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback)
185+
186+
metric_exporter.pull
187+
last_snapshot = metric_exporter.metric_snapshots
188+
189+
# Should have multiple metric data entries (one for each view)
190+
_(last_snapshot.size).must_be :>=, 2
191+
192+
# All should have the same instrument metadata
193+
last_snapshot.each do |snapshot|
194+
_(snapshot.name).must_equal('async_counter')
195+
_(snapshot.unit).must_equal('smidgen')
196+
_(snapshot.description).must_equal('an async counter')
197+
_(snapshot.instrumentation_scope.name).must_equal('test')
198+
_(snapshot.data_points).wont_be_empty
199+
end
200+
end
201+
202+
it 'emits asynchronous counter metrics with view attribute filtering' do
203+
OpenTelemetry::SDK.configure
204+
205+
metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
206+
OpenTelemetry.meter_provider.add_metric_reader(metric_exporter)
207+
208+
meter = OpenTelemetry.meter_provider.meter('test')
209+
210+
# Create a view that adds specific attributes
211+
view_with_attributes = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
212+
'async_counter',
213+
aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::Sum.new,
214+
attribute_keys: { 'environment' => 'test', 'service' => 'metrics' }
215+
)
216+
OpenTelemetry.meter_provider.instance_variable_get(:@registered_views) << view_with_attributes
217+
218+
callback = proc { 35 }
219+
observable_counter = meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback)
220+
observable_counter.add_attributes({ 'original' => 'value' })
221+
222+
metric_exporter.pull
223+
last_snapshot = metric_exporter.metric_snapshots
224+
225+
_(last_snapshot[0].data_points).wont_be_empty
226+
_(last_snapshot[0].data_points[0].value).must_equal 35
227+
228+
# Check that view attributes are merged with original attributes
229+
attributes = last_snapshot[0].data_points[0].attributes
230+
_(attributes['environment']).must_equal 'test'
231+
_(attributes['service']).must_equal 'metrics'
232+
_(attributes['original']).must_equal 'value'
233+
end
234+
end
235+
93236
describe '#registered_view select instrument' do
94237
let(:registered_view) { OpenTelemetry::SDK::Metrics::View::RegisteredView.new(nil, aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new) }
95238
let(:instrumentation_scope) do

0 commit comments

Comments
 (0)