11use super :: EnabledCollector ;
22use linkerd_app_core:: { control:: ControlAddr , proxy:: http:: Body , Error } ;
3- use linkerd_opentelemetry:: {
4- self as opentelemetry, metrics,
5- proto:: {
6- tonic:: common:: v1:: { any_value, AnyValue , KeyValue } ,
7- transform:: common:: tonic:: ResourceAttributesWithSchema ,
8- } ,
9- semconv,
10- } ;
3+ use linkerd_opentelemetry:: { self as opentelemetry, metrics, otel:: KeyValue , sdk, semconv} ;
114use std:: {
125 collections:: HashMap ,
136 time:: { SystemTime , UNIX_EPOCH } ,
@@ -29,7 +22,7 @@ pub(super) fn create_collector<S>(
2922 legacy_metrics : metrics:: Registry ,
3023) -> EnabledCollector
3124where
32- S : GrpcService < TonicBody > + Clone + Send + ' static ,
25+ S : GrpcService < TonicBody > + Clone + Send + Sync + ' static ,
3326 S :: Error : Into < Error > ,
3427 S :: Future : Send ,
3528 S :: ResponseBody : Body < Data = tonic:: codegen:: Bytes > + Send + ' static ,
@@ -38,37 +31,33 @@ where
3831 let ( span_sink, spans_rx) = mpsc:: channel ( crate :: trace_collector:: SPAN_BUFFER_CAPACITY ) ;
3932 let spans_rx = ReceiverStream :: new ( spans_rx) ;
4033
41- let mut resources = ResourceAttributesWithSchema :: default ( ) ;
42-
43- resources
44- . attributes
45- . 0
46- . push ( ( std:: process:: id ( ) as i64 ) . with_key ( semconv:: attribute:: PROCESS_PID ) ) ;
47-
48- resources. attributes . 0 . push (
49- SystemTime :: now ( )
50- . duration_since ( UNIX_EPOCH )
51- . map ( |d| d. as_secs ( ) as i64 )
52- . unwrap_or_else ( |e| -( e. duration ( ) . as_secs ( ) as i64 ) )
53- . with_key ( "process.start_timestamp" ) ,
54- ) ;
55- resources. attributes . 0 . push (
56- attributes
57- . hostname
58- . unwrap_or_default ( )
59- . with_key ( semconv:: attribute:: HOST_NAME ) ,
60- ) ;
61-
62- resources. attributes . 0 . extend (
63- attributes
64- . extra
65- . into_iter ( )
66- . map ( |( key, value) | value. with_key ( & key) ) ,
67- ) ;
34+ let resource = sdk:: Resource :: builder ( )
35+ . with_attribute ( KeyValue :: new (
36+ semconv:: attribute:: PROCESS_PID ,
37+ std:: process:: id ( ) as i64 ,
38+ ) )
39+ . with_attribute ( KeyValue :: new (
40+ "process.start_timestamp" ,
41+ SystemTime :: now ( )
42+ . duration_since ( UNIX_EPOCH )
43+ . map ( |d| d. as_secs ( ) as i64 )
44+ . unwrap_or_else ( |e| -( e. duration ( ) . as_secs ( ) as i64 ) ) ,
45+ ) )
46+ . with_attribute ( KeyValue :: new (
47+ semconv:: attribute:: HOST_NAME ,
48+ attributes. hostname . unwrap_or_default ( ) ,
49+ ) )
50+ . with_attributes (
51+ attributes
52+ . extra
53+ . into_iter ( )
54+ . map ( |( k, v) | KeyValue :: new ( k, v) ) ,
55+ )
56+ . build ( ) ;
6857
6958 let addr = addr. clone ( ) ;
7059 let task = Box :: pin (
71- opentelemetry:: export_spans ( svc, spans_rx, resources , legacy_metrics)
60+ opentelemetry:: export_spans ( svc, spans_rx, resource , legacy_metrics)
7261 . instrument ( tracing:: debug_span!( "opentelemetry" , peer. addr = %addr) . or_current ( ) ) ,
7362 ) ;
7463
7867 span_sink,
7968 }
8069}
81-
82- trait IntoAnyValue
83- where
84- Self : Sized ,
85- {
86- fn into_any_value ( self ) -> AnyValue ;
87-
88- fn with_key ( self , key : & str ) -> KeyValue {
89- KeyValue {
90- key : key. to_string ( ) ,
91- value : Some ( self . into_any_value ( ) ) ,
92- }
93- }
94- }
95-
96- impl IntoAnyValue for String {
97- fn into_any_value ( self ) -> AnyValue {
98- AnyValue {
99- value : Some ( any_value:: Value :: StringValue ( self ) ) ,
100- }
101- }
102- }
103-
104- impl IntoAnyValue for & str {
105- fn into_any_value ( self ) -> AnyValue {
106- AnyValue {
107- value : Some ( any_value:: Value :: StringValue ( self . to_string ( ) ) ) ,
108- }
109- }
110- }
111-
112- impl IntoAnyValue for i64 {
113- fn into_any_value ( self ) -> AnyValue {
114- AnyValue {
115- value : Some ( any_value:: Value :: IntValue ( self ) ) ,
116- }
117- }
118- }
0 commit comments