@@ -8,6 +8,7 @@ use tokio::sync::{mpsc, oneshot};
88use tokio:: task:: JoinHandle ;
99
1010use crate :: backend:: client:: { client_request_channel, ClientId , RequestTx } ;
11+ use crate :: metrics;
1112
1213/// Backend node representation (host:port string).
1314#[ derive( Debug , Clone , PartialEq , Eq , Hash ) ]
@@ -51,10 +52,55 @@ where
5152 ) ;
5253}
5354
55+ #[ derive( Clone , Copy ) ]
56+ enum SessionKind {
57+ Shared ,
58+ Exclusive ,
59+ }
60+
61+ impl SessionKind {
62+ fn as_str ( self ) -> & ' static str {
63+ match self {
64+ SessionKind :: Shared => "shared" ,
65+ SessionKind :: Exclusive => "exclusive" ,
66+ }
67+ }
68+ }
69+
5470struct SessionHandle < T : BackendRequest > {
5571 tx : RequestTx < SessionCommand < T > > ,
5672 #[ allow( dead_code) ]
5773 join : JoinHandle < ( ) > ,
74+ cluster : Arc < str > ,
75+ backend : BackendNode ,
76+ kind : SessionKind ,
77+ active : bool ,
78+ }
79+
80+ impl < T : BackendRequest > SessionHandle < T > {
81+ fn close ( mut self ) {
82+ if self . active {
83+ metrics:: pool_session_close (
84+ self . cluster . as_ref ( ) ,
85+ self . backend . as_str ( ) ,
86+ self . kind . as_str ( ) ,
87+ ) ;
88+ self . active = false ;
89+ }
90+ }
91+ }
92+
93+ impl < T : BackendRequest > Drop for SessionHandle < T > {
94+ fn drop ( & mut self ) {
95+ if self . active {
96+ metrics:: pool_session_close (
97+ self . cluster . as_ref ( ) ,
98+ self . backend . as_str ( ) ,
99+ self . kind . as_str ( ) ,
100+ ) ;
101+ self . active = false ;
102+ }
103+ }
58104}
59105
60106struct NodeSessions < T : BackendRequest > {
@@ -117,12 +163,22 @@ impl<T: BackendRequest> ConnectionPool<T> {
117163 let node_sessions = guard
118164 . entry ( node. clone ( ) )
119165 . or_insert_with ( || NodeSessions :: new ( self . slots_per_node ) ) ;
166+ metrics:: pool_exclusive_idle (
167+ self . cluster . as_ref ( ) ,
168+ node. as_str ( ) ,
169+ node_sessions. exclusive_idle . len ( ) ,
170+ ) ;
120171 let entry = node_sessions
121172 . shared
122173 . get_mut ( index)
123174 . expect ( "session index within bounds" ) ;
124175 let handle = entry. get_or_insert_with ( || {
125- new_session_handle ( connector. clone ( ) , node. clone ( ) , cluster. clone ( ) )
176+ new_session_handle (
177+ connector. clone ( ) ,
178+ node. clone ( ) ,
179+ cluster. clone ( ) ,
180+ SessionKind :: Shared ,
181+ )
126182 } ) ;
127183 handle. tx . clone ( )
128184 } ;
@@ -140,11 +196,15 @@ impl<T: BackendRequest> ConnectionPool<T> {
140196 {
141197 let mut guard = self . sessions . write ( ) ;
142198 if let Some ( node_sessions) = guard. get_mut ( & node) {
143- node_sessions. shared [ index] = None ;
199+ if let Some ( handle) = node_sessions. shared [ index] . take ( ) {
200+ handle. close ( ) ;
201+ }
144202 if node_sessions. is_empty ( ) {
203+ metrics:: pool_exclusive_idle ( self . cluster . as_ref ( ) , node. as_str ( ) , 0 ) ;
145204 guard. remove ( & node) ;
146205 }
147206 }
207+ metrics:: backend_error ( & self . cluster , node. as_str ( ) , "enqueue_failed" ) ;
148208 return Err ( anyhow ! ( "failed to enqueue backend request: {err}" ) ) ;
149209 }
150210 Ok ( response_rx)
@@ -156,11 +216,22 @@ impl<T: BackendRequest> ConnectionPool<T> {
156216 let node_sessions = guard
157217 . entry ( node. clone ( ) )
158218 . or_insert_with ( || NodeSessions :: new ( self . slots_per_node ) ) ;
159- if let Some ( handle) = node_sessions. exclusive_idle . pop ( ) {
219+ let handle = if let Some ( handle) = node_sessions. exclusive_idle . pop ( ) {
160220 handle
161221 } else {
162- new_session_handle ( self . connector . clone ( ) , node. clone ( ) , self . cluster . clone ( ) )
163- }
222+ new_session_handle (
223+ self . connector . clone ( ) ,
224+ node. clone ( ) ,
225+ self . cluster . clone ( ) ,
226+ SessionKind :: Exclusive ,
227+ )
228+ } ;
229+ metrics:: pool_exclusive_idle (
230+ self . cluster . as_ref ( ) ,
231+ node. as_str ( ) ,
232+ node_sessions. exclusive_idle . len ( ) ,
233+ ) ;
234+ handle
164235 } ;
165236
166237 ExclusiveConnection {
@@ -186,12 +257,24 @@ fn new_session_handle<T: BackendRequest>(
186257 connector : Arc < dyn Connector < T > > ,
187258 node : BackendNode ,
188259 cluster : Arc < str > ,
260+ kind : SessionKind ,
189261) -> SessionHandle < T > {
262+ let backend = node. clone ( ) ;
263+ let cluster_for_metrics = cluster. clone ( ) ;
190264 let ( tx, rx) = client_request_channel ( ) ;
191- let join = spawn_session ( connector, node, cluster, rx) ;
265+ let join = spawn_session ( connector, backend. clone ( ) , cluster. clone ( ) , rx) ;
266+ metrics:: pool_session_open (
267+ cluster_for_metrics. as_ref ( ) ,
268+ backend. as_str ( ) ,
269+ kind. as_str ( ) ,
270+ ) ;
192271 SessionHandle {
193- tx : tx . clone ( ) ,
272+ tx,
194273 join,
274+ cluster : cluster_for_metrics,
275+ backend,
276+ kind,
277+ active : true ,
195278 }
196279}
197280
@@ -213,21 +296,29 @@ impl<'a, T: BackendRequest> ExclusiveConnection<'a, T> {
213296 pub async fn send ( & mut self , mut request : T ) -> Result < oneshot:: Receiver < Result < T :: Response > > > {
214297 request. apply_total_tracker ( & self . pool . cluster ) ;
215298 request. apply_remote_tracker ( & self . pool . cluster ) ;
216- let handle = self
299+ let tx = self
217300 . handle
218301 . as_ref ( )
219- . ok_or_else ( || anyhow ! ( "exclusive connection has been released" ) ) ?;
302+ . ok_or_else ( || anyhow ! ( "exclusive connection has been released" ) ) ?
303+ . tx
304+ . clone ( ) ;
220305
221306 let ( respond_to, response_rx) = oneshot:: channel ( ) ;
222- if let Err ( err) = handle
223- . tx
307+ if let Err ( err) = tx
224308 . send ( SessionCommand {
225309 request,
226310 respond_to,
227311 } )
228312 . await
229313 {
230- self . handle = None ;
314+ if let Some ( handle) = self . handle . take ( ) {
315+ handle. close ( ) ;
316+ }
317+ metrics:: backend_error (
318+ & self . pool . cluster ,
319+ self . node . as_str ( ) ,
320+ "exclusive_enqueue_failed" ,
321+ ) ;
231322 return Err ( anyhow ! ( "failed to enqueue backend request: {err}" ) ) ;
232323 }
233324 Ok ( response_rx)
@@ -246,6 +337,11 @@ impl<'a, T: BackendRequest> Drop for ExclusiveConnection<'a, T> {
246337 . entry ( self . node . clone ( ) )
247338 . or_insert_with ( || NodeSessions :: new ( self . pool . slots_per_node ) ) ;
248339 node_sessions. exclusive_idle . push ( handle) ;
340+ metrics:: pool_exclusive_idle (
341+ self . pool . cluster . as_ref ( ) ,
342+ self . node . as_str ( ) ,
343+ node_sessions. exclusive_idle . len ( ) ,
344+ ) ;
249345 }
250346 }
251347}
0 commit comments