@@ -866,34 +866,28 @@ mod tests {
866866 }
867867
868868 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
869- #[ ignore] // This test is expected to fail as PeriodicReader (by default) will not work if callbacks involve any particular runtime.
870869
871870 async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker ( ) {
872- tokio_async_inside_observable_callback_helper ( ) ;
871+ tokio_async_inside_observable_callback_helper ( true ) ;
873872 }
874873
875874 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
876- #[ ignore] // This test is expected to fail as PeriodicReader (by default) will not work if callbacks involve any particular runtime.
877875 async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker ( ) {
878- // Run this test with stdout enabled to see output.
879- // cargo test async_inside_observable_callbacks --features=testing -- --nocapture
880- // Arrange
881- tokio_async_inside_observable_callback_helper ( ) ;
876+ tokio_async_inside_observable_callback_helper ( true ) ;
882877 }
883878
884879 #[ tokio:: test( flavor = "current_thread" ) ]
885- #[ ignore] // This test is expected to fail as PeriodicReader (by default) will not work if callbacks involve any particular runtime .
880+ #[ ignore] //TODO: Investigate if this can be fixed .
886881 async fn tokio_async_inside_observable_callback_from_tokio_current_thread ( ) {
887- tokio_async_inside_observable_callback_helper ( ) ;
882+ tokio_async_inside_observable_callback_helper ( true ) ;
888883 }
889884
890885 #[ test]
891- // This works!! // TODO: Investigate why this works and others don't.
892886 fn tokio_async_inside_observable_callback_from_regular_main ( ) {
893- async_inside_observable_callback_helper ( ) ;
887+ tokio_async_inside_observable_callback_helper ( false ) ;
894888 }
895889
896- fn tokio_async_inside_observable_callback_helper ( ) {
890+ fn tokio_async_inside_observable_callback_helper ( use_current_tokio_runtime : bool ) {
897891 let interval = std:: time:: Duration :: from_millis ( 10 ) ;
898892 let exporter = InMemoryMetricExporter :: default ( ) ;
899893 let reader = PeriodicReader :: builder ( exporter. clone ( ) )
@@ -902,15 +896,33 @@ mod tests {
902896
903897 let meter_provider = SdkMeterProvider :: builder ( ) . with_reader ( reader) . build ( ) ;
904898 let meter = meter_provider. meter ( "test" ) ;
905- let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
906- let _gauge = meter
907- . u64_observable_gauge ( "my_observable_gauge" )
908- . with_callback ( move |observer| {
909- // call tokio specific async function from here
910- let value = rt. block_on ( some_tokio_async_function ( ) ) ;
911- observer. observe ( value, & [ ] ) ;
912- } )
913- . build ( ) ;
899+
900+ if use_current_tokio_runtime {
901+ let rt = tokio:: runtime:: Handle :: current ( ) . clone ( ) ;
902+ let _gauge = meter
903+ . u64_observable_gauge ( "my_observable_gauge" )
904+ . with_callback ( move |observer| {
905+ // call tokio specific async function from here
906+ let value = rt. block_on ( some_tokio_async_function ( ) ) ;
907+ observer. observe ( value, & [ ] ) ;
908+ } )
909+ . build ( ) ;
910+ // rt here is a reference to the current tokio runtime.
911+ // Droppng it occurs when the tokio::main itself ends.
912+ } else {
913+ let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
914+ let _gauge = meter
915+ . u64_observable_gauge ( "my_observable_gauge" )
916+ . with_callback ( move |observer| {
917+ // call tokio specific async function from here
918+ let value = rt. block_on ( some_tokio_async_function ( ) ) ;
919+ observer. observe ( value, & [ ] ) ;
920+ } )
921+ . build ( ) ;
922+ // rt is not dropped here as it is moved to the closure,
923+ // and is dropped only when MeterProvider itself is dropped.
924+ // This works when called from normal main.
925+ } ;
914926
915927 meter_provider. force_flush ( ) . expect ( "flush should succeed" ) ;
916928 let exported_metrics = exporter
0 commit comments