Skip to content

Commit bd01ee9

Browse files
fix: Recover periodic metric readers after forking (#1823)
* feat: hook into Process forking to reset metric readers * fix: recover PeriodicMetricReader in forked processes * tests: guard against errors in process forking Add a little bit of guarding to help surface errors in future should problems cause the suite to fail * feat: remove the before_hook as it is unused * skip forking tests on windows and java platforms --------- Co-authored-by: Kayla Reopelle <[email protected]>
1 parent 676e057 commit bd01ee9

File tree

7 files changed

+176
-1
lines changed

7 files changed

+176
-1
lines changed

metrics_sdk/lib/opentelemetry/sdk/metrics.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ module Metrics
1616
require 'opentelemetry/sdk/metrics/aggregation'
1717
require 'opentelemetry/sdk/metrics/configuration_patch'
1818
require 'opentelemetry/sdk/metrics/export'
19+
require 'opentelemetry/sdk/metrics/fork_hooks'
1920
require 'opentelemetry/sdk/metrics/instrument'
2021
require 'opentelemetry/sdk/metrics/meter'
2122
require 'opentelemetry/sdk/metrics/meter_provider'

metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def initialize
2525
# Installs a Metrics::MeterProvider built from @resource as the global
# OpenTelemetry meter provider, then configures the metric readers and
# attaches the process fork hooks, in that order.
#
# Returns whatever attach_fork_hooks! returns.
def metrics_configuration_hook
  provider = Metrics::MeterProvider.new(resource: @resource)
  OpenTelemetry.meter_provider = provider

  configure_metric_readers
  attach_fork_hooks!
end
2930

3031
def configure_metric_readers
@@ -52,6 +53,10 @@ def wrapped_metric_exporters_from_env
5253
end
5354
end
5455
end
56+
57+
# Delegates to ForkHooks.attach! and returns its result.
def attach_fork_hooks!
  ForkHooks.attach!
end
5560
end
5661
end
5762
end

metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ def force_flush(timeout: nil)
7878
Export::FAILURE
7979
end
8080

81+
# Hook invoked in a forked child process to bring this reader back to life.
#
# Resets the exporter when it supports `reset`, runs one `collect` so that
# metrics already reported by the parent process are moved past, clears the
# thread reference carried over from the parent and finally calls `start`.
def after_fork
  exporter = @exporter
  exporter.reset if exporter.respond_to?(:reset)

  # move past previously reported metrics from parent process
  collect

  # drop the thread reference carried over from the parent before restarting
  @thread = nil
  start
end
87+
8188
# Check both @thread and @continue object to determine if current
8289
# PeriodicMetricReader is still alive. If one of them is true/alive,
8390
# then PeriodicMetricReader is determined as alive
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
  module SDK
    module Metrics
      # ForkHooks runs a callback after a Process has been forked by overriding Process::_fork.
      # This is used to ensure that the PeriodicMetricReader is restarted after forking.
      module ForkHooks
        # Prepends ForkHooks onto the singleton class of Process so that the
        # `_fork` defined below wraps the built-in one.
        #
        # Guarded by @fork_hooks_attached, so repeated calls are no-ops until
        # that flag is cleared.
        def self.attach!
          return if @fork_hooks_attached

          Process.singleton_class.prepend(ForkHooks)
          @fork_hooks_attached = true
        end

        # Calls `after_fork` on every metric reader of the global meter
        # provider that responds to it. Readers without an `after_fork`
        # method are skipped.
        def self.after_fork
          provider = ::OpenTelemetry.meter_provider

          # The prepend done by attach! cannot be undone, while the global
          # meter provider can be replaced later on by one that may not expose
          # `metric_readers`. Forking must keep working in that case, so do
          # nothing rather than raise NoMethodError inside the child.
          return unless provider.respond_to?(:metric_readers)

          provider.metric_readers.each do |reader|
            reader.after_fork if reader.respond_to?(:after_fork)
          end
        end

        # Wraps the original `_fork`: remembers the pid before forking and,
        # when the pid differs afterwards (i.e. we are running in the child),
        # runs ForkHooks.after_fork. The value returned by `super` is passed
        # through unchanged.
        def _fork
          parent_pid = Process.pid
          super.tap do
            ForkHooks.after_fork unless Process.pid == parent_pid
          end
        end
      end
    end
  end
end

metrics_sdk/test/integration/periodic_metric_reader_test.rb

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# SPDX-License-Identifier: Apache-2.0
66

77
require 'test_helper'
8+
require 'json'
89

910
describe OpenTelemetry::SDK do
1011
describe '#periodic_metric_reader' do
@@ -82,6 +83,46 @@
8283
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
8384
end
8485

86+
unless Gem.win_platform? || %w[jruby truffleruby].include?(RUBY_ENGINE) # forking is not available on these platforms or runtimes
  # Forks a child, records a few counter measurements there, waits long enough
  # for the periodic reader (5s interval) to export, and sends the names of the
  # exported metrics back to the parent through a pipe as one JSON line.
  it 'is restarted after forking' do
    OpenTelemetry::SDK.configure

    metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
    periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter)

    OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

    read, write = IO.pipe

    pid = fork do
      read.close # the child only writes

      meter = OpenTelemetry.meter_provider.meter('test')
      counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

      counter.add(1)
      counter.add(2, attributes: { 'a' => 'b' })
      counter.add(2, attributes: { 'a' => 'b' })
      counter.add(3, attributes: { 'b' => 'c' })
      counter.add(4, attributes: { 'd' => 'e' })

      sleep(8)
      snapshot = metric_exporter.metric_snapshots

      json = snapshot.map { |reading| { name: reading.name } }.to_json
      write.puts json
    end

    # Close the parent's copy of the write end: otherwise `read.gets` below
    # would block forever if the child exited without writing anything.
    write.close

    Timeout.timeout(10) do
      Process.waitpid(pid)
    end

    periodic_metric_reader.shutdown

    line = read.gets
    # Fail with a readable message instead of NoMethodError on nil.
    _(line).wont_be_nil 'forked child exited without reporting a metric snapshot'

    snapshot = JSON.parse(line.chomp)
    _(snapshot.size).must_equal(1)
    _(snapshot[0]).must_equal('name' => 'counter')
    _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
  ensure
    # Release both pipe ends whatever the outcome of the test.
    [read, write].each { |io| io.close unless io.nil? || io.closed? }
  end
