@@ -76,6 +76,11 @@ use tokio::sync::{oneshot, watch, Notify, RwLock};
7676use tokio:: time:: timeout;
7777use tokio_util:: sync:: CancellationToken ;
7878use tracing:: { instrument, Instrument } ;
79+ use tracing_opentelemetry:: OpenTelemetrySpanExt ;
80+ use opentelemetry:: trace:: {
81+ SpanContext , SpanId , TraceContextExt , TraceFlags , TraceId ,
82+ } ;
83+ use opentelemetry:: { Context as OtelContext } ;
7984
8085#[ automock]
8186#[ async_trait]
@@ -327,13 +332,42 @@ impl WorkerProcessing for WorkerProcessor {
327332 let records = SerializedRecordBatchStream :: write ( schema. as_ref ( ) , records) ?;
328333 Ok ( ( schema, records, data_loaded_size) )
329334 } ;
330- let span = trace_id_and_span_id. map ( |( t, s) | {
331- tracing:: info_span!(
332- "Process on select worker" ,
333- cube_dd_trace_id = t,
334- cube_dd_parent_span_id = s
335- )
336- } ) ;
335+
336+ let span = match std:: env:: var ( "CUBESTORE_TRACING_TYPE" )
337+ . unwrap_or ( "datadog" . to_string ( ) )
338+ . to_lowercase ( )
339+ . as_str ( )
340+ {
341+ "otel" => {
342+ trace_id_and_span_id. map ( |( t, s) | {
343+ let trace_id = TraceId :: from ( t) ;
344+ let span_id = SpanId :: from ( s) ;
345+ let span_context = SpanContext :: new (
346+ trace_id,
347+ span_id,
348+ TraceFlags :: SAMPLED ,
349+ true ,
350+ Default :: default ( ) ,
351+ ) ;
352+
353+ let context = OtelContext :: new ( ) . with_remote_span_context ( span_context) ;
354+ let span = tracing:: info_span!( "Process on select worker" ) ;
355+
356+ span. set_parent ( context) ;
357+ span
358+ } )
359+ } ,
360+ _ => {
361+ trace_id_and_span_id. map ( |( t, s) | {
362+ tracing:: info_span!(
363+ "Process on select worker" ,
364+ cube_dd_trace_id = t,
365+ cube_dd_parent_span_id = s
366+ )
367+ } )
368+ }
369+ } ;
370+
337371 if let Some ( span) = span {
338372 future. instrument ( span) . await
339373 } else {
0 commit comments