Skip to content

Commit c39e7a6

Browse files
fix: use condition signal to replace sleep and remove timeout.timeout… (#1826)
* fix: use condition signal to replace sleep and remove timeout.timeout (use exporter timeout function - http.timeout) * lint * add doc --------- Co-authored-by: Kayla Reopelle <[email protected]>
1 parent e590548 commit c39e7a6

File tree

6 files changed

+63
-14
lines changed

6 files changed

+63
-14
lines changed

metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,22 @@ def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTER
3333
@thread = nil
3434
@continue = false
3535
@mutex = Mutex.new
36+
@condition = ConditionVariable.new
3637
@export_mutex = Mutex.new
3738

3839
start
3940
end
4041

42+
# Shuts the @thread down and set @continue to false; it will block
43+
# until the shutdown thread is finished.
44+
#
45+
# @param [optional Numeric] timeout An optional timeout in seconds.
46+
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
47+
# non-specific failure occurred.
4148
def shutdown(timeout: nil)
4249
thread = lock do
4350
@continue = false # force termination in next iteration
51+
@condition.signal
4452
@thread
4553
end
4654
thread&.join(@export_interval)
@@ -52,15 +60,35 @@ def shutdown(timeout: nil)
5260
Export::FAILURE
5361
end
5462

63+
# Export all metrics to the configured `Exporter` that have not yet
64+
# been exported.
65+
#
66+
# This method should only be called in cases where it is absolutely
67+
# necessary, such as when using some FaaS providers that may suspend
68+
# the process after an invocation, but before the `PeriodicMetricReader` exports
69+
# the completed metrics.
70+
#
71+
# @param [optional Numeric] timeout An optional timeout in seconds.
72+
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
73+
# non-specific failure occurred.
5574
def force_flush(timeout: nil)
56-
export(timeout: timeout)
75+
export(timeout:)
5776
Export::SUCCESS
5877
rescue StandardError
5978
Export::FAILURE
6079
end
6180

81+
# Check both @thread and @continue object to determine if current
82+
# PeriodicMetricReader is still alive. If one of them is true/alive,
83+
# then PeriodicMetricReader is determined as alive
84+
def alive?
85+
@continue || @thread.alive?
86+
end
87+
6288
private
6389

90+
# Start a thread that continously export metrics within fixed duration.
91+
# The wait mechanism is using to check @mutex lock with conditional variable
6492
def start
6593
@continue = true
6694
if @exporter.nil?
@@ -70,19 +98,21 @@ def start
7098
else
7199
@thread = Thread.new do
72100
while @continue
73-
sleep(@export_interval)
74-
begin
75-
Timeout.timeout(@export_timeout) do
76-
export(timeout: @export_timeout)
77-
end
78-
rescue Timeout::Error => e
79-
OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.')
101+
lock do
102+
@condition.wait(@mutex, @export_interval)
103+
export(timeout: @export_timeout)
80104
end
81105
end
82106
end
83107
end
84108
end
85109

110+
# Helper function for the defined exporter to export metrics.
111+
# It only exports if the collected metrics are not an empty array (collect returns an Array).
112+
#
113+
# @param [optional Numeric] timeout An optional timeout in seconds.
114+
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
115+
# non-specific failure occurred
86116
def export(timeout: nil)
87117
@export_mutex.synchronize do
88118
collected_metrics = collect

metrics_sdk/test/integration/periodic_metric_reader_test.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,24 @@
8181
_(snapshot.size).must_equal(1)
8282
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
8383
end
84+
85+
it 'shutdown break the export interval cycle' do
86+
OpenTelemetry::SDK.configure
87+
88+
metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
89+
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 1_000_000, export_timeout_millis: 10_000, exporter: metric_exporter)
90+
91+
OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)
92+
93+
_(periodic_metric_reader.alive?).must_equal true
94+
95+
sleep 5 # make sure the work thread start
96+
97+
Timeout.timeout(2) do # Fail if this block takes more than 2 seconds
98+
periodic_metric_reader.shutdown
99+
end
100+
101+
_(periodic_metric_reader.alive?).must_equal false
102+
end
84103
end
85104
end

metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
describe OpenTelemetry::SDK::Metrics::Aggregation::LastValue do
1010
let(:data_points) { {} }
11-
let(:drop_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Drop.new(aggregation_temporality: aggregation_temporality) }
11+
let(:drop_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Drop.new(aggregation_temporality:) }
1212
let(:aggregation_temporality) { :delta }
1313

1414
# Time in nano

metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
let(:data_points) { {} }
1111
let(:ebh) do
1212
OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram.new(
13-
aggregation_temporality: aggregation_temporality,
14-
boundaries: boundaries,
15-
record_min_max: record_min_max
13+
aggregation_temporality:,
14+
boundaries:,
15+
record_min_max:
1616
)
1717
end
1818
let(:boundaries) { [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000] }

metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
describe OpenTelemetry::SDK::Metrics::Aggregation::LastValue do
1010
let(:data_points) { {} }
11-
let(:last_value_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new(aggregation_temporality: aggregation_temporality) }
11+
let(:last_value_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new(aggregation_temporality:) }
1212
let(:aggregation_temporality) { :delta }
1313

1414
# Time in nano

metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
describe OpenTelemetry::SDK::Metrics::Aggregation::Sum do
1010
let(:data_points) { {} }
11-
let(:sum_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality: aggregation_temporality, monotonic: monotonic) }
11+
let(:sum_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality:, monotonic:) }
1212
let(:aggregation_temporality) { :delta }
1313
let(:monotonic) { false }
1414

0 commit comments

Comments
 (0)