@@ -803,4 +803,122 @@ mod tests {
803803 "Metrics should be available in exporter."
804804 ) ;
805805 }
806+
807+ async fn some_async_function ( ) -> u64 {
808+ // No dependency on any particular async runtime.
809+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 1 ) ) ;
810+ 1
811+ }
812+
813+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
814+ async fn async_inside_observable_callback_from_tokio_multi_with_one_worker ( ) {
815+ async_inside_observable_callback_helper ( ) ;
816+ }
817+
818+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
819+ async fn async_inside_observable_callback_from_tokio_multi_with_two_worker ( ) {
820+ async_inside_observable_callback_helper ( ) ;
821+ }
822+
823+ #[ tokio:: test( flavor = "current_thread" ) ]
824+ async fn async_inside_observable_callback_from_tokio_current_thread ( ) {
825+ async_inside_observable_callback_helper ( ) ;
826+ }
827+
828+ #[ test]
829+ fn async_inside_observable_callback_from_regular_main ( ) {
830+ async_inside_observable_callback_helper ( ) ;
831+ }
832+
833+ fn async_inside_observable_callback_helper ( ) {
834+ let interval = std:: time:: Duration :: from_millis ( 10 ) ;
835+ let exporter = InMemoryMetricExporter :: default ( ) ;
836+ let reader = PeriodicReader :: builder ( exporter. clone ( ) )
837+ . with_interval ( interval)
838+ . build ( ) ;
839+
840+ let meter_provider = SdkMeterProvider :: builder ( ) . with_reader ( reader) . build ( ) ;
841+ let meter = meter_provider. meter ( "test" ) ;
842+ let _gauge = meter
843+ . u64_observable_gauge ( "my_observable_gauge" )
844+ . with_callback ( |observer| {
845+ // using futures_executor::block_on intentionally and avoiding
846+ // any particular async runtime.
847+ let value = futures_executor:: block_on ( some_async_function ( ) ) ;
848+ observer. observe ( value, & [ ] ) ;
849+ } )
850+ . build ( ) ;
851+
852+ meter_provider. force_flush ( ) . expect ( "flush should succeed" ) ;
853+ let exported_metrics = exporter
854+ . get_finished_metrics ( )
855+ . expect ( "this should not fail" ) ;
856+ assert ! (
857+ !exported_metrics. is_empty( ) ,
858+ "Metrics should be available in exporter."
859+ ) ;
860+ }
861+
862+ async fn some_tokio_async_function ( ) -> u64 {
863+ // Tokio specific async function
864+ tokio:: time:: sleep ( Duration :: from_millis ( 1 ) ) . await ;
865+ 1
866+ }
867+
868+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
869+ #[ ignore] // This test is expected to fail as PeriodicReader (by default) will not work if callbacks involve any particular runtime.
870+
871+ async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker ( ) {
872+ tokio_async_inside_observable_callback_helper ( ) ;
873+ }
874+
875+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
876+ #[ ignore] // This test is expected to fail as PeriodicReader (by default) will not work if callbacks involve any particular runtime.
877+ async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker ( ) {
878+ // Run this test with stdout enabled to see output.
879+ // cargo test async_inside_observable_callbacks --features=testing -- --nocapture
880+ // Arrange
881+ tokio_async_inside_observable_callback_helper ( ) ;
882+ }
883+
884+ #[ tokio:: test( flavor = "current_thread" ) ]
885+ #[ ignore] // This test is expected to fail as PeriodicReader (by default) will not work if callbacks involve any particular runtime.
886+ async fn tokio_async_inside_observable_callback_from_tokio_current_thread ( ) {
887+ tokio_async_inside_observable_callback_helper ( ) ;
888+ }
889+
890+ #[ test]
891+ // This works!! // TODO: Investigate why this works and others don't.
892+ fn tokio_async_inside_observable_callback_from_regular_main ( ) {
893+ async_inside_observable_callback_helper ( ) ;
894+ }
895+
896+ fn tokio_async_inside_observable_callback_helper ( ) {
897+ let interval = std:: time:: Duration :: from_millis ( 10 ) ;
898+ let exporter = InMemoryMetricExporter :: default ( ) ;
899+ let reader = PeriodicReader :: builder ( exporter. clone ( ) )
900+ . with_interval ( interval)
901+ . build ( ) ;
902+
903+ let meter_provider = SdkMeterProvider :: builder ( ) . with_reader ( reader) . build ( ) ;
904+ let meter = meter_provider. meter ( "test" ) ;
905+ let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
906+ let _gauge = meter
907+ . u64_observable_gauge ( "my_observable_gauge" )
908+ . with_callback ( move |observer| {
909+ // call tokio specific async function from here
910+ let value = rt. block_on ( some_tokio_async_function ( ) ) ;
911+ observer. observe ( value, & [ ] ) ;
912+ } )
913+ . build ( ) ;
914+
915+ meter_provider. force_flush ( ) . expect ( "flush should succeed" ) ;
916+ let exported_metrics = exporter
917+ . get_finished_metrics ( )
918+ . expect ( "this should not fail" ) ;
919+ assert ! (
920+ !exported_metrics. is_empty( ) ,
921+ "Metrics should be available in exporter."
922+ ) ;
923+ }
806924}
0 commit comments