Skip to content

Commit c449270

Browse files
authored
Merge pull request #522 from wprzytula/timeouts
Implemented client-side per-session and per-statement configurable timeouts
2 parents 2dc8432 + 01fe25f commit c449270

File tree

10 files changed

+244
-73
lines changed

10 files changed

+244
-73
lines changed

.github/workflows/authenticate_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ jobs:
2424
steps:
2525
- uses: actions/checkout@v2
2626
- name: Run tests
27-
run: cargo test --verbose -- --ignored
27+
run: cargo test --verbose authenticate_superuser -- --ignored

scylla-cql/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ scylla-macros = { version = "0.1.1", path = "../scylla-macros"}
1414
byteorder = "1.3.4"
1515
bytes = "1.0.1"
1616
num_enum = "0.5"
17-
tokio = { version = "1.12", features = ["io-util"] }
17+
tokio = { version = "1.12", features = ["io-util", "time"] }
1818
snap = "1.0"
1919
uuid = "1.0"
2020
thiserror = "1.0"

scylla-cql/src/errors.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ pub enum QueryError {
4040

4141
#[error("Unable to allocate stream id")]
4242
UnableToAllocStreamId,
43+
44+
/// Client timeout occurred before any response arrived
45+
#[error("Request timeout: {0}")]
46+
RequestTimeout(String),
4347
}
4448

4549
/// An error sent from the database in response to a query
@@ -292,6 +296,11 @@ pub enum NewSessionError {
292296

293297
#[error("Unable to allocate stream id")]
294298
UnableToAllocStreamId,
299+
300+
/// Client timeout occurred before a response arrived for some query
301+
/// during `Session` creation.
302+
#[error("Client timeout: {0}")]
303+
RequestTimeout(String),
295304
}
296305

297306
/// Invalid keyspace name given to `Session::use_keyspace()`
@@ -340,6 +349,12 @@ impl From<FrameError> for QueryError {
340349
}
341350
}
342351

352+
impl From<tokio::time::error::Elapsed> for QueryError {
353+
fn from(timer_error: tokio::time::error::Elapsed) -> QueryError {
354+
QueryError::RequestTimeout(format!("{}", timer_error))
355+
}
356+
}
357+
343358
impl From<std::io::Error> for NewSessionError {
344359
fn from(io_error: std::io::Error) -> NewSessionError {
345360
NewSessionError::IoError(Arc::new(io_error))
@@ -359,6 +374,7 @@ impl From<QueryError> for NewSessionError {
359374
NewSessionError::TooManyOrphanedStreamIds(ids)
360375
}
361376
QueryError::UnableToAllocStreamId => NewSessionError::UnableToAllocStreamId,
377+
QueryError::RequestTimeout(msg) => NewSessionError::RequestTimeout(msg),
362378
}
363379
}
364380
}

scylla/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ smallvec = "1.8.0"
4747
[dev-dependencies]
4848
criterion = "0.3"
4949
tracing-subscriber = "0.3.14"
50+
assert_matches = "1.5.0"
5051

5152
[[bench]]
5253
name = "benchmark"

scylla/src/statement/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

33
use crate::transport::retry_policy::RetryPolicy;
44
use crate::transport::speculative_execution::SpeculativeExecutionPolicy;
@@ -21,6 +21,7 @@ pub struct StatementConfig {
2121

2222
pub tracing: bool,
2323
pub timestamp: Option<i64>,
24+
pub request_timeout: Option<Duration>,
2425
}
2526

2627
impl Default for StatementConfig {
@@ -33,23 +34,20 @@ impl Default for StatementConfig {
3334
speculative_execution_policy: None,
3435
tracing: false,
3536
timestamp: None,
37+
request_timeout: None,
3638
}
3739
}
3840
}
3941

4042
impl Clone for StatementConfig {
4143
fn clone(&self) -> Self {
4244
Self {
43-
consistency: self.consistency,
44-
serial_consistency: self.serial_consistency,
45-
is_idempotent: self.is_idempotent,
4645
retry_policy: self
4746
.retry_policy
4847
.as_ref()
4948
.map(|policy| policy.clone_boxed()),
5049
speculative_execution_policy: self.speculative_execution_policy.clone(),
51-
tracing: self.tracing,
52-
timestamp: self.timestamp,
50+
..*self
5351
}
5452
}
5553
}

scylla/src/statement/prepared_statement.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use bytes::{BufMut, Bytes, BytesMut};
22
use smallvec::{smallvec, SmallVec};
33
use std::convert::TryInto;
4+
use std::time::Duration;
45
use thiserror::Error;
56
use uuid::Uuid;
67

@@ -247,6 +248,19 @@ impl PreparedStatement {
247248
self.config.timestamp
248249
}
249250

