Skip to content

Commit 78fb4de

Browse files
authored
Merge pull request #1457 from wprzytula/request-timeout-improvements
Request timeout improvements
2 parents 26987da + 35c7c10 commit 78fb4de

File tree

4 files changed

+149
-72
lines changed

4 files changed

+149
-72
lines changed

scylla/src/client/execution_profile.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ pub struct ExecutionProfileBuilder {
240240
}
241241

242242
impl ExecutionProfileBuilder {
243-
/// Changes client-side timeout.
243+
/// Changes client-side timeout for executing statements.
244+
/// If set to None, the driver will wait indefinitely for a response from the server.
244245
/// The default is 30 seconds.
245246
///
246247
/// # Example

scylla/src/client/pager.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
144144
page_query: QueryFunc,
145145

146146
load_balancing_policy: Arc<dyn LoadBalancingPolicy>,
147-
statement_info: RoutingInfo<'a>,
147+
routing_info: RoutingInfo<'a>,
148148
query_is_idempotent: bool,
149149
query_consistency: Consistency,
150150
retry_session: Box<dyn RetrySession>,
@@ -170,7 +170,7 @@ where
170170
// Contract: this function MUST send at least one item through self.sender
171171
async fn work(mut self, cluster_state: Arc<ClusterState>) -> PageSendAttemptedProof {
172172
let load_balancer = Arc::clone(&self.load_balancing_policy);
173-
let statement_info = self.statement_info.clone();
173+
let statement_info = self.routing_info.clone();
174174
let query_plan =
175175
load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_state);
176176

@@ -359,7 +359,7 @@ where
359359
self.log_attempt_success();
360360
self.log_request_success();
361361
self.load_balancing_policy
362-
.on_request_success(&self.statement_info, elapsed, node);
362+
.on_request_success(&self.routing_info, elapsed, node);
363363

364364
request_span.record_raw_rows_fields(&rows);
365365

@@ -396,7 +396,7 @@ where
396396
#[cfg(feature = "metrics")]
397397
self.metrics.inc_failed_paged_queries();
398398
self.load_balancing_policy.on_request_failure(
399-
&self.statement_info,
399+
&self.routing_info,
400400
elapsed,
401401
node,
402402
&err,
@@ -424,7 +424,7 @@ where
424424
let err =
425425
RequestAttemptError::UnexpectedResponse(response.response.to_response_kind());
426426
self.load_balancing_policy.on_request_failure(
427-
&self.statement_info,
427+
&self.routing_info,
428428
elapsed,
429429
node,
430430
&err,
@@ -786,7 +786,7 @@ If you are using this API, you are probably doing something wrong."
786786
let worker = PagerWorker {
787787
sender: sender.into(),
788788
page_query,
789-
statement_info: routing_info,
789+
routing_info,
790790
query_is_idempotent: statement.config.is_idempotent,
791791
query_consistency: consistency,
792792
load_balancing_policy,
@@ -914,7 +914,7 @@ If you are using this API, you are probably doing something wrong."
914914
let worker = PagerWorker {
915915
sender: sender.into(),
916916
page_query,
917-
statement_info,
917+
routing_info: statement_info,
918918
query_is_idempotent: config.prepared.config.is_idempotent,
919919
query_consistency: consistency,
920920
load_balancing_policy,

scylla/src/client/session.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2031,7 +2031,14 @@ impl Session {
20312031
|_: tokio::time::error::Elapsed| {
20322032
#[cfg(feature = "metrics")]
20332033
self.metrics.inc_request_timeouts();
2034-
Err(RequestError::RequestTimeout(timeout))
2034+
2035+
let timeout_error = RequestError::RequestTimeout(timeout);
2036+
trace!(
2037+
parent: request_span.span(),
2038+
error = %timeout_error,
2039+
"Request timed out"
2040+
);
2041+
Err(timeout_error)
20352042
},
20362043
),
20372044
None => runner.await,
@@ -2146,15 +2153,15 @@ impl Session {
21462153
};
21472154

21482155
// Use retry policy to decide what to do next
2149-
let query_info = RequestInfo {
2156+
let request_info = RequestInfo {
21502157
error: &request_error,
21512158
is_idempotent: context.is_idempotent,
21522159
consistency: context
21532160
.consistency_set_on_statement
21542161
.unwrap_or(execution_profile.consistency),
21552162
};
21562163

2157-
let retry_decision = context.retry_session.decide_should_retry(query_info);
2164+
let retry_decision = context.retry_session.decide_should_retry(request_info);
21582165
trace!(
21592166
parent: &span,
21602167
retry_decision = ?retry_decision
Lines changed: 130 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,144 @@
1-
use std::time::Duration;
1+
use std::{sync::Arc, time::Duration};
22

33
use assert_matches::assert_matches;
44
use scylla::{
5-
client::execution_profile::ExecutionProfile, errors::ExecutionError, statement::Statement,
5+
client::{execution_profile::ExecutionProfile, session_builder::SessionBuilder},
6+
errors::ExecutionError,
7+
statement::{
8+
Statement,
9+
batch::{Batch, BatchType},
10+
},
11+
};
12+
use scylla_proxy::{
13+
Condition, ProxyError, Reaction as _, RequestOpcode, RequestReaction, RequestRule, WorkerError,
614
};
715

8-
use crate::utils::{create_new_session_builder, setup_tracing};
16+
use crate::utils::{setup_tracing, test_with_3_node_cluster};
917

10-
#[ignore = "works on remote Scylla instances only (local ones are too fast)"]
18+
#[cfg_attr(scylla_cloud_tests, ignore)]
1119
#[tokio::test]
1220
async fn test_request_timeout() {
1321
setup_tracing();
1422

15-
let fast_timeouting_profile_handle = ExecutionProfile::builder()
16-
.request_timeout(Some(Duration::from_millis(1)))
17-
.build()
18-
.into_handle();
19-
20-
{
21-
let session = create_new_session_builder().build().await.unwrap();
22-
23-
let mut query: Statement = Statement::new("SELECT * FROM system_schema.tables");
24-
query.set_request_timeout(Some(Duration::from_millis(1)));
25-
match session.query_unpaged(query, &[]).await {
26-
Ok(_) => panic!("the query should have failed due to a client-side timeout"),
27-
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
23+
let res = test_with_3_node_cluster(
24+
scylla_proxy::ShardAwareness::QueryNode,
25+
|proxy_uris, translation_map, mut running_proxy| async move {
26+
let short_timeout = Duration::from_millis(1);
27+
let very_long_timeout = Duration::from_secs(10000);
28+
let query_str = "SELECT host_id FROM system.local WHERE key='local'";
29+
30+
let create_session_builder = || {
31+
SessionBuilder::new()
32+
.known_node(proxy_uris[0].as_str())
33+
.address_translator(Arc::new(translation_map.clone()))
34+
};
35+
36+
running_proxy.running_nodes.iter_mut().for_each(|node| {
37+
node.change_request_rules(Some(vec![
38+
RequestRule(
39+
Condition::any([
40+
Condition::RequestOpcode(RequestOpcode::Query),
41+
Condition::RequestOpcode(RequestOpcode::Execute),
42+
Condition::RequestOpcode(RequestOpcode::Batch),
43+
])
44+
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent)),
45+
RequestReaction::delay(10 * short_timeout)
46+
)
47+
]));
48+
});
49+
50+
let session = create_session_builder()
51+
.build()
52+
.await
53+
.unwrap();
54+
55+
// Case 1: per-statement timeouts.
56+
{
57+
let mut query: Statement = Statement::new(query_str);
58+
query.set_request_timeout(Some(short_timeout));
59+
match session.query_unpaged(query, &[]).await {
60+
Ok(_) => panic!("the query should have failed due to a client-side timeout"),
61+
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
62+
}
63+
64+
let mut batch: Batch = Batch::new(BatchType::Logged);
65+
batch.set_request_timeout(Some(short_timeout));
66+
match session.batch(&batch, &[][..] as &[()]).await {
67+
Ok(_) => panic!("the batch should have failed due to a client-side timeout"),
68+
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
69+
}
70+
71+
let mut prepared = session
72+
.prepare(query_str)
73+
.await
74+
.unwrap();
75+
76+
prepared.set_request_timeout(Some(short_timeout));
77+
match session.execute_unpaged(&prepared, &[]).await {
78+
Ok(_) => {
79+
panic!("the prepared query should have failed due to a client-side timeout")
80+
}
81+
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
82+
};
83+
}
84+
85+
// Case 2: tight session-level timeout, overridden by per-statement long timeouts.
86+
{
87+
let fast_timeouting_profile = ExecutionProfile::builder()
88+
.request_timeout(Some(short_timeout))
89+
.build();
90+
91+
// Although this `clone()` looks suspicious, it is necessary to get an owned handle to the profile
92+
// in order to call `map_to_another_profile()`, which requires a mutable reference.
93+
// The profile handle itself is just a reference-counted pointer, so cloning it is cheap,
94+
// and the remapping affects the session too.
95+
session.get_default_execution_profile_handle().clone().map_to_another_profile(fast_timeouting_profile);
96+
97+
let mut query = Statement::new(query_str);
98+
99+
match session.query_unpaged(query.clone(), &[]).await {
100+
Ok(_) => panic!("the query should have failed due to a client-side timeout"),
101+
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
102+
};
103+
104+
query.set_request_timeout(Some(very_long_timeout));
105+
106+
session.query_unpaged(query, &[]).await.expect(
107+
"the query should have not failed, because no client-side timeout was specified",
108+
);
109+
110+
let mut batch: Batch = Batch::new(BatchType::Logged);
111+
match session.batch(&batch, &[][..] as &[()]).await {
112+
Ok(_) => panic!("the batch should have failed due to a client-side timeout"),
113+
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
114+
}
115+
116+
batch.set_request_timeout(Some(very_long_timeout));
117+
session.batch(&batch, &[][..] as &[()]).await.expect("the batch should have not failed, because no client-side timeout was specified");
118+
119+
let mut prepared = session
120+
.prepare(query_str)
121+
.await
122+
.unwrap();
123+
124+
match session.execute_unpaged(&prepared, &[]).await {
125+
Ok(_) => {
126+
panic!("the prepared query should have failed due to a client-side timeout")
127+
}
128+
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
129+
};
130+
131+
prepared.set_request_timeout(Some(very_long_timeout));
132+
133+
session.execute_unpaged(&prepared, &[]).await.expect("the prepared query should have not failed, because no client-side timeout was specified");
134+
}
135+
running_proxy
28136
}
137+
).await;
29138

30-
let mut prepared = session
31-
.prepare("SELECT * FROM system_schema.tables")
32-
.await
33-
.unwrap();
34-
35-
prepared.set_request_timeout(Some(Duration::from_millis(1)));
36-
match session.execute_unpaged(&prepared, &[]).await {
37-
Ok(_) => panic!("the prepared query should have failed due to a client-side timeout"),
38-
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
39-
};
40-
}
41-
{
42-
let timeouting_session = create_new_session_builder()
43-
.default_execution_profile_handle(fast_timeouting_profile_handle)
44-
.build()
45-
.await
46-
.unwrap();
47-
48-
let mut query = Statement::new("SELECT * FROM system_schema.tables");
49-
50-
match timeouting_session.query_unpaged(query.clone(), &[]).await {
51-
Ok(_) => panic!("the query should have failed due to a client-side timeout"),
52-
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
53-
};
54-
55-
query.set_request_timeout(Some(Duration::from_secs(10000)));
56-
57-
timeouting_session.query_unpaged(query, &[]).await.expect(
58-
"the query should have not failed, because no client-side timeout was specified",
59-
);
60-
61-
let mut prepared = timeouting_session
62-
.prepare("SELECT * FROM system_schema.tables")
63-
.await
64-
.unwrap();
65-
66-
match timeouting_session.execute_unpaged(&prepared, &[]).await {
67-
Ok(_) => panic!("the prepared query should have failed due to a client-side timeout"),
68-
Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)),
69-
};
70-
71-
prepared.set_request_timeout(Some(Duration::from_secs(10000)));
72-
73-
timeouting_session.execute_unpaged(&prepared, &[]).await.expect("the prepared query should have not failed, because no client-side timeout was specified");
139+
match res {
140+
Ok(()) => (),
141+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
142+
Err(err) => panic!("{}", err),
74143
}
75144
}

0 commit comments

Comments
 (0)