Skip to content

Commit 0571ffc

Browse files
authored
Merge pull request #395 from havaker/keepalive
Keepalive queries
2 parents d195672 + 0d09fd5 commit 0571ffc

File tree

6 files changed

+143
-16
lines changed

6 files changed

+143
-16
lines changed

scylla/src/transport/cluster.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use itertools::Itertools;
1414
use std::collections::{BTreeMap, HashMap};
1515
use std::net::SocketAddr;
1616
use std::sync::Arc;
17+
use std::time::Duration;
1718
use tracing::{debug, warn};
1819

1920
/// Cluster manages up to date information and connections to database nodes.
@@ -101,6 +102,7 @@ impl Cluster {
101102
metadata_reader: MetadataReader::new(
102103
initial_peers,
103104
pool_config.connection_config.clone(),
105+
pool_config.keepalive_interval,
104106
server_events_sender,
105107
fetch_schema_metadata,
106108
),
@@ -310,7 +312,7 @@ impl ClusterData {
310312

311313
impl ClusterWorker {
312314
pub async fn work(mut self) {
313-
use tokio::time::{Duration, Instant};
315+
use tokio::time::Instant;
314316

315317
let refresh_duration = Duration::from_secs(60); // Refresh topology every 60 seconds
316318
let mut last_refresh_time = Instant::now();

scylla/src/transport/connection.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,6 @@ pub struct ConnectionConfig {
196196
// should be Some only in control connections,
197197
pub event_sender: Option<mpsc::Sender<Event>>,
198198
pub default_consistency: Consistency,
199-
/*
200-
These configuration options will be added in the future:
201-
202-
pub tcp_keepalive: bool,
203-
204-
pub load_balancing: Option<String>,
205-
pub retry_policy: Option<String>,
206-
*/
207199
}
208200

209201
impl Default for ConnectionConfig {

scylla/src/transport/connection_pool.rs

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub struct PoolConfig {
4747
pub connection_config: ConnectionConfig,
4848
pub pool_size: PoolSize,
4949
pub can_use_shard_aware_port: bool,
50+
pub keepalive_interval: Option<Duration>,
5051
}
5152

5253
impl Default for PoolConfig {
@@ -55,6 +56,7 @@ impl Default for PoolConfig {
5556
connection_config: Default::default(),
5657
pool_size: Default::default(),
5758
can_use_shard_aware_port: true,
59+
keepalive_interval: None,
5860
}
5961
}
6062
}
@@ -137,6 +139,7 @@ pub struct NodeConnectionPool {
137139
conns: Arc<ArcSwap<MaybePoolConnections>>,
138140
use_keyspace_request_sender: mpsc::Sender<UseKeyspaceRequest>,
139141
_refiller_handle: RemoteHandle<()>,
142+
_keepaliver_handle: Option<RemoteHandle<()>>,
140143
pool_updated_notify: Arc<Notify>,
141144
}
142145

@@ -150,6 +153,8 @@ impl NodeConnectionPool {
150153
let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);
151154
let pool_updated_notify = Arc::new(Notify::new());
152155

156+
let keepalive_interval = pool_config.keepalive_interval;
157+
153158
let refiller = PoolRefiller::new(
154159
address,
155160
port,
@@ -159,13 +164,29 @@ impl NodeConnectionPool {
159164
);
160165

161166
let conns = refiller.get_shared_connections();
162-
let (fut, handle) = refiller.run(use_keyspace_request_receiver).remote_handle();
167+
let (fut, refiller_handle) = refiller.run(use_keyspace_request_receiver).remote_handle();
163168
tokio::spawn(fut);
164169

170+
let keepaliver_handle = if let Some(interval) = keepalive_interval {
171+
let keepaliver = Keepaliver {
172+
connections: conns.clone(),
173+
keepalive_interval: interval,
174+
node_address: address,
175+
};
176+
177+
let (fut, keepaliver_handle) = keepaliver.work().remote_handle();
178+
tokio::spawn(fut);
179+
180+
Some(keepaliver_handle)
181+
} else {
182+
None
183+
};
184+
165185
Self {
166186
conns,
167187
use_keyspace_request_sender,
168-
_refiller_handle: handle,
188+
_refiller_handle: refiller_handle,
189+
_keepaliver_handle: keepaliver_handle,
169190
pool_updated_notify,
170191
}
171192
}
@@ -315,6 +336,82 @@ impl NodeConnectionPool {
315336
}
316337
}
317338

339+
struct Keepaliver {
340+
connections: Arc<ArcSwap<MaybePoolConnections>>,
341+
node_address: IpAddr, // This address is only used to enrich the log messages
342+
keepalive_interval: Duration,
343+
}
344+
345+
impl Keepaliver {
346+
pub fn load_connections(&self) -> Vec<Arc<Connection>> {
347+
use MaybePoolConnections::*;
348+
use PoolConnections::*;
349+
350+
let pool = self.connections.load_full();
351+
match &*pool {
352+
Ready(NotSharded(conns)) => conns.clone(),
353+
Ready(Sharded { connections, .. }) => connections.iter().flatten().cloned().collect(),
354+
Initializing => vec![],
355+
Broken => {
356+
debug!("Cannot send connection keepalives for node {} as there are no alive connections in the pool", self.node_address);
357+
vec![]
358+
}
359+
}
360+
}
361+
362+
async fn work(self) {
363+
let mut interval = tokio::time::interval(self.keepalive_interval);
364+
interval.tick().await; // Use up the first, instant tick.
365+
366+
// Default behaviour (Burst) is not suitable for sending keepalives.
367+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
368+
369+
// Wait for the second tick (so that `self.keepalive_interval` time passes since entering
370+
// this function)
371+
interval.tick().await;
372+
373+
loop {
374+
let send_keepalives = self.send_keepalives();
375+
376+
tokio::select! {
377+
_ = send_keepalives => {
378+
// Sending keepalives finished before receiving new tick.
379+
// Wait for the new tick and start over.
380+
interval.tick().await;
381+
}
382+
_ = interval.tick() => {
383+
// New tick arrived before `send_keepalives` was finished.
384+
// Stop polling `send_keepalives` and start over.
385+
//
386+
// `Interval::tick()` is cancellation safe, so it's ok to use it like that.
387+
}
388+
}
389+
}
390+
}
391+
392+
async fn send_keepalives(&self) {
393+
let connections = self.load_connections();
394+
let mut futures = connections
395+
.into_iter()
396+
.map(Self::send_keepalive_query)
397+
.collect::<FuturesUnordered<_>>();
398+
399+
while futures.next().await.is_some() {}
400+
}
401+
402+
async fn send_keepalive_query(connection: Arc<Connection>) {
403+
if let Err(err) = connection
404+
.query_single_page("select key from system.local where key = 'local'", &[])
405+
.await
406+
{
407+
warn!(
408+
"Failed to execute keepalive request on a connection {:p} - {}",
409+
connection, err
410+
);
411+
}
412+
}
413+
}
414+
318415
const EXCESS_CONNECTION_BOUND_PER_SHARD_MULTIPLIER: usize = 10;
319416

320417
// TODO: Make it configurable through a policy (issue #184)

scylla/src/transport/session.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,9 @@ pub struct SessionConfig {
107107

108108
/// If true, full schema is fetched with every metadata refresh.
109109
pub fetch_schema_metadata: bool,
110-
/*
111-
These configuration options will be added in the future:
112110

113-
114-
pub tcp_keepalive: bool,
115-
*/
111+
/// Interval of sending keepalive requests
112+
pub keepalive_interval: Option<Duration>,
116113
}
117114

118115
/// Describes database server known on Session startup.
@@ -153,6 +150,7 @@ impl SessionConfig {
153150
disallow_shard_aware_port: false,
154151
default_consistency: Consistency::LocalQuorum,
155152
fetch_schema_metadata: true,
153+
keepalive_interval: None,
156154
}
157155
}
158156

@@ -220,6 +218,7 @@ impl SessionConfig {
220218
connection_config: self.get_connection_config(),
221219
pool_size: self.connection_pool_size.clone(),
222220
can_use_shard_aware_port: !self.disallow_shard_aware_port,
221+
keepalive_interval: self.keepalive_interval,
223222
}
224223
}
225224

scylla/src/transport/session_builder.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::time::Duration;
1313
use crate::statement::Consistency;
1414
#[cfg(feature = "ssl")]
1515
use openssl::ssl::SslContext;
16+
use tracing::warn;
1617

1718
/// SessionBuilder is used to create new Session instances
1819
/// # Example
@@ -472,6 +473,33 @@ impl SessionBuilder {
472473
self.config.fetch_schema_metadata = fetch;
473474
self
474475
}
476+
477+
/// Set the keepalive interval.
478+
/// The default is `None`, it corresponds to no keepalive messages being send.
479+
///
480+
/// # Example
481+
/// ```
482+
/// # use scylla::{Session, SessionBuilder};
483+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
484+
/// let session: Session = SessionBuilder::new()
485+
/// .known_node("127.0.0.1:9042")
486+
/// .keepalive_interval(std::time::Duration::from_secs(42))
487+
/// .build()
488+
/// .await?;
489+
/// # Ok(())
490+
/// # }
491+
/// ```
492+
pub fn keepalive_interval(mut self, interval: Duration) -> Self {
493+
if interval <= Duration::from_secs(1) {
494+
warn!(
495+
"Setting the keepalive interval to low values ({:?}) is not recommended as it can have a negative impact on performance. Consider setting it above 1 second.",
496+
interval
497+
);
498+
}
499+
500+
self.config.keepalive_interval = Some(interval);
501+
self
502+
}
475503
}
476504

