Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporter/otlp-metrics/test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
OpenTelemetry.logger = Logger.new(File::NULL)

module MockSum
def collect(start_time, end_time, data_points)
def collect(start_time, end_time)
start_time = 1_699_593_427_329_946_585 # rubocop:disable Lint/ShadowedArgument
end_time = 1_699_593_427_329_946_586 # rubocop:disable Lint/ShadowedArgument
super
Expand Down
12 changes: 8 additions & 4 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ module Metrics
module Aggregation
# Contains the implementation of the Drop aggregation
class Drop
def collect(start_time, end_time, data_points)
data_points.values.map!(&:dup)
def initialize
@data_points = {}
end

def update(increment, attributes, data_points)
data_points[attributes] = NumberDataPoint.new(
def collect(start_time, end_time)
@data_points.values.map!(&:dup)
end

def update(increment, attributes)
@data_points[attributes] = NumberDataPoint.new(
{},
0,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ def initialize(
@aggregation_temporality = AggregationTemporality.determine_temporality(aggregation_temporality: aggregation_temporality, default: :cumulative)
@boundaries = boundaries && !boundaries.empty? ? boundaries.sort : nil
@record_min_max = record_min_max
@data_points = {}
end

def collect(start_time, end_time, data_points)
def collect(start_time, end_time)
if @aggregation_temporality.delta?
# Set timestamps and 'move' data point values to result.
hdps = data_points.values.map! do |hdp|
hdps = @data_points.values.map! do |hdp|
hdp.start_time_unix_nano = start_time
hdp.time_unix_nano = end_time
hdp
end
data_points.clear
@data_points.clear
hdps
else
# Update timestamps and take a snapshot.
data_points.values.map! do |hdp|
@data_points.values.map! do |hdp|
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
hdp.time_unix_nano = end_time
hdp = hdp.dup
Expand All @@ -50,14 +51,14 @@ def collect(start_time, end_time, data_points)
end
end

def update(amount, attributes, data_points)
hdp = data_points.fetch(attributes) do
def update(amount, attributes)
hdp = @data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

data_points[attributes] = HistogramDataPoint.new(
@data_points[attributes] = HistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
nil, # :time_unix_nano
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,24 @@ def initialize(
@zero_count = 0
@size = validate_size(max_size)
@scale = validate_scale(max_scale)
@data_points = {}

@mapping = new_mapping(@scale)
end

def collect(start_time, end_time, data_points)
def collect(start_time, end_time)
if @aggregation_temporality.delta?
# Set timestamps and 'move' data point values to result.
hdps = data_points.values.map! do |hdp|
hdps = @data_points.values.map! do |hdp|
hdp.start_time_unix_nano = start_time
hdp.time_unix_nano = end_time
hdp
end
data_points.clear
@data_points.clear
hdps
else
# Update timestamps and take a snapshot.
data_points.values.map! do |hdp|
@data_points.values.map! do |hdp|
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
hdp.time_unix_nano = end_time
hdp = hdp.dup
Expand All @@ -70,15 +71,14 @@ def collect(start_time, end_time, data_points)
end

# rubocop:disable Metrics/MethodLength
def update(amount, attributes, data_points)
# fetch or initialize the ExponentialHistogramDataPoint
hdp = data_points.fetch(attributes) do
def update(amount, attributes)
hdp = @data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

data_points[attributes] = ExponentialHistogramDataPoint.new(
@data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,22 @@ module Metrics
module Aggregation
# Contains the implementation of the LastValue aggregation
class LastValue
def collect(start_time, end_time, data_points)
ndps = data_points.values.map! do |ndp|
def initialize
@data_points = {}
end

def collect(start_time, end_time)
ndps = @data_points.values.map! do |ndp|
ndp.start_time_unix_nano = start_time
ndp.time_unix_nano = end_time
ndp
end
data_points.clear
@data_points.clear
ndps
end

def update(increment, attributes, data_points)
data_points[attributes] = NumberDataPoint.new(
def update(increment, attributes)
@data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
nil,
Expand Down
14 changes: 8 additions & 6 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@ class Sum
def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :cumulative), monotonic: false, instrument_kind: nil)
@aggregation_temporality = AggregationTemporality.determine_temporality(aggregation_temporality: aggregation_temporality, instrument_kind: instrument_kind, default: :cumulative)
@monotonic = monotonic
@data_points = {}
end

def collect(start_time, end_time, data_points)
def collect(start_time, end_time)
if @aggregation_temporality.delta?
# Set timestamps and 'move' data point values to result.
ndps = data_points.values.map! do |ndp|
ndps = @data_points.values.map! do |ndp|
ndp.start_time_unix_nano = start_time
ndp.time_unix_nano = end_time
ndp
end
data_points.clear
@data_points.clear
ndps
else
# Update timestamps and take a snapshot.
data_points.values.map! do |ndp|
@data_points.values.map! do |ndp|
ndp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
ndp.time_unix_nano = end_time
ndp.dup
Expand All @@ -40,10 +41,11 @@ def monotonic?
@monotonic
end

def update(increment, attributes, data_points)
# no double exporting so when view exist, then we only export the metric_data processed by view
def update(increment, attributes)
return if @monotonic && increment < 0

ndp = data_points[attributes] || data_points[attributes] = NumberDataPoint.new(
ndp = @data_points[attributes] || @data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ module State
# The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the
# public API. It extends MetricStream to support asynchronous instruments.
class AsynchronousMetricStream < MetricStream
DEFAULT_TIMEOUT = 30

def initialize(
name,
description,
Expand Down Expand Up @@ -48,23 +50,21 @@ def collect(start_time, end_time)
def invoke_callback(timeout, attributes)
if @registered_views.empty?
@mutex.synchronize do
Timeout.timeout(timeout || 30) do
@callback.each do |cb|
value = cb.call
@default_aggregation.update(value, attributes, @data_points)
end
@callback.each do |cb|
value = safe_guard_callback(cb, timeout: timeout)
@default_aggregation.update(value, attributes) if value.is_a?(Numeric)
end
end
else
@registered_views.each do |view|
@mutex.synchronize do
Timeout.timeout(timeout || 30) do
@callback.each do |cb|
value = cb.call
merged_attributes = attributes || {}
merged_attributes.merge!(view.attribute_keys)
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
end
@callback.each do |cb|
value = safe_guard_callback(cb, timeout: timeout)
next unless value.is_a?(Numeric) # ignore if value is not valid number

merged_attributes = attributes || {}
merged_attributes.merge!(view.attribute_keys)
view.aggregation.update(value, merged_attributes) if view.valid_aggregation?
end
end
end
Expand All @@ -74,6 +74,29 @@ def invoke_callback(timeout, attributes)
def now_in_nano
(Time.now.to_r * 1_000_000_000).to_i
end

private

def safe_guard_callback(callback, timeout: DEFAULT_TIMEOUT)
result = nil
thread = Thread.new do
result = callback.call
rescue StandardError => e
OpenTelemetry.logger.error("Error invoking callback: #{e.message}")
result = :error
end

unless thread.join(timeout)
thread.kill
OpenTelemetry.logger.error("Timeout while invoking callback after #{timeout} seconds")
return nil
end

result == :error ? nil : result
rescue StandardError => e
OpenTelemetry.logger.error("Unexpected error in callback execution: #{e.message}")
nil
end
end
end
end
Expand Down
31 changes: 15 additions & 16 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module State
# The MetricStream class provides SDK internal functionality that is not a part of the
# public API.
class MetricStream
attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope, :data_points
attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope

def initialize(
name,
Expand All @@ -31,7 +31,6 @@ def initialize(
@meter_provider = meter_provider
@instrumentation_scope = instrumentation_scope
@default_aggregation = aggregation
@data_points = {}
@registered_views = []

find_registered_view
Expand All @@ -42,28 +41,30 @@ def collect(start_time, end_time)
@mutex.synchronize do
metric_data = []

# data points are required to export over OTLP
return metric_data if @data_points.empty?

if @registered_views.empty?
metric_data << aggregate_metric_data(start_time, end_time)
else
@registered_views.each { |view| metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation) }
end

# reject metric_data with empty data_points
# data points are required to export over OTLP
metric_data.reject! { |metric| metric.data_points.empty? }
return [] if metric_data.empty?

metric_data
end
end

def update(value, attributes)
if @registered_views.empty?
@mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) }
@mutex.synchronize { @default_aggregation.update(value, attributes) }
else
@registered_views.each do |view|
@mutex.synchronize do
attributes ||= {}
attributes.merge!(view.attribute_keys)
view.aggregation.update(value, attributes, @data_points) if view.valid_aggregation?
view.aggregation.update(value, attributes) if view.valid_aggregation?
end
end
end
Expand All @@ -81,7 +82,7 @@ def aggregate_metric_data(start_time, end_time, aggregation: nil)
@instrument_kind,
@meter_provider.resource,
@instrumentation_scope,
aggregator.collect(start_time, end_time, @data_points),
aggregator.collect(start_time, end_time),
aggregation_temporality,
start_time,
end_time,
Expand All @@ -96,17 +97,15 @@ def find_registered_view
end

def to_s
instrument_info = +''
instrument_info = []
instrument_info << "name=#{@name}"
instrument_info << " description=#{@description}" if @description
instrument_info << " unit=#{@unit}" if @unit
@data_points.map do |attributes, value|
metric_stream_string = +''
metric_stream_string << instrument_info
metric_stream_string << " attributes=#{attributes}" if attributes
metric_stream_string << " #{value}"
metric_stream_string
end.join("\n")
instrument_info << " instrument_kind=#{@instrument_kind}" if @instrument_kind
instrument_info << " instrumentation_scope=#{@instrumentation_scope.name}@#{@instrumentation_scope.version}" if @instrumentation_scope
instrument_info << " default_aggregation=#{@default_aggregation.class}" if @default_aggregation
instrument_info << " registered_views=#{@registered_views.map { |view| "name=#{view.name}, aggregation=#{view.aggregation.class}" }.join('; ')}" unless @registered_views.empty?
instrument_info.join('.')
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
require 'test_helper'

describe OpenTelemetry::SDK::Metrics::Aggregation::Drop do
let(:data_points) { {} }
let(:drop_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Drop.new }
let(:aggregation_temporality) { :delta }

Expand All @@ -20,20 +19,20 @@
end

it 'sets the timestamps' do
drop_aggregation.update(0, {}, data_points)
ndp = drop_aggregation.collect(start_time, end_time, data_points)[0]
drop_aggregation.update(0, {})
ndp = drop_aggregation.collect(start_time, end_time)[0]
_(ndp.start_time_unix_nano).must_equal(0)
_(ndp.time_unix_nano).must_equal(0)
end

it 'aggregates and collects should collect no value for all collection' do
drop_aggregation.update(1, {}, data_points)
drop_aggregation.update(2, {}, data_points)
drop_aggregation.update(1, {})
drop_aggregation.update(2, {})

drop_aggregation.update(2, { 'foo' => 'bar' }, data_points)
drop_aggregation.update(2, { 'foo' => 'bar' }, data_points)
drop_aggregation.update(2, { 'foo' => 'bar' })
drop_aggregation.update(2, { 'foo' => 'bar' })

ndps = drop_aggregation.collect(start_time, end_time, data_points)
ndps = drop_aggregation.collect(start_time, end_time)

_(ndps.size).must_equal(2)
_(ndps[0].value).must_equal(0)
Expand Down
Loading
Loading