@@ -86,6 +86,7 @@ pub struct NodeConnectionPool {
8686 conns : Arc < ArcSwap < MaybePoolConnections > > ,
8787 use_keyspace_request_sender : mpsc:: Sender < UseKeyspaceRequest > ,
8888 _refiller_handle : RemoteHandle < ( ) > ,
89+ _keepaliver_handle : Option < RemoteHandle < ( ) > > ,
8990 pool_updated_notify : Arc < Notify > ,
9091}
9192
@@ -99,6 +100,8 @@ impl NodeConnectionPool {
99100 let ( use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc:: channel ( 1 ) ;
100101 let pool_updated_notify = Arc :: new ( Notify :: new ( ) ) ;
101102
103+ let keepalive_interval = pool_config. keepalive_interval ;
104+
102105 let refiller = PoolRefiller :: new (
103106 address,
104107 port,
@@ -108,13 +111,29 @@ impl NodeConnectionPool {
108111 ) ;
109112
110113 let conns = refiller. get_shared_connections ( ) ;
111- let ( fut, handle ) = refiller. run ( use_keyspace_request_receiver) . remote_handle ( ) ;
114+ let ( fut, refiller_handle ) = refiller. run ( use_keyspace_request_receiver) . remote_handle ( ) ;
112115 tokio:: spawn ( fut) ;
113116
117+ let keepaliver_handle = if let Some ( interval) = keepalive_interval {
118+ let keepaliver = Keepaliver {
119+ connections : conns. clone ( ) ,
120+ keepalive_interval : interval,
121+ node_address : address,
122+ } ;
123+
124+ let ( fut, keepaliver_handle) = keepaliver. work ( ) . remote_handle ( ) ;
125+ tokio:: spawn ( fut) ;
126+
127+ Some ( keepaliver_handle)
128+ } else {
129+ None
130+ } ;
131+
114132 Self {
115133 conns,
116134 use_keyspace_request_sender,
117- _refiller_handle : handle,
135+ _refiller_handle : refiller_handle,
136+ _keepaliver_handle : keepaliver_handle,
118137 pool_updated_notify,
119138 }
120139 }
@@ -245,6 +264,82 @@ impl NodeConnectionPool {
245264 }
246265}
247266
267+ struct Keepaliver {
268+ connections : Arc < ArcSwap < MaybePoolConnections > > ,
269+ node_address : IpAddr , // This address is only used to enrich the log messages
270+ keepalive_interval : Duration ,
271+ }
272+
273+ impl Keepaliver {
274+ pub fn load_connections ( & self ) -> Vec < Arc < Connection > > {
275+ use MaybePoolConnections :: * ;
276+ use PoolConnections :: * ;
277+
278+ let pool = self . connections . load_full ( ) ;
279+ match & * pool {
280+ Ready ( NotSharded ( conns) ) => conns. clone ( ) ,
281+ Ready ( Sharded { connections, .. } ) => connections. iter ( ) . flatten ( ) . cloned ( ) . collect ( ) ,
282+ Initializing => vec ! [ ] ,
283+ Broken => {
284+ debug ! ( "Cannot send connection keepalives for node {} as there are no alive connections in the pool" , self . node_address) ;
285+ vec ! [ ]
286+ }
287+ }
288+ }
289+
290+ async fn work ( self ) {
291+ let mut interval = tokio:: time:: interval ( self . keepalive_interval ) ;
292+ interval. tick ( ) . await ; // Use up the first, instant tick.
293+
294+ // Default behaviour (Burst) is not suitable for sending keepalives.
295+ interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Delay ) ;
296+
297+ // Wait for the second tick (so that `self.keepalive_interval` time passes since entering
298+ // this function)
299+ interval. tick ( ) . await ;
300+
301+ loop {
302+ let send_keepalives = self . send_keepalives ( ) ;
303+
304+ tokio:: select! {
305+ _ = send_keepalives => {
306+ // Sending keepalives finished before receiving new tick.
307+ // Wait for the new tick and start over.
308+ interval. tick( ) . await ;
309+ }
310+ _ = interval. tick( ) => {
311+ // New tick arrived before `send_keepalives` was finished.
312+ // Stop polling `send_keepalives` and start over.
313+ //
314+ // `Interval::tick()` is cancellation safe, so it's ok to use it like that.
315+ }
316+ }
317+ }
318+ }
319+
320+ async fn send_keepalives ( & self ) {
321+ let connections = self . load_connections ( ) ;
322+ let mut futures = connections
323+ . into_iter ( )
324+ . map ( Self :: send_keepalive_query)
325+ . collect :: < FuturesUnordered < _ > > ( ) ;
326+
327+ while futures. next ( ) . await . is_some ( ) { }
328+ }
329+
330+ async fn send_keepalive_query ( connection : Arc < Connection > ) {
331+ if let Err ( err) = connection
332+ . query_single_page ( "select key from system.local where key = 'local'" , & [ ] )
333+ . await
334+ {
335+ warn ! (
336+ "Failed to execute keepalive request on a connection {:p} - {}" ,
337+ connection, err
338+ ) ;
339+ }
340+ }
341+ }
342+
248343const EXCESS_CONNECTION_BOUND_PER_SHARD_MULTIPLIER : usize = 10 ;
249344
250345// TODO: Make it configurable through a policy (issue #184)
0 commit comments