@@ -27,32 +27,20 @@ static RESULT_PATH: &str = "actual/metrics.json";
2727
2828/// Initializes the OpenTelemetry metrics pipeline
2929async fn init_metrics ( ) -> SdkMeterProvider {
30- let exporter_builder = MetricExporter :: builder ( ) ;
31-
32- #[ cfg( feature = "tonic-client" ) ]
33- let exporter_builder = exporter_builder. with_tonic ( ) ;
34- #[ cfg( not( feature = "tonic-client" ) ) ]
35- #[ cfg( any(
36- feature = "hyper-client" ,
37- feature = "reqwest-client" ,
38- feature = "reqwest-blocking-client"
39- ) ) ]
40- let exporter_builder = exporter_builder. with_http ( ) ;
41-
42- let exporter = exporter_builder
43- . build ( )
44- . expect ( "Failed to build MetricExporter" ) ;
30+ let exporter = create_exporter ( ) ;
4531
4632 let reader = PeriodicReader :: builder ( exporter)
4733 . with_interval ( Duration :: from_millis ( 100 ) )
4834 . with_timeout ( Duration :: from_secs ( 1 ) )
4935 . build ( ) ;
5036
37+ let resource = Resource :: new ( vec ! [ KeyValue :: new(
38+ opentelemetry_semantic_conventions:: resource:: SERVICE_NAME ,
39+ "metrics-integration-test" ,
40+ ) ] ) ;
41+
5142 let meter_provider = MeterProviderBuilder :: default ( )
52- . with_resource ( Resource :: new ( vec ! [ KeyValue :: new(
53- opentelemetry_semantic_conventions:: resource:: SERVICE_NAME ,
54- "metrics-integration-test" ,
55- ) ] ) )
43+ . with_resource ( resource)
5644 . with_reader ( reader)
5745 . build ( ) ;
5846
@@ -61,6 +49,28 @@ async fn init_metrics() -> SdkMeterProvider {
6149 meter_provider
6250}
6351
52+ ///
53+ /// Creates an exporter using the appropriate HTTP or gRPC client based on
54+ /// the configured features.
55+ ///
56+ fn create_exporter ( ) -> MetricExporter {
57+ let exporter_builder = MetricExporter :: builder ( ) ;
58+
59+ #[ cfg( feature = "tonic-client" ) ]
60+ let exporter_builder = exporter_builder. with_tonic ( ) ;
61+ #[ cfg( not( feature = "tonic-client" ) ) ]
62+ #[ cfg( any(
63+ feature = "hyper-client" ,
64+ feature = "reqwest-client" ,
65+ feature = "reqwest-blocking-client"
66+ ) ) ]
67+ let exporter_builder = exporter_builder. with_http ( ) ;
68+
69+ exporter_builder
70+ . build ( )
71+ . expect ( "Failed to build MetricExporter" )
72+ }
73+
6474///
6575/// Retrieves the latest metrics for the given scope. Each test should use
6676/// its own scope, so that we can easily pull the data for it out from the rest
@@ -72,55 +82,71 @@ pub fn fetch_latest_metrics_for_scope(scope_name: &str) -> Result<Value> {
7282 // Open the file and fetch the contents
7383 let contents = fs:: read_to_string ( test_utils:: METRICS_FILE ) ?;
7484
75- // Find the last complete metrics line. Work backwards until one parses.
85+ // Find the last parseable metrics line that contains the desired scope
7686 let json_line = contents
7787 . lines ( )
7888 . rev ( )
79- . find_map ( |line| serde_json:: from_str :: < Value > ( line) . ok ( ) )
80- . with_context ( || "No valid JSON line found in the metrics file." ) ?;
81-
82- // Parse the JSON and filter metrics strictly by the scope name
83- let mut filtered_json = json_line;
84- if let Some ( resource_metrics) = filtered_json
85- . get_mut ( "resourceMetrics" )
86- . and_then ( |v| v. as_array_mut ( ) )
87- {
88- resource_metrics. retain_mut ( |resource| {
89- if let Some ( scope_metrics) = resource
90- . get_mut ( "scopeMetrics" )
91- . and_then ( |v| v. as_array_mut ( ) )
92- {
93- // Retain only `ScopeMetrics` that match the specified scope_name
94- scope_metrics. retain ( |scope| {
95- scope
96- . get ( "scope" )
97- . and_then ( |s| s. get ( "name" ) )
98- . and_then ( |name| name. as_str ( ) )
99- . map_or ( false , |n| n == scope_name)
100- } ) ;
101-
102- // Keep the resource only if it has any matching `ScopeMetrics`
103- !scope_metrics. is_empty ( )
104- } else {
105- false
106- }
107- } ) ;
108- }
109-
110- Ok ( filtered_json)
89+ . find_map ( |line| {
90+ // Attempt to parse the line as JSON
91+ serde_json:: from_str :: < Value > ( line)
92+ . ok ( )
93+ . and_then ( |mut json_line| {
94+ // Check if it contains the specified scope
95+ if let Some ( resource_metrics) = json_line
96+ . get_mut ( "resourceMetrics" )
97+ . and_then ( |v| v. as_array_mut ( ) )
98+ {
99+ resource_metrics. retain_mut ( |resource| {
100+ if let Some ( scope_metrics) = resource
101+ . get_mut ( "scopeMetrics" )
102+ . and_then ( |v| v. as_array_mut ( ) )
103+ {
104+ scope_metrics. retain ( |scope| {
105+ scope
106+ . get ( "scope" )
107+ . and_then ( |s| s. get ( "name" ) )
108+ . and_then ( |name| name. as_str ( ) )
109+ . map_or ( false , |n| n == scope_name)
110+ } ) ;
111+
112+ // Keep the resource only if it has any matching `ScopeMetrics`
113+ !scope_metrics. is_empty ( )
114+ } else {
115+ false
116+ }
117+ } ) ;
118+
119+ // If any resource metrics remain, return this line
120+ if !resource_metrics. is_empty ( ) {
121+ return Some ( json_line) ;
122+ }
123+ }
124+
125+ None
126+ } )
127+ } )
128+ . with_context ( || {
129+ format ! (
130+ "No valid JSON line containing scope `{}` found." ,
131+ scope_name
132+ )
133+ } ) ?;
134+
135+ Ok ( json_line)
111136}
112137
113- /// Performs setup for metrics tests, including environment setup and data seeding.
114- /// This only needs to be done once for the whole test suite
138+ ///
139+ /// Performs setup for metrics tests
140+ ///
115141async fn setup_metrics_test ( ) {
142+ // Make sure the collector container is running
143+ start_collector_container ( ) . await ;
144+
116145 let mut done = SETUP_DONE . lock ( ) . unwrap ( ) ;
117146 if !* done {
118147 println ! ( "Running setup before any tests..." ) ;
119148 * done = true ; // Mark setup as done
120149
121- // Make sure the collector container is running
122- start_collector_container ( ) . await ;
123-
124150 // Initialise the metrics subsystem
125151 _ = init_metrics ( ) . await ;
126152 }
@@ -158,7 +184,9 @@ pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {
158184
159185#[ cfg( test) ]
160186mod tests {
187+
161188 use super :: * ;
189+ use opentelemetry:: metrics:: MeterProvider ;
162190
163191 ///
164192 /// JSON doesn't roundtrip through the MetricsData models properly.
@@ -239,6 +267,47 @@ mod tests {
239267
240268 Ok ( ( ) )
241269 }
270+
271+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
272+ async fn test_flush_on_shutdown ( ) -> Result < ( ) > {
273+ const METER_NAME : & str = "test_flush_on_shutdown" ;
274+
275+ // Set everything up by hand, so that we can shutdown() the exporter
276+ // and make sure our data is flushed through.
277+
278+ // Make sure the collector is running
279+ start_collector_container ( ) . await ;
280+
281+ // Set up the exporter
282+ let exporter = create_exporter ( ) ;
283+ let reader = PeriodicReader :: builder ( exporter)
284+ . with_interval ( Duration :: from_millis ( 100 ) )
285+ . with_timeout ( Duration :: from_secs ( 1 ) )
286+ . build ( ) ;
287+ let resource = Resource :: new ( vec ! [ KeyValue :: new(
288+ opentelemetry_semantic_conventions:: resource:: SERVICE_NAME ,
289+ "metrics-integration-test" ,
290+ ) ] ) ;
291+ let meter_provider = MeterProviderBuilder :: default ( )
292+ . with_resource ( resource)
293+ . with_reader ( reader)
294+ . build ( ) ;
295+
296+ // Send something
297+ let meter = meter_provider. meter ( METER_NAME ) ;
298+ let counter = meter. u64_counter ( "counter_" ) . build ( ) ;
299+ counter. add ( 123 , & [ ] ) ;
300+
301+ // Shutdown
302+ meter_provider. shutdown ( ) ?;
303+
304+ // We still need to sleep, to give otel-collector a chance to flush to disk
305+ tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
306+
307+ validate_metrics_against_results ( METER_NAME ) ?;
308+
309+ Ok ( ( ) )
310+ }
242311}
243312
244313///
@@ -247,5 +316,6 @@ mod tests {
247316///
248317#[ dtor]
249318fn shutdown ( ) {
319+ println ! ( "metrics::shutdown" ) ;
250320 test_utils:: stop_collector_container ( ) ;
251321}
0 commit comments