end
125+
85126
it 'shutdown break the export interval cycle' do
86127
OpenTelemetry::SDK.configure
87128

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
return if Gem.win_platform? || %w[jruby truffleruby].include?(RUBY_ENGINE) # forking is not available on these platforms or runtimes
8+
9+
require 'test_helper'
10+
require 'json'
11+
12+
describe OpenTelemetry::SDK::Metrics::ForkHooks do
  # Forks a child that installs the fork hooks (with `after_fork` stubbed by
  # +after_fork_lambda+) and then forks an empty grandchild, so the hook runs
  # in the grandchild. The child reports both pids back over a pipe as JSON.
  #
  # Returns [child_pid, grandchild_pid].
  def fork_with_fork_hooks(after_fork_lambda)
    with_pipe do |inner_read_io, inner_write_io|
      child_pid = fork do # fork twice to avoid prepending fork in the parent process
        inner_read_io.close # the child only writes
        setup_fork_hooks(after_fork_lambda) do
          grandchild_pid = fork {}
          Timeout.timeout(5) { Process.waitpid(grandchild_pid) }
          message = { 'child_pid' => Process.pid, 'grandchild_pid' => grandchild_pid }.to_json
          inner_write_io.puts message
        rescue StandardError => e
          message = { 'error' => e.message }.to_json
          inner_write_io.puts message
        end
      end

      # Close the parent's copy of the write end so that reading yields EOF
      # (instead of blocking forever) when the child died without reporting.
      inner_write_io.close

      Timeout.timeout(10) { Process.waitpid(child_pid) }
      received_from_child = read_json_line(inner_read_io)
      refute_includes(received_from_child, 'error')
      grandchild_pid = received_from_child['grandchild_pid']
      refute_equal(child_pid, Process.pid)
      refute_equal(child_pid, grandchild_pid)
      [child_pid, grandchild_pid]
    end
  end

  # Stubs ForkHooks.after_fork with +after_hook+, prepends ForkHooks onto
  # Process' singleton class and yields while the stub is active.
  def setup_fork_hooks(after_hook)
    OpenTelemetry::SDK::Metrics::ForkHooks.stub(:after_fork, after_hook) do
      Process.singleton_class.prepend(OpenTelemetry::SDK::Metrics::ForkHooks)
      yield if block_given?
    end
  end

  # Yields the read and write ends of a fresh pipe and closes whichever ends
  # are still open afterwards.
  def with_pipe
    read_io, write_io = IO.pipe
    yield(read_io, write_io)
  ensure
    # Either variable is nil if IO.pipe itself raised.
    [read_io, write_io].each { |io| io.close unless io.nil? || io.closed? }
  end

  # Reads one line from +io+ and parses it as JSON, failing the test with a
  # readable message when the pipe reached EOF without any data.
  def read_json_line(io)
    line = io.gets
    refute_nil(line, 'expected a JSON message on the pipe, but the writer exited without sending one')
    JSON.parse(line.chomp)
  end

  it 'runs the after_hook after forking' do
    with_pipe do |after_fork_read_io, after_fork_write_io|
      after_fork_lambda = proc do
        message = { 'after_fork_pid' => Process.pid }.to_json
        after_fork_write_io.puts message
      end

      forking_pid, forked_pid = fork_with_fork_hooks(after_fork_lambda)

      # Both forked processes have been waited for by now. Closing our write
      # end turns "the hook never ran" into EOF and a failed assertion rather
      # than a hang.
      after_fork_write_io.close
      pid_from_after_fork = read_json_line(after_fork_read_io)['after_fork_pid'].to_i

      refute_equal(pid_from_after_fork, Process.pid)
      refute_equal(pid_from_after_fork, forking_pid)
      assert_equal(forked_pid, pid_from_after_fork)
    end
  end

  it 'calls after_fork on metric readers' do
    reader1 = Class.new do
      attr_reader :after_fork_called

      def after_fork
        @after_fork_called = true
      end
    end.new

    # A reader without an after_fork method; it must simply be skipped.
    # A bare Object avoids depending on OpenStruct, which this file never requires.
    reader2 = Object.new

    meter_provider = OpenTelemetry::SDK::Metrics::MeterProvider.new
    meter_provider.add_metric_reader(reader1)
    meter_provider.add_metric_reader(reader2)
    ::OpenTelemetry.stub(:meter_provider, meter_provider) do
      OpenTelemetry::SDK::Metrics::ForkHooks.after_fork
    end
    assert(reader1.after_fork_called)
  end
end

metrics_sdk/test/test_helper.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def reset_metrics_sdk
2020
:@meter_provider,
2121
OpenTelemetry::Internal::ProxyMeterProvider.new
2222
)
23-
23+
OpenTelemetry::SDK::Metrics::ForkHooks.instance_variable_set(:@fork_hooks_attached, false)
2424
OpenTelemetry.logger = Logger.new(File::NULL)
2525
OpenTelemetry.error_handler = nil
2626
end

0 commit comments

Comments
 (0)