@@ -5,7 +5,10 @@ use std::{
55} ;
66
77use notify:: { RecursiveMode , Watcher } ;
8- use opentelemetry:: { KeyValue , metrics:: Counter } ;
8+ use opentelemetry:: {
9+ KeyValue ,
10+ metrics:: { Counter , Histogram } ,
11+ } ;
912use pool_nts:: {
1013 AlgorithmDescription , BufferBorrowingReader , ClientRequest , ErrorCode , ErrorResponse ,
1114 FixedKeyRequest , KeyExchangeResponse , MAX_MESSAGE_SIZE , NoAgreementResponse , NtsError ,
@@ -27,6 +30,7 @@ use crate::{
2730 error:: PoolError ,
2831 haproxy:: parse_haproxy_header,
2932 servers:: { ConnectionType , Server , ServerConnection , ServerManager } ,
33+ telemetry:: TIMING_HISTOGRAM_BUCKET_BOUNDARIES ,
3034 util:: load_certificates,
3135} ;
3236
@@ -44,7 +48,9 @@ struct NtsPoolKe<S> {
4448 server_tls : RwLock < TlsAcceptor > ,
4549 monitoring_keys : RwLock < Arc < HashSet < String > > > ,
4650 session_counter : Counter < u64 > ,
51+ session_duration : Histogram < f64 > ,
4752 upstream_session_counter : Counter < u64 > ,
53+ upstream_session_duration : Histogram < f64 > ,
4854 server_manager : S ,
4955}
5056
@@ -63,17 +69,33 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
6369 . with_description ( "number of ke sessions with clients" )
6470 . build ( ) ;
6571
72+ let session_duration = meter
73+ . f64_histogram ( "session_duration" )
74+ . with_description ( "Duration of the session" )
75+ . with_unit ( "s" )
76+ . with_boundaries ( TIMING_HISTOGRAM_BUCKET_BOUNDARIES . to_vec ( ) )
77+ . build ( ) ;
78+
6679 let upstream_session_counter = meter
6780 . u64_counter ( "upstream_cookie_sessions" )
6881 . with_description ( "number of ke sessions with upstream for getting cookies" )
6982 . build ( ) ;
7083
84+ let upstream_session_duration = meter
85+ . f64_histogram ( "upstream_get_cookie_duration" )
86+ . with_description ( "Duration to get cookies from upstream time source" )
87+ . with_unit ( "s" )
88+ . with_boundaries ( TIMING_HISTOGRAM_BUCKET_BOUNDARIES . to_vec ( ) )
89+ . build ( ) ;
90+
7191 Ok ( NtsPoolKe {
7292 config,
7393 server_tls,
7494 monitoring_keys,
7595 session_counter,
96+ session_duration,
7697 upstream_session_counter,
98+ upstream_session_duration,
7799 server_manager,
78100 } )
79101 }
@@ -105,6 +127,7 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
105127 _ = shutdown. recv( ) => { break ; }
106128 accept_result = listener. accept( ) => { accept_result? }
107129 } ;
130+ let start_time = std:: time:: Instant :: now ( ) ;
108131 let self_clone = self . clone ( ) ;
109132
110133 tracker. spawn ( async move {
@@ -116,6 +139,13 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
116139 . await
117140 {
118141 Err ( _) => {
142+ self_clone. session_duration . record (
143+ start_time. elapsed ( ) . as_secs_f64 ( ) ,
144+ & [
145+ KeyValue :: new ( "outcome" , "timeout" ) ,
146+ KeyValue :: new ( "is_monitor" , is_monitor) ,
147+ ] ,
148+ ) ;
119149 :: tracing:: debug!( ?source_address, "NTS Pool KE timed out" ) ;
120150 self_clone. session_counter . add (
121151 1 ,
@@ -126,6 +156,13 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
126156 ) ;
127157 }
128158 Ok ( Err ( err) ) => {
159+ self_clone. session_duration . record (
160+ start_time. elapsed ( ) . as_secs_f64 ( ) ,
161+ & [
162+ KeyValue :: new ( "outcome" , "error" ) ,
163+ KeyValue :: new ( "is_monitor" , is_monitor) ,
164+ ] ,
165+ ) ;
129166 :: tracing:: debug!( ?err, ?source_address, "NTS Pool KE failed" ) ;
130167 self_clone. session_counter . add (
131168 1 ,
@@ -136,6 +173,13 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
136173 ) ;
137174 }
138175 Ok ( Ok ( ( ) ) ) => {
176+ self_clone. session_duration . record (
177+ start_time. elapsed ( ) . as_secs_f64 ( ) ,
178+ & [
179+ KeyValue :: new ( "outcome" , "success" ) ,
180+ KeyValue :: new ( "is_monitor" , is_monitor) ,
181+ ] ,
182+ ) ;
139183 :: tracing:: debug!( ?source_address, "NTS Pool KE completed" ) ;
140184 self_clone. session_counter . add (
141185 1 ,
@@ -539,14 +583,29 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
539583 }
540584 }
541585
542- // TODO: Implement connection reuse
586+ let start_time = std :: time :: Instant :: now ( ) ;
543587 match tokio:: time:: timeout ( self . config . timesource_timeout , async {
544588 let server_stream = server. connect ( connection_type) . await ?;
545589 workaround_lifetime_bug ( buffer, request, server_stream) . await
546590 } )
547591 . await
548592 {
549593 Ok ( v) => {
594+ self . upstream_session_duration . record (
595+ start_time. elapsed ( ) . as_secs_f64 ( ) ,
596+ & [
597+ KeyValue :: new (
598+ "outcome" ,
599+ match v {
600+ Ok ( _) => "success" ,
601+ Err ( _) => "error" ,
602+ } ,
603+ ) ,
604+ KeyValue :: new ( "server" , server. name ( ) . clone ( ) ) ,
605+ KeyValue :: new ( "uuid" , server. uuid ( ) . clone ( ) ) ,
606+ KeyValue :: new ( "is_monitor" , for_monitor) ,
607+ ] ,
608+ ) ;
550609 self . upstream_session_counter . add (
551610 1 ,
552611 & [
@@ -565,6 +624,15 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
565624 v
566625 }
567626 Err ( _) => {
627+ self . upstream_session_duration . record (
628+ start_time. elapsed ( ) . as_secs_f64 ( ) ,
629+ & [
630+ KeyValue :: new ( "outcome" , "timeout" ) ,
631+ KeyValue :: new ( "server" , server. name ( ) . clone ( ) ) ,
632+ KeyValue :: new ( "uuid" , server. uuid ( ) . clone ( ) ) ,
633+ KeyValue :: new ( "is_monitor" , for_monitor) ,
634+ ] ,
635+ ) ;
568636 self . upstream_session_counter . add (
569637 1 ,
570638 & [
0 commit comments