251+
/// Sets the client-side timeout for this statement.
252+
/// If not None, the driver will stop waiting for the request
253+
/// to finish after `timeout` passed.
254+
/// Otherwise, default session client timeout will be applied.
255+
pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
256+
self.config.request_timeout = timeout
257+
}
258+
259+
/// Gets client timeout associated with this query
260+
pub fn get_request_timeout(&self) -> Option<Duration> {
261+
self.config.request_timeout
262+
}
263+
250264
/// Sets the name of the partitioner used for this statement.
251265
pub(crate) fn set_partitioner_name(&mut self, partitioner_name: Option<&str>) {
252266
self.partitioner_name = match partitioner_name {

scylla/src/statement/query.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::StatementConfig;
22
use crate::frame::types::{Consistency, SerialConsistency};
33
use crate::transport::retry_policy::RetryPolicy;
4+
use std::time::Duration;
45

56
/// CQL query statement.
67
///
@@ -117,6 +118,19 @@ impl Query {
117118
pub fn get_timestamp(&self) -> Option<i64> {
118119
self.config.timestamp
119120
}
121+
122+
/// Sets the client-side timeout for this statement.
123+
/// If not None, the driver will stop waiting for the request
124+
/// to finish after `timeout` passed.
125+
/// Otherwise, default session client timeout will be applied.
126+
pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
127+
self.config.request_timeout = timeout
128+
}
129+
130+
/// Gets client timeout associated with this query
131+
pub fn get_request_timeout(&self) -> Option<Duration> {
132+
self.config.request_timeout
133+
}
120134
}
121135

122136
impl From<String> for Query {

scylla/src/transport/session.rs

Lines changed: 88 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub struct Session {
6464
metrics: Arc<Metrics>,
6565
default_consistency: Consistency,
6666
auto_await_schema_agreement_timeout: Option<Duration>,
67+
request_timeout: Option<Duration>,
6768
}
6869

6970
/// This implementation deliberately omits some details from Cluster in order
@@ -121,7 +122,7 @@ pub struct SessionConfig {
121122
pub auth_password: Option<String>,
122123

123124
pub schema_agreement_interval: Duration,
124-
pub connect_timeout: std::time::Duration,
125+
pub connect_timeout: Duration,
125126

126127
/// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node.
127128
/// The default is `PerShard(1)`, which is the recommended setting for Scylla clusters.
@@ -142,6 +143,10 @@ pub struct SessionConfig {
142143
/// Controls the timeout for the automatic wait for schema agreement after sending a schema-altering statement.
143144
/// If `None`, the automatic schema agreement is disabled.
144145
pub auto_await_schema_agreement_timeout: Option<Duration>,
146+
147+
/// Controls the client-side timeout for queries.
148+
/// If `None`, the queries have no timeout (the driver will block indefinitely).
149+
pub request_timeout: Option<Duration>,
145150
}
146151

147152
/// Describes database server known on Session startup.
@@ -177,13 +182,14 @@ impl SessionConfig {
177182
ssl_context: None,
178183
auth_username: None,
179184
auth_password: None,
180-
connect_timeout: std::time::Duration::from_secs(5),
185+
connect_timeout: Duration::from_secs(5),
181186
connection_pool_size: Default::default(),
182187
disallow_shard_aware_port: false,
183188
default_consistency: Consistency::LocalQuorum,
184189
fetch_schema_metadata: true,
185190
keepalive_interval: None,
186191
auto_await_schema_agreement_timeout: Some(std::time::Duration::from_secs(60)),
192+
request_timeout: Some(Duration::from_secs(30)),
187193
}
188194
}
189195

@@ -375,6 +381,7 @@ impl Session {
375381
metrics: Arc::new(Metrics::new()),
376382
default_consistency: config.default_consistency,
377383
auto_await_schema_agreement_timeout: config.auto_await_schema_agreement_timeout,
384+
request_timeout: config.request_timeout,
378385
};
379386

380387
if let Some(keyspace_name) = config.used_keyspace {
@@ -1109,82 +1116,98 @@ impl Session {
11091116
QueryFut: Future<Output = Result<ResT, QueryError>>,
11101117
ResT: AllowedRunQueryResTType,
11111118
{
1112-
let cluster_data = self.cluster.get_data();
1113-
let query_plan = self.load_balancer.plan(&statement_info, &cluster_data);
1114-
1115-
// If a speculative execution policy is used to run query, query_plan has to be shared
1116-
// between different async functions. This struct helps to wrap query_plan in mutex so it
1117-
// can be shared safely.
1118-
struct SharedPlan<I>
1119-
where
1120-
I: Iterator<Item = Arc<Node>>,
1121-
{
1122-
iter: std::sync::Mutex<I>,
1123-
}
1124-
1125-
impl<I> Iterator for &SharedPlan<I>
1126-
where
1127-
I: Iterator<Item = Arc<Node>>,
1128-
{
1129-
type Item = Arc<Node>;
1130-
1131-
fn next(&mut self) -> Option<Self::Item> {
1132-
self.iter.lock().unwrap().next()
1119+
let runner = async {
1120+
let cluster_data = self.cluster.get_data();
1121+
let query_plan = self.load_balancer.plan(&statement_info, &cluster_data);
1122+
1123+
// If a speculative execution policy is used to run query, query_plan has to be shared
1124+
// between different async functions. This struct helps to wrap query_plan in mutex so it
1125+
// can be shared safely.
1126+
struct SharedPlan<I>
1127+
where
1128+
I: Iterator<Item = Arc<Node>>,
1129+
{
1130+
iter: std::sync::Mutex<I>,
11331131
}
1134-
}
11351132

1136-
let retry_policy = match &statement_config.retry_policy {
1137-
Some(policy) => policy,
1138-
None => &self.retry_policy,
1139-
};
1133+
impl<I> Iterator for &SharedPlan<I>
1134+
where
1135+
I: Iterator<Item = Arc<Node>>,
1136+
{
1137+
type Item = Arc<Node>;
11401138

1141-
#[allow(clippy::unnecessary_lazy_evaluations)]
1142-
let speculative_policy = statement_config
1143-
.speculative_execution_policy
1144-
.as_ref()
1145-
.or_else(|| self.speculative_execution_policy.as_ref());
1139+
fn next(&mut self) -> Option<Self::Item> {
1140+
self.iter.lock().unwrap().next()
1141+
}
1142+
}
11461143

1147-
match speculative_policy {
1148-
Some(speculative) if statement_config.is_idempotent => {
1149-
let shared_query_plan = SharedPlan {
1150-
iter: std::sync::Mutex::new(query_plan),
1151-
};
1144+
let retry_policy = match &statement_config.retry_policy {
1145+
Some(policy) => policy,
1146+
None => &self.retry_policy,
1147+
};
11521148

1153-
let execute_query_generator = || {
1154-
self.execute_query(
1155-
&shared_query_plan,
1149+
#[allow(clippy::unnecessary_lazy_evaluations)]
1150+
let speculative_policy = statement_config
1151+
.speculative_execution_policy
1152+
.as_ref()
1153+
.or_else(|| self.speculative_execution_policy.as_ref());
1154+
1155+
match speculative_policy {
1156+
Some(speculative) if statement_config.is_idempotent => {
1157+
let shared_query_plan = SharedPlan {
1158+
iter: std::sync::Mutex::new(query_plan),
1159+
};
1160+
1161+
let execute_query_generator = || {
1162+
self.execute_query(
1163+
&shared_query_plan,
1164+
statement_config.is_idempotent,
1165+
statement_config.consistency,
1166+
retry_policy.new_session(),
1167+
&choose_connection,
1168+
&do_query,
1169+
)
1170+
};
1171+
1172+
let context = speculative_execution::Context {
1173+
metrics: self.metrics.clone(),
1174+
};
1175+
1176+
speculative_execution::execute(
1177+
speculative.as_ref(),
1178+
&context,
1179+
execute_query_generator,
1180+
)
1181+
.await
1182+
}
1183+
_ => self
1184+
.execute_query(
1185+
query_plan,
11561186
statement_config.is_idempotent,
11571187
statement_config.consistency,
11581188
retry_policy.new_session(),
11591189
&choose_connection,
11601190
&do_query,
11611191
)
1162-
};
1163-
1164-
let context = speculative_execution::Context {
1165-
metrics: self.metrics.clone(),
1166-
};
1167-
1168-
speculative_execution::execute(
1169-
speculative.as_ref(),
1170-
&context,
1171-
execute_query_generator,
1172-
)
1173-
.await
1192+
.await
1193+
.unwrap_or(Err(QueryError::ProtocolError(
1194+
"Empty query plan - driver bug!",
1195+
))),
11741196
}
1175-
_ => self
1176-
.execute_query(
1177-
query_plan,
1178-
statement_config.is_idempotent,
1179-
statement_config.consistency,
1180-
retry_policy.new_session(),
1181-
&choose_connection,
1182-
&do_query,
1183-
)
1197+
};
1198+
1199+
let effective_timeout = statement_config.request_timeout.or(self.request_timeout);
1200+
match effective_timeout {
1201+
Some(timeout) => tokio::time::timeout(timeout, runner)
11841202
.await
1185-
.unwrap_or(Err(QueryError::ProtocolError(
1186-
"Empty query plan - driver bug!",
1187-
))),
1203+
.unwrap_or_else(|e| {
1204+
Err(QueryError::RequestTimeout(format!(
1205+
"Request took longer than {}ms: {}",
1206+
timeout.as_millis(),
1207+
e
1208+
)))
1209+
}),
1210+
None => runner.await,
11881211
}
11891212
}
11901213

0 commit comments

Comments
 (0)