Skip to content

Commit 82124e4

Browse files
benjipelletier and facebook-github-bot
authored and committed
Add InMemoryMetrics as scoped metric reader (meta-pytorch#817)
Summary: Pull Request resolved: meta-pytorch#817. We likely won't use InMemoryReader directly as part of our monarch init. Instead, create an RAII wrapper that uses it to collect the metrics of a specific workload. A consequence of this is that it removes the other metric providers for that run. Reviewed By: eliothedeman. Differential Revision: D80021761. fbshipit-source-id: 46fa80176c7e2026fd37839b60973cf05cc7e293
1 parent 46bee46 commit 82124e4

File tree

1 file changed

+179
-84
lines changed

1 file changed

+179
-84
lines changed

hyperactor_telemetry/src/in_memory_reader.rs

Lines changed: 179 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88

99
use std::collections::HashMap;
10+
use std::sync::Arc;
1011
use std::sync::Weak;
1112

1213
use opentelemetry_sdk::Resource;
@@ -15,60 +16,31 @@ use opentelemetry_sdk::metrics::InstrumentKind;
1516
use opentelemetry_sdk::metrics::ManualReader;
1617
use opentelemetry_sdk::metrics::MetricResult;
1718
use opentelemetry_sdk::metrics::Pipeline;
19+
use opentelemetry_sdk::metrics::SdkMeterProvider;
1820
use opentelemetry_sdk::metrics::Temporality;
1921
use opentelemetry_sdk::metrics::data::ResourceMetrics;
2022
use opentelemetry_sdk::metrics::data::Sum;
2123
use opentelemetry_sdk::metrics::reader::MetricReader;
2224

23-
// Global ManualReader instance for easy access with cumulative temporality
24-
static IN_MEMORY_MANUAL_READER: std::sync::LazyLock<ManualReader> =
25-
std::sync::LazyLock::new(|| {
26-
ManualReader::builder()
27-
.with_temporality(Temporality::Cumulative)
28-
.build()
29-
});
30-
31-
/// InMemoryReader that wraps the global ManualReader and implements MetricReader
32-
#[derive(Debug)]
33-
pub struct InMemoryReader;
34-
35-
impl InMemoryReader {
36-
pub fn new() -> Self {
37-
Self
38-
}
25+
/// A `MetricReader` implementation backed by a shared `ManualReader`.
///
/// The `ManualReader` is held in an `Arc`, so cloning an `InMemoryReader`
/// produces another handle to the same underlying reader.
26+
#[derive(Debug, Clone)]
27+
pub struct InMemoryReader {
28+
// Shared reader that `get_all_counters` and the `MetricReader` methods delegate to.
manual_reader: Arc<ManualReader>,
3929
}
4030

41-
impl MetricReader for InMemoryReader {
42-
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
43-
IN_MEMORY_MANUAL_READER.register_pipeline(pipeline);
44-
}
45-
46-
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
47-
IN_MEMORY_MANUAL_READER.collect(rm)
48-
}
49-
50-
fn force_flush(&self) -> OTelSdkResult {
51-
IN_MEMORY_MANUAL_READER.force_flush()
52-
}
53-
54-
fn shutdown(&self) -> OTelSdkResult {
55-
IN_MEMORY_MANUAL_READER.shutdown()
56-
}
57-
58-
fn temporality(&self, kind: InstrumentKind) -> Temporality {
59-
IN_MEMORY_MANUAL_READER.temporality(kind)
31+
impl InMemoryReader {
32+
/// Creates an `InMemoryReader` that delegates to the given shared `ManualReader`.
33+
pub fn new(manual_reader: Arc<ManualReader>) -> Self {
34+
Self { manual_reader }
6035
}
61-
}
6236

63-
// Public API for In Memory Metrics
64-
impl InMemoryReader {
65-
/// Get all counters from the global ManualReader
37+
// Get all counters from the shared ManualReader
6638
pub fn get_all_counters(&self) -> HashMap<String, i64> {
6739
let mut rm = ResourceMetrics {
6840
resource: Resource::builder_empty().build(),
6941
scope_metrics: Vec::new(),
7042
};
71-
let _ = IN_MEMORY_MANUAL_READER.collect(&mut rm);
43+
let _ = self.manual_reader.collect(&mut rm);
7244

7345
// Extract counters directly from the collected metrics
7446
let mut counters = HashMap::new();
@@ -93,65 +65,188 @@ impl InMemoryReader {
9365
}
9466
}
9567

96-
#[cfg(test)]
97-
mod tests {
98-
use opentelemetry_sdk::metrics::SdkMeterProvider;
68+
/// Implements `MetricReader` by forwarding every call to the wrapped `ManualReader`.
impl MetricReader for InMemoryReader {
69+
/// Forwards pipeline registration to the shared `ManualReader`.
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
70+
self.manual_reader.register_pipeline(pipeline);
71+
}
9972

100-
use super::*;
73+
/// Collects into `rm` via the shared `ManualReader` and returns its result unchanged.
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
74+
self.manual_reader.collect(rm)
75+
}
10176

102-
#[test]
103-
fn test_get_all_counters() {
77+
/// Forwards the flush request to the shared `ManualReader`.
fn force_flush(&self) -> OTelSdkResult {
78+
self.manual_reader.force_flush()
79+
}
80+
81+
/// Forwards the shutdown request to the shared `ManualReader`.
fn shutdown(&self) -> OTelSdkResult {
82+
self.manual_reader.shutdown()
83+
}
84+
85+
/// Reports whatever temporality the shared `ManualReader` reports for `kind`.
fn temporality(&self, kind: InstrumentKind) -> Temporality {
86+
self.manual_reader.temporality(kind)
87+
}
88+
}
89+
90+
/// RAII guard for in-memory metrics collection during testing.
91+
///
/// Creating the guard installs a provider backed by an `InMemoryReader` as the
/// global meter provider; dropping the guard shuts that provider down and
/// installs a freshly built provider in its place.
///
92+
/// # Usage
93+
///
/// ```ignore
/// let guard = InMemoryMetrics::new();
94+
///
95+
/// // Your code that emits metrics
96+
/// my_counter.add(42, &[]);
97+
///
98+
/// // Check accumulated metrics
99+
/// let counters = guard.get_counters();
100+
/// assert_eq!(counters.get("my_counter"), Some(&42));
/// ```
101+
pub struct InMemoryMetrics {
102+
// Handle used to read counters; shares its `ManualReader` with the reader
// that was registered on `_provider`.
in_memory_reader: InMemoryReader,
103+
// Provider that `new` installs globally; kept so `Drop` can shut it down.
_provider: SdkMeterProvider,
104+
}
105+
106+
impl InMemoryMetrics {
107+
/// Creates a new `InMemoryMetrics` guard and installs its provider globally.
108+
///
109+
/// This will:
110+
/// 1. Create a `ManualReader` (cumulative temporality) as shared state
111+
/// 2. Create an `InMemoryReader` that uses the shared `ManualReader`
112+
/// 3. Create a new `SdkMeterProvider` with the `InMemoryReader`
113+
/// 4. Set a clone of that provider as the global meter provider
114+
///
115+
/// When the guard is dropped, the provider will be shut down.
///
/// NOTE(review): whatever global meter provider was set before this call is
/// replaced and is not saved by this function.
116+
pub fn new() -> Self {
117+
// Create the manual reader with cumulative temporality - this state
118+
// will only exist for the lifetime of the guard
119+
let manual_reader = Arc::new(
120+
ManualReader::builder()
121+
.with_temporality(Temporality::Cumulative)
122+
.build(),
123+
);
124+
125+
// Create the in-memory reader using the shared manual reader; this handle
// is moved into the provider below
126+
let in_memory_reader = InMemoryReader::new(Arc::clone(&manual_reader));
127+
128+
// Create a new provider with the in-memory reader
104129
let provider = SdkMeterProvider::builder()
105-
.with_reader(InMemoryReader::new())
130+
.with_reader(in_memory_reader)
106131
.build();
107132

108-
opentelemetry::global::set_meter_provider(provider);
133+
// Set a clone as the global provider; the original is stored in `_provider`
// so that `Drop` can shut it down
134+
opentelemetry::global::set_meter_provider(provider.clone());
109135

110-
// Create static counters using the macro
111-
crate::declare_static_counter!(TEST_COUNTER_1, "test_counter_1");
112-
crate::declare_static_counter!(TEST_COUNTER_2, "test_counter_2");
136+
Self {
137+
// Second handle onto the same `ManualReader`; the first one was moved
// into the provider builder above
in_memory_reader: InMemoryReader::new(Arc::clone(&manual_reader)),
138+
_provider: provider,
139+
}
140+
}
113141

114-
// Bump the counters with different values
115-
TEST_COUNTER_1.add(10, &[]);
116-
TEST_COUNTER_2.add(25, &[]);
117-
TEST_COUNTER_1.add(5, &[]); // Add more to the first counter (total should be 15)
142+
/// Returns all counters read through this guard's reader, keyed by name.
///
/// Delegates to `InMemoryReader::get_all_counters`, which runs a collection
/// on the shared `ManualReader` each time it is called.
143+
pub fn get_counters(&self) -> HashMap<String, i64> {
144+
self.in_memory_reader.get_all_counters()
145+
}
118146

119-
// Get all counters and verify values
120-
let counters = InMemoryReader::new().get_all_counters();
147+
/// Returns the value of the counter called `name`, or `None` if the map
/// returned by `get_counters` has no entry for it.
148+
pub fn get_counter(&self, name: &str) -> Option<i64> {
149+
self.get_counters().get(name).copied()
150+
}
121151

122-
// The counters should contain our test counters
123-
println!("All counters: {:?}", counters);
152+
/// Returns a reference to the guard's `InMemoryReader` for advanced usage.
153+
pub fn reader(&self) -> &InMemoryReader {
154+
&self.in_memory_reader
155+
}
156+
}
124157

125-
// Assert that we have counters
126-
assert!(!counters.is_empty(), "Should have some counters");
158+
/// Tears down the guard: shuts down its provider and replaces the global
/// meter provider with a freshly built one.
impl Drop for InMemoryMetrics {
159+
fn drop(&mut self) {
160+
// Shut down our provider; the result is deliberately discarded
161+
let _ = self._provider.shutdown();
127162

128-
// Assert specific counter values
129-
// TEST_COUNTER_1 should have 15 (10 + 5)
130-
// TEST_COUNTER_2 should have 25
131-
assert_eq!(
132-
counters.get("test_counter_1"),
133-
Some(&15),
134-
"TEST_COUNTER_1 should be 15"
135-
);
136-
assert_eq!(
137-
counters.get("test_counter_2"),
138-
Some(&25),
139-
"TEST_COUNTER_2 should be 25"
140-
);
163+
// Install a provider built with no readers configured, to prevent metrics
164+
// from continuing to reach our in-memory reader after the guard is dropped.
// NOTE(review): the provider that was global before this guard was
// created is not restored here.
165+
let noop_provider = SdkMeterProvider::builder().build();
166+
opentelemetry::global::set_meter_provider(noop_provider);
141167
}
168+
}
169+
170+
#[cfg(test)]
171+
mod tests {
172+
use super::*;
142173

143174
#[test]
144-
fn test_get_all_counters_empty() {
145-
// Get counters when none have been created
146-
let counters = InMemoryReader::new().get_all_counters();
175+
// Basic round trip: create a guard, bump one counter, and read it back through
// both `get_counters` and `get_counter`.
fn test_in_memory_metrics_guard() {
176+
// Use the RAII guard
177+
let guard = InMemoryMetrics::new();
147178

148-
// Should be empty
149-
println!("Empty counters: {:?}", counters);
179+
// Create and use counters
180+
crate::declare_static_counter!(GUARD_TEST_COUNTER, "guard_test_counter");
181+
GUARD_TEST_COUNTER.add(42, &[]);
150182

151-
// This test ensures the function doesn't panic when no counters exist
152-
assert!(
153-
counters.is_empty(),
154-
"Should be empty when no counters created"
155-
);
183+
// Check that we can read the counter value from the full map
184+
let counters = guard.get_counters();
185+
assert_eq!(counters.get("guard_test_counter"), Some(&42));
186+
187+
// Test the convenience method, including a name that was never recorded
188+
assert_eq!(guard.get_counter("guard_test_counter"), Some(42));
189+
assert_eq!(guard.get_counter("nonexistent_counter"), None);
190+
191+
// Guard will be dropped here, cleaning up automatically
192+
}
193+
194+
// Two guards created one after the other, each in its own scope: each must see
// the counter bumped during its own lifetime, and the second must not see the
// first guard's counter.
#[test]
195+
fn test_multiple_guards_sequential() {
196+
// Test that multiple guards work correctly when used sequentially
197+
{
198+
let guard1 = InMemoryMetrics::new();
199+
crate::declare_static_counter!(COUNTER_1, "counter_1");
200+
COUNTER_1.add(10, &[]);
201+
assert_eq!(guard1.get_counter("counter_1"), Some(10));
202+
} // guard1 dropped here
203+
204+
{
205+
let guard2 = InMemoryMetrics::new();
206+
crate::declare_static_counter!(COUNTER_2, "counter_2");
207+
COUNTER_2.add(20, &[]);
208+
assert_eq!(guard2.get_counter("counter_2"), Some(20));
209+
// counter_1 should not be visible in guard2 since it's a new provider
210+
assert_eq!(guard2.get_counter("counter_1"), None);
211+
} // guard2 dropped here
212+
}
213+
214+
// Repeated `add` calls on one counter must be reported as a running total
// (1, then 3, then 10) when read through the same guard.
#[test]
215+
fn test_counter_accumulation() {
216+
let guard = InMemoryMetrics::new();
217+
218+
crate::declare_static_counter!(ACCUMULATING_COUNTER, "accumulating_counter");
219+
220+
// Add values multiple times, reading the total after each addition
221+
ACCUMULATING_COUNTER.add(1, &[]);
222+
assert_eq!(guard.get_counter("accumulating_counter"), Some(1));
223+
224+
ACCUMULATING_COUNTER.add(2, &[]);
225+
assert_eq!(guard.get_counter("accumulating_counter"), Some(3));
226+
227+
ACCUMULATING_COUNTER.add(7, &[]);
228+
assert_eq!(guard.get_counter("accumulating_counter"), Some(10));
229+
}
230+
231+
// Intended to check that each guard gets its own isolated ManualReader.
// NOTE(review): this test contains no assertions, so as written it only checks
// that creating, nesting, and dropping guards does not panic.
#[test]
232+
fn test_guard_isolation() {
233+
// Two guards alive at once; each `new` call replaces the global provider,
// so `_guard2`'s provider is the global one after these two lines
234+
let _guard1 = InMemoryMetrics::new();
235+
let _guard2 = InMemoryMetrics::new();
236+
237+
// Bump counters inside two further, short-lived guards
238+
{
239+
// NOTE(review): this does not switch back to `_guard1`'s provider; it
// constructs a separate guard whose own provider becomes global
240+
let _temp_guard1 = InMemoryMetrics::new(); // installs this new guard's provider as global
241+
crate::declare_static_counter!(ISOLATED_COUNTER_1, "isolated_counter_1");
242+
ISOLATED_COUNTER_1.add(100, &[]);
243+
}
244+
245+
{
246+
// NOTE(review): likewise a separate guard, not `_guard2`'s provider
247+
let _temp_guard2 = InMemoryMetrics::new(); // installs this new guard's provider as global
248+
crate::declare_static_counter!(ISOLATED_COUNTER_2, "isolated_counter_2");
249+
ISOLATED_COUNTER_2.add(200, &[]);
250+
}
156251
}
157252
}

0 commit comments

Comments
 (0)