55//!
66#![ cfg( unix) ]
77
8- use anyhow:: { Context , Result } ;
8+ use anyhow:: { Context , Ok , Result } ;
99use ctor:: dtor;
1010use integration_test_runner:: metrics_asserter:: { read_metrics_from_json, MetricsAsserter } ;
1111use integration_test_runner:: test_utils;
12- use integration_test_runner:: test_utils:: start_collector_container;
1312use opentelemetry:: KeyValue ;
1413use opentelemetry_otlp:: MetricExporter ;
1514use opentelemetry_proto:: tonic:: metrics:: v1:: MetricsData ;
1615use opentelemetry_sdk:: metrics:: { MeterProviderBuilder , PeriodicReader , SdkMeterProvider } ;
1716use opentelemetry_sdk:: Resource ;
1817use serde_json:: Value ;
19- use std:: error:: Error ;
2018use std:: fs;
2119use std:: fs:: File ;
20+ use std:: io:: Read ;
2221use std:: time:: Duration ;
2322
2423const SLEEP_DURATION : Duration = Duration :: from_secs ( 5 ) ;
2524
2625static RESULT_PATH : & str = "actual/metrics.json" ;
2726
28- /// Initializes the OpenTelemetry metrics pipeline
29- fn init_metrics ( ) -> SdkMeterProvider {
30- let exporter = create_exporter ( ) ;
31-
32- let reader = PeriodicReader :: builder ( exporter) . build ( ) ;
33-
34- let resource = Resource :: builder_empty ( )
35- . with_service_name ( "metrics-integration-test" )
36- . build ( ) ;
37-
38- let meter_provider = MeterProviderBuilder :: default ( )
39- . with_resource ( resource)
40- . with_reader ( reader)
41- . build ( ) ;
42-
43- opentelemetry:: global:: set_meter_provider ( meter_provider. clone ( ) ) ;
44-
45- meter_provider
46- }
47-
4827///
4928/// Creates an exporter using the appropriate HTTP or gRPC client based on
5029/// the configured features.
@@ -67,6 +46,23 @@ fn create_exporter() -> MetricExporter {
6746 . expect ( "Failed to build MetricExporter" )
6847}
6948
49+ /// Initializes the OpenTelemetry metrics pipeline
50+ fn init_meter_provider ( ) -> SdkMeterProvider {
51+ let exporter = create_exporter ( ) ;
52+ let reader = PeriodicReader :: builder ( exporter)
53+ . with_interval ( Duration :: from_secs ( 2 ) )
54+ . build ( ) ;
55+ let resource = Resource :: builder_empty ( )
56+ . with_service_name ( "metrics-integration-test" )
57+ . build ( ) ;
58+ let meter_provider = MeterProviderBuilder :: default ( )
59+ . with_resource ( resource)
60+ . with_reader ( reader)
61+ . build ( ) ;
62+ opentelemetry:: global:: set_meter_provider ( meter_provider. clone ( ) ) ;
63+ meter_provider
64+ }
65+
7066///
7167/// Retrieves the latest metrics for the given scope. Each test should use
7268/// its own scope, so that we can easily pull the data for it out from the rest
@@ -132,17 +128,40 @@ pub fn fetch_latest_metrics_for_scope(scope_name: &str) -> Result<Value> {
132128}
133129
134130///
135- /// Performs setup for metrics tests
131+ /// Performs setup for metrics tests using the Tokio runtime.
136132///
137- async fn setup_metrics_test ( ) -> Result < SdkMeterProvider , Box < dyn Error > > {
138- // Make sure the collector container is running
139- start_collector_container ( ) . await ?;
140-
133+ async fn setup_metrics_tokio ( ) -> SdkMeterProvider {
134+ let _ = test_utils:: start_collector_container ( ) . await ;
141135 // Truncate results
142136 _ = File :: create ( RESULT_PATH ) . expect ( "it's good" ) ;
143137
144- let meter_provider = init_metrics ( ) ;
145- Ok ( meter_provider)
138+ init_meter_provider ( )
139+ }
140+
141+ ///
142+ /// Performs setup for metrics tests.
143+ ///
144+ fn setup_metrics_non_tokio (
145+ initialize_metric_in_tokio : bool ,
146+ ) -> ( SdkMeterProvider , tokio:: runtime:: Runtime ) {
147+ let rt = tokio:: runtime:: Runtime :: new ( ) . expect ( "Failed to create Tokio runtime" ) ;
148+ let meter_provider: SdkMeterProvider = if initialize_metric_in_tokio {
149+ // Initialize the logger provider inside the Tokio runtime
150+ rt. block_on ( async {
151+ // Setup the collector container inside Tokio runtime
152+ let _ = test_utils:: start_collector_container ( ) . await ;
153+ init_meter_provider ( )
154+ } )
155+ } else {
156+ rt. block_on ( async {
157+ let _ = test_utils:: start_collector_container ( ) . await ;
158+ } ) ;
159+
160+ // Initialize the logger provider outside the Tokio runtime
161+ init_meter_provider ( )
162+ } ;
163+
164+ ( meter_provider, rt)
146165}
147166
148167///
@@ -172,6 +191,19 @@ pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {
172191 Ok ( ( ) )
173192}
174193
194+ ///
195+ /// Check that the results contain the given string.
196+ ///
197+ pub fn assert_metrics_results_contains ( expected_content : & str ) -> Result < ( ) > {
198+ // let contents = fs::read_to_string(test_utils::METRICS_FILE)?;
199+ let file = File :: open ( test_utils:: METRICS_FILE ) ?;
200+ let mut contents = String :: new ( ) ;
201+ let mut reader = std:: io:: BufReader :: new ( & file) ;
202+ reader. read_to_string ( & mut contents) ?;
203+ assert ! ( contents. contains( expected_content) ) ;
204+ Ok ( ( ) )
205+ }
206+
175207///
176208/// TODO - the HTTP metrics exporters except reqwest-blocking-client do not seem
177209/// to work at the moment.
@@ -180,9 +212,8 @@ pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {
180212#[ cfg( test) ]
181213#[ cfg( any( feature = "tonic-client" , feature = "reqwest-blocking-client" ) ) ]
182214mod metrictests {
183-
184215 use super :: * ;
185- use opentelemetry :: metrics :: MeterProvider ;
216+ use uuid :: Uuid ;
186217
187218 ///
188219 /// Validate JSON/Protobuf models roundtrip correctly.
@@ -210,156 +241,81 @@ mod metrictests {
210241
211242 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
212243 async fn counter_tokio_multi_thread ( ) -> Result < ( ) > {
213- metric_helper ( ) . await
244+ metric_helper_tokio ( ) . await
214245 }
215246
216247 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
217248 async fn counter_tokio_multi_thread_one_worker ( ) -> Result < ( ) > {
218- metric_helper ( ) . await
249+ metric_helper_tokio ( ) . await
219250 }
220251
221252 #[ tokio:: test( flavor = "current_thread" ) ]
222253 #[ ignore] // TODO: Investigate why this test is failing
223254 async fn counter_tokio_current ( ) -> Result < ( ) > {
224- metric_helper ( ) . await
255+ metric_helper_tokio ( ) . await
256+ }
257+
258+ #[ test]
259+ fn counter_non_tokio ( ) -> Result < ( ) > {
260+ metric_helper_non_tokio ( )
261+ }
262+
263+ async fn metric_helper_tokio ( ) -> Result < ( ) > {
264+ let meter_provider = setup_metrics_tokio ( ) . await ;
265+ emit_and_validate_metrics ( meter_provider)
225266 }
226267
227- async fn metric_helper ( ) -> Result < ( ) > {
228- let meter_provider = setup_metrics_test ( )
229- . await
230- . expect ( "MeterProvider must be available for test to run" ) ;
268+ fn metric_helper_non_tokio ( ) -> Result < ( ) > {
269+ let ( meter_provider, _rt) = setup_metrics_non_tokio ( true ) ;
231270 const METER_NAME : & str = "test_meter" ;
271+ const INSTRUMENT_NAME : & str = "test_counter" ;
232272
233273 // Add data to u64_counter
234274 let meter = opentelemetry:: global:: meter_provider ( ) . meter ( METER_NAME ) ;
235-
236- let counter = meter. u64_counter ( "test_counter" ) . build ( ) ;
275+ let expected_uuid = Uuid :: new_v4 ( ) . to_string ( ) ;
276+ let counter = meter. u64_counter ( INSTRUMENT_NAME ) . build ( ) ;
237277 counter. add (
238278 10 ,
239279 & [
240- KeyValue :: new ( "mykey1" , "myvalue1" ) ,
280+ KeyValue :: new ( "mykey1" , expected_uuid . clone ( ) ) ,
241281 KeyValue :: new ( "mykey2" , "myvalue2" ) ,
242282 ] ,
243283 ) ;
244284
245285 meter_provider. shutdown ( ) ?;
246286 // We still need to sleep, to give otel-collector a chance to flush to disk
247- tokio:: time:: sleep ( SLEEP_DURATION ) . await ;
248-
249- // Validate metrics against results file
250- validate_metrics_against_results ( METER_NAME ) ?;
287+ std:: thread:: sleep ( SLEEP_DURATION ) ;
251288
252- Ok ( ( ) )
289+ // Validate metrics against results file This is not the extensive
290+ // validation of output, but good enough to confirm that metrics have
291+ // been accepted by OTel Collector.
292+ assert_metrics_results_contains ( & expected_uuid)
253293 }
254294
255- #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
256- async fn test_u64_counter ( ) -> Result < ( ) > {
257- let meter_provider = setup_metrics_test ( )
258- . await
259- . expect ( "MeterProvider must be available for test to run" ) ;
260- const METER_NAME : & str = "test_u64_counter_meter" ;
295+ fn emit_and_validate_metrics ( meter_provider : SdkMeterProvider ) -> Result < ( ) > {
296+ const METER_NAME : & str = "test_meter" ;
297+ const INSTRUMENT_NAME : & str = "test_counter" ;
261298
262299 // Add data to u64_counter
263300 let meter = opentelemetry:: global:: meter_provider ( ) . meter ( METER_NAME ) ;
264-
265- let counter = meter. u64_counter ( "counter_u64" ) . build ( ) ;
301+ let expected_uuid = Uuid :: new_v4 ( ) . to_string ( ) ;
302+ let counter = meter. u64_counter ( INSTRUMENT_NAME ) . build ( ) ;
266303 counter. add (
267304 10 ,
268305 & [
269- KeyValue :: new ( "mykey1" , "myvalue1" ) ,
306+ KeyValue :: new ( "mykey1" , expected_uuid . clone ( ) ) ,
270307 KeyValue :: new ( "mykey2" , "myvalue2" ) ,
271308 ] ,
272309 ) ;
273310
274311 meter_provider. shutdown ( ) ?;
275312 // We still need to sleep, to give otel-collector a chance to flush to disk
276- tokio:: time:: sleep ( SLEEP_DURATION ) . await ;
277-
278- // Validate metrics against results file
279- validate_metrics_against_results ( METER_NAME ) ?;
280-
281- Ok ( ( ) )
282- }
283-
284- #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
285- async fn test_histogram ( ) -> Result < ( ) > {
286- let meter_provider = setup_metrics_test ( )
287- . await
288- . expect ( "MeterProvider must be available for test to run" ) ;
289- const METER_NAME : & str = "test_histogram_meter" ;
290-
291- // Add data to histogram
292- let meter = opentelemetry:: global:: meter_provider ( ) . meter ( METER_NAME ) ;
293- let histogram = meter. u64_histogram ( "example_histogram" ) . build ( ) ;
294- histogram. record ( 42 , & [ KeyValue :: new ( "mykey3" , "myvalue4" ) ] ) ;
295-
296- meter_provider. shutdown ( ) ?;
297- // We still need to sleep, to give otel-collector a chance to flush to disk
298- tokio:: time:: sleep ( SLEEP_DURATION ) . await ;
299-
300- validate_metrics_against_results ( METER_NAME ) ?;
301-
302- Ok ( ( ) )
303- }
304-
305- #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
306- async fn test_up_down_counter ( ) -> Result < ( ) > {
307- let meter_provider = setup_metrics_test ( )
308- . await
309- . expect ( "MeterProvider must be available for test to run" ) ;
310- const METER_NAME : & str = "test_up_down_meter" ;
311-
312- // Add data to up_down_counter
313- let meter = opentelemetry:: global:: meter_provider ( ) . meter ( METER_NAME ) ;
314- let up_down_counter = meter. i64_up_down_counter ( "example_up_down_counter" ) . build ( ) ;
315- up_down_counter. add ( -1 , & [ KeyValue :: new ( "mykey5" , "myvalue5" ) ] ) ;
316-
317- meter_provider. shutdown ( ) ?;
318- // We still need to sleep, to give otel-collector a chance to flush to disk
319- tokio:: time:: sleep ( SLEEP_DURATION ) . await ;
320-
321- validate_metrics_against_results ( METER_NAME ) ?;
322-
323- Ok ( ( ) )
324- }
325-
326- #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
327- #[ ignore]
328- async fn test_flush_on_shutdown ( ) -> Result < ( ) > {
329- const METER_NAME : & str = "test_flush_on_shutdown" ;
330-
331- // Set everything up by hand, so that we can shutdown() the exporter
332- // and make sure our data is flushed through.
333-
334- // Make sure the collector is running
335- start_collector_container ( ) . await ?;
336-
337- // Set up the exporter
338- let exporter = create_exporter ( ) ;
339- let reader = PeriodicReader :: builder ( exporter)
340- . with_interval ( Duration :: from_secs ( 30 ) )
341- . with_timeout ( Duration :: from_secs ( 1 ) )
342- . build ( ) ;
343- let resource = Resource :: builder_empty ( )
344- . with_service_name ( "metrics-integration-test" )
345- . build ( ) ;
346- let meter_provider = MeterProviderBuilder :: default ( )
347- . with_resource ( resource)
348- . with_reader ( reader)
349- . build ( ) ;
350-
351- // Send something
352- let meter = meter_provider. meter ( METER_NAME ) ;
353- let counter = meter. u64_counter ( "counter_" ) . build ( ) ;
354- counter. add ( 123 , & [ ] ) ;
355-
356- // Shutdown
357- meter_provider. shutdown ( ) ?;
358-
359- // We still need to sleep, to give otel-collector a chance to flush to disk
360- tokio:: time:: sleep ( SLEEP_DURATION ) . await ;
313+ std:: thread:: sleep ( SLEEP_DURATION ) ;
361314
362- validate_metrics_against_results ( METER_NAME ) ?;
315+ // Validate metrics against results file This is not the extensive
316+ // validation of output, but good enough to confirm that metrics have
317+ // been accepted by OTel Collector.
318+ assert_metrics_results_contains ( & expected_uuid) ?;
363319
364320 Ok ( ( ) )
365321 }
0 commit comments