477505
/// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]

scylla/src/transport/topology.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@ use std::fmt::Formatter;
1515
use std::net::{IpAddr, SocketAddr};
1616
use std::num::NonZeroUsize;
1717
use std::str::FromStr;
18+
use std::time::Duration;
1819
use strum_macros::EnumString;
1920
use tokio::sync::mpsc;
2021
use tracing::{debug, error, trace, warn};
2122

2223
/// Allows to read current metadata from the cluster
2324
pub(crate) struct MetadataReader {
2425
connection_config: ConnectionConfig,
26+
keepalive_interval: Option<Duration>,
27+
2528
control_connection_address: SocketAddr,
2629
control_connection: NodeConnectionPool,
2730

@@ -153,6 +156,7 @@ impl MetadataReader {
153156
pub fn new(
154157
known_peers: &[SocketAddr],
155158
mut connection_config: ConnectionConfig,
159+
keepalive_interval: Option<Duration>,
156160
server_event_sender: mpsc::Sender<Event>,
157161
fetch_schema: bool,
158162
) -> Self {
@@ -168,11 +172,13 @@ impl MetadataReader {
168172
let control_connection = Self::make_control_connection_pool(
169173
control_connection_address,
170174
connection_config.clone(),
175+
keepalive_interval,
171176
);
172177

173178
MetadataReader {
174179
control_connection_address,
175180
control_connection,
181+
keepalive_interval,
176182
connection_config,
177183
known_peers: known_peers.into(),
178184
fetch_schema,
@@ -222,6 +228,7 @@ impl MetadataReader {
222228
self.control_connection = Self::make_control_connection_pool(
223229
self.control_connection_address,
224230
self.connection_config.clone(),
231+
self.keepalive_interval,
225232
);
226233

227234
debug!(
@@ -263,9 +270,11 @@ impl MetadataReader {
263270
fn make_control_connection_pool(
264271
addr: SocketAddr,
265272
connection_config: ConnectionConfig,
273+
keepalive_interval: Option<Duration>,
266274
) -> NodeConnectionPool {
267275
let pool_config = PoolConfig {
268276
connection_config,
277+
keepalive_interval,
269278

270279
// We want to have only one connection to receive events from
271280
pool_size: PoolSize::PerHost(NonZeroUsize::new(1).unwrap()),

0 commit comments

Comments
 (0)