@@ -17,6 +17,8 @@ limitations under the License.
1717use hyperlight_common:: flatbuffer_wrappers:: function_types:: { ParameterValue , ReturnType } ;
1818use rand:: Rng ;
1919use tracing:: { span, Level } ;
20+ use tracing_opentelemetry:: OpenTelemetryLayer ;
21+ use tracing_subscriber:: layer:: SubscriberExt ;
2022use tracing_subscriber:: util:: SubscriberInitExt ;
2123extern crate hyperlight_host;
2224use std:: error:: Error ;
@@ -29,16 +31,19 @@ use hyperlight_host::sandbox_state::sandbox::EvolvableSandbox;
2931use hyperlight_host:: sandbox_state:: transition:: Noop ;
3032use hyperlight_host:: { GuestBinary , MultiUseSandbox , Result as HyperlightResult } ;
3133use hyperlight_testing:: simple_guest_as_string;
32- use opentelemetry:: global:: shutdown_tracer_provider;
34+ use opentelemetry:: global:: { self , shutdown_tracer_provider} ;
3335use opentelemetry:: trace:: TracerProvider ;
3436use opentelemetry:: KeyValue ;
35- use opentelemetry_otlp:: { new_exporter , new_pipeline , WithExportConfig } ;
37+ use opentelemetry_otlp:: { SpanExporter , WithExportConfig } ;
3638use opentelemetry_sdk:: runtime:: Tokio ;
3739use opentelemetry_sdk:: { trace, Resource } ;
38- use tracing_subscriber:: layer:: SubscriberExt ;
40+ use opentelemetry_semantic_conventions:: attribute:: { SERVICE_NAME , SERVICE_VERSION } ;
41+ use opentelemetry_semantic_conventions:: SCHEMA_URL ;
3942use tracing_subscriber:: EnvFilter ;
4043use uuid:: Uuid ;
4144
45+ const ENDPOINT_ADDR : & str = "http://localhost:4317" ;
46+
4247fn fn_writer ( _msg : String ) -> HyperlightResult < i32 > {
4348 Ok ( 0 )
4449}
@@ -47,30 +52,51 @@ fn fn_writer(_msg: String) -> HyperlightResult<i32> {
4752
4853#[ tokio:: main]
4954async fn main ( ) -> Result < ( ) , Box < dyn Error + Send + Sync + ' static > > {
50- let tracer = new_pipeline ( )
51- . tracing ( )
52- . with_exporter (
53- new_exporter ( )
54- . tonic ( )
55- . with_endpoint ( "http://localhost:4317/v1/traces" ) ,
55+ init_tracing_subscriber ( ENDPOINT_ADDR ) ?;
56+
57+ Ok ( run_example ( true ) ?)
58+ }
59+
60+ fn init_tracing_subscriber ( addr : & str ) -> Result < ( ) , Box < dyn Error + Send + Sync + ' static > > {
61+ let exporter = SpanExporter :: builder ( )
62+ . with_tonic ( )
63+ . with_endpoint ( addr)
64+ . build ( ) ?;
65+
66+ let provider = trace:: TracerProvider :: builder ( )
67+ . with_config (
68+ trace:: Config :: default ( ) . with_resource ( Resource :: from_schema_url (
69+ vec ! [
70+ KeyValue :: new( SERVICE_NAME , "hyperlight_otel_example" ) ,
71+ KeyValue :: new( SERVICE_VERSION , env!( "CARGO_PKG_VERSION" ) ) ,
72+ ] ,
73+ SCHEMA_URL ,
74+ ) ) ,
5675 )
57- . with_trace_config ( trace:: Config :: default ( ) . with_resource ( Resource :: new ( vec ! [
58- KeyValue :: new( "service.name" , "hyperlight_otel_example" ) ,
59- ] ) ) )
60- . install_batch ( Tokio )
61- . unwrap ( )
62- . tracer ( "trace-demo" ) ;
76+ . with_batch_exporter ( exporter, Tokio )
77+ . build ( ) ;
78+
79+ global:: set_tracer_provider ( provider. clone ( ) ) ;
80+ let tracer = provider. tracer ( "trace-demo" ) ;
81+
82+ let otel_layer = OpenTelemetryLayer :: new ( tracer) ;
6383
64- let otel_layer = tracing_opentelemetry:: OpenTelemetryLayer :: new ( tracer) ;
84+ // Try using the environment otherwise set default filters
85+ let filter = EnvFilter :: try_from_default_env ( ) . unwrap_or_else ( |_| {
86+ EnvFilter :: from_default_env ( )
87+ . add_directive ( "hyperlight_host=info" . parse ( ) . unwrap ( ) )
88+ . add_directive ( "tracing=info" . parse ( ) . unwrap ( ) )
89+ } ) ;
6590
66- tracing_subscriber:: Registry :: default ( )
67- . with ( EnvFilter :: from_default_env ( ) )
91+ tracing_subscriber:: registry ( )
92+ . with ( filter )
6893 . with ( otel_layer)
6994 . try_init ( ) ?;
7095
71- Ok ( run_example ( ) ? )
96+ Ok ( ( ) )
7297}
73- fn run_example ( ) -> HyperlightResult < ( ) > {
98+
99+ fn run_example ( wait_input : bool ) -> HyperlightResult < ( ) > {
74100 // Get the path to a simple guest binary.
75101 let hyperlight_guest_path =
76102 simple_guest_as_string ( ) . expect ( "Cannot find the guest binary at the expected location." ) ;
@@ -168,9 +194,12 @@ fn run_example() -> HyperlightResult<()> {
168194 join_handles. push ( handle) ;
169195 }
170196
171- println ! ( "Press enter to exit..." ) ;
172- let mut input = String :: new ( ) ;
173- stdin ( ) . read_line ( & mut input) ?;
197+ if wait_input {
198+ println ! ( "Press enter to exit..." ) ;
199+ let mut input = String :: new ( ) ;
200+ stdin ( ) . read_line ( & mut input) ?;
201+ }
202+
174203 * should_exit. try_lock ( ) . unwrap ( ) = true ;
175204 for join_handle in join_handles {
176205 let result = join_handle. join ( ) ;
@@ -180,3 +209,52 @@ fn run_example() -> HyperlightResult<()> {
180209
181210 Ok ( ( ) )
182211}
212+
213+ #[ cfg( test) ]
214+ mod test {
215+ use hyperlight_host:: { HyperlightError , Result } ;
216+ use tokio:: io:: AsyncReadExt ;
217+ use tokio:: net:: { TcpListener , TcpStream } ;
218+
219+ use super :: * ;
220+
221+ const TESTER_ADDR : & str = "127.0.0.1:4317" ;
222+
223+ async fn handle ( mut stream : TcpStream ) -> Result < ( ) > {
224+ let mut buf = Vec :: with_capacity ( 128 ) ;
225+ let size = stream. read_buf ( & mut buf) . await ?;
226+
227+ if size > 0 {
228+ Ok ( ( ) )
229+ } else {
230+ Err ( HyperlightError :: Error ( "Cannot read req body" . to_string ( ) ) )
231+ }
232+ }
233+
234+ async fn check_otl_connection ( addr : & str ) -> Result < ( ) > {
235+ let listener = TcpListener :: bind ( addr) . await ?;
236+
237+ let ( stream, _) = listener. accept ( ) . await ?;
238+
239+ handle ( stream) . await
240+ }
241+
242+ #[ tokio:: test]
243+ async fn test_subscriber ( ) {
244+ // Create task that generates spans
245+ let task = tokio:: spawn ( async move {
246+ let _ = init_tracing_subscriber ( ENDPOINT_ADDR ) ;
247+
248+ // No need to wait for input, just generate some spans and exit
249+ let _ = run_example ( false ) ;
250+ } ) ;
251+
252+ // Create server that listens and checks to see if traces are received
253+ let result = check_otl_connection ( TESTER_ADDR ) . await ;
254+
255+ // Abort task in case it doesn't finish
256+ task. abort ( ) ;
257+
258+ assert ! ( result. is_ok( ) ) ;
259+ }
260+ }
0 commit comments