Skip to content

Commit f80c74e

Browse files
committed
session: made run_query() subject of timeouts
The whole run_query() was made a one big async block, which is now polled together with a timeout using tokio timeout. Once a timeout elapses, the query is cancelled and QueryError::RequestTimeout is returned.
1 parent f50667b commit f80c74e

File tree

1 file changed

+79
-64
lines changed

1 file changed

+79
-64
lines changed

scylla/src/transport/session.rs

Lines changed: 79 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ pub struct Session {
6363
metrics: Arc<Metrics>,
6464
default_consistency: Consistency,
6565
auto_await_schema_agreement_timeout: Option<Duration>,
66-
#[allow(dead_code)]
6766
request_timeout: Option<Duration>,
6867
}
6968

@@ -1113,82 +1112,98 @@ impl Session {
11131112
ConnFut: Future<Output = Result<Arc<Connection>, QueryError>>,
11141113
QueryFut: Future<Output = Result<ResT, QueryError>>,
11151114
{
1116-
let cluster_data = self.cluster.get_data();
1117-
let query_plan = self.load_balancer.plan(&statement_info, &cluster_data);
1118-
1119-
// If a speculative execution policy is used to run query, query_plan has to be shared
1120-
// between different async functions. This struct helps to wrap query_plan in mutex so it
1121-
// can be shared safely.
1122-
struct SharedPlan<I>
1123-
where
1124-
I: Iterator<Item = Arc<Node>>,
1125-
{
1126-
iter: std::sync::Mutex<I>,
1127-
}
1128-
1129-
impl<I> Iterator for &SharedPlan<I>
1130-
where
1131-
I: Iterator<Item = Arc<Node>>,
1132-
{
1133-
type Item = Arc<Node>;
1134-
1135-
fn next(&mut self) -> Option<Self::Item> {
1136-
self.iter.lock().unwrap().next()
1115+
let runner = async {
1116+
let cluster_data = self.cluster.get_data();
1117+
let query_plan = self.load_balancer.plan(&statement_info, &cluster_data);
1118+
1119+
// If a speculative execution policy is used to run query, query_plan has to be shared
1120+
// between different async functions. This struct helps to wrap query_plan in mutex so it
1121+
// can be shared safely.
1122+
struct SharedPlan<I>
1123+
where
1124+
I: Iterator<Item = Arc<Node>>,
1125+
{
1126+
iter: std::sync::Mutex<I>,
11371127
}
1138-
}
11391128

1140-
let retry_policy = match &statement_config.retry_policy {
1141-
Some(policy) => policy,
1142-
None => &self.retry_policy,
1143-
};
1129+
impl<I> Iterator for &SharedPlan<I>
1130+
where
1131+
I: Iterator<Item = Arc<Node>>,
1132+
{
1133+
type Item = Arc<Node>;
11441134

1145-
#[allow(clippy::unnecessary_lazy_evaluations)]
1146-
let speculative_policy = statement_config
1147-
.speculative_execution_policy
1148-
.as_ref()
1149-
.or_else(|| self.speculative_execution_policy.as_ref());
1135+
fn next(&mut self) -> Option<Self::Item> {
1136+
self.iter.lock().unwrap().next()
1137+
}
1138+
}
11501139

1151-
match speculative_policy {
1152-
Some(speculative) if statement_config.is_idempotent => {
1153-
let shared_query_plan = SharedPlan {
1154-
iter: std::sync::Mutex::new(query_plan),
1155-
};
1140+
let retry_policy = match &statement_config.retry_policy {
1141+
Some(policy) => policy,
1142+
None => &self.retry_policy,
1143+
};
11561144

1157-
let execute_query_generator = || {
1158-
self.execute_query(
1159-
&shared_query_plan,
1145+
#[allow(clippy::unnecessary_lazy_evaluations)]
1146+
let speculative_policy = statement_config
1147+
.speculative_execution_policy
1148+
.as_ref()
1149+
.or_else(|| self.speculative_execution_policy.as_ref());
1150+
1151+
match speculative_policy {
1152+
Some(speculative) if statement_config.is_idempotent => {
1153+
let shared_query_plan = SharedPlan {
1154+
iter: std::sync::Mutex::new(query_plan),
1155+
};
1156+
1157+
let execute_query_generator = || {
1158+
self.execute_query(
1159+
&shared_query_plan,
1160+
statement_config.is_idempotent,
1161+
statement_config.consistency,
1162+
retry_policy.new_session(),
1163+
&choose_connection,
1164+
&do_query,
1165+
)
1166+
};
1167+
1168+
let context = speculative_execution::Context {
1169+
metrics: self.metrics.clone(),
1170+
};
1171+
1172+
speculative_execution::execute(
1173+
speculative.as_ref(),
1174+
&context,
1175+
execute_query_generator,
1176+
)
1177+
.await
1178+
}
1179+
_ => self
1180+
.execute_query(
1181+
query_plan,
11601182
statement_config.is_idempotent,
11611183
statement_config.consistency,
11621184
retry_policy.new_session(),
11631185
&choose_connection,
11641186
&do_query,
11651187
)
1166-
};
1167-
1168-
let context = speculative_execution::Context {
1169-
metrics: self.metrics.clone(),
1170-
};
1171-
1172-
speculative_execution::execute(
1173-
speculative.as_ref(),
1174-
&context,
1175-
execute_query_generator,
1176-
)
1177-
.await
1188+
.await
1189+
.unwrap_or(Err(QueryError::ProtocolError(
1190+
"Empty query plan - driver bug!",
1191+
))),
11781192
}
1179-
_ => self
1180-
.execute_query(
1181-
query_plan,
1182-
statement_config.is_idempotent,
1183-
statement_config.consistency,
1184-
retry_policy.new_session(),
1185-
&choose_connection,
1186-
&do_query,
1187-
)
1193+
};
1194+
1195+
let effective_timeout = statement_config.request_timeout.or(self.request_timeout);
1196+
match effective_timeout {
1197+
Some(timeout) => tokio::time::timeout(timeout, runner)
11881198
.await
1189-
.unwrap_or(Err(QueryError::ProtocolError(
1190-
"Empty query plan - driver bug!",
1191-
))),
1199+
.unwrap_or_else(|e| {
1200+
Err(QueryError::RequestTimeout(format!(
1201+
"Request took longer than {}ms: {}",
1202+
timeout.as_millis(),
1203+
e
1204+
)))
1205+
}),
1206+
None => runner.await,
11921207
}
11931208
}
11941209

0 commit comments

Comments
 (0)