Skip to content

Commit d9245ef

Browse files
committed
integration: write test for timeouts in pager
As QueryPager has now reached feature parity with the other execution methods with regard to timeouts, we need to test that timeouts are properly enforced when fetching pages. The test uses scylla-proxy to simulate delayed responses from the server, causing timeouts to occur in various scenarios:
- the first page fetch times out (manifests in `execute_iter()`);
- the second page fetch times out (manifests in `rows_stream().next()`);
- retries accumulate enough delay to exceed the timeout, resulting in a timeout error.
1 parent ad65fd3 commit d9245ef

File tree

1 file changed

+192
-5
lines changed
  • scylla/tests/integration/session

1 file changed

+192
-5
lines changed

scylla/tests/integration/session/pager.rs

Lines changed: 192 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,33 @@
1-
use std::sync::{
2-
Arc,
3-
atomic::{AtomicBool, Ordering},
1+
use std::{
2+
sync::{
3+
Arc,
4+
atomic::{AtomicBool, Ordering},
5+
},
6+
time::Duration,
47
};
58

69
use assert_matches::assert_matches;
710
use futures::{StreamExt as _, TryStreamExt as _};
8-
use scylla::errors::{NextPageError, NextRowError};
911
use scylla::{
1012
client::execution_profile::ExecutionProfile,
1113
policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession},
1214
statement::Statement,
1315
value::Row,
1416
};
17+
use scylla::{
18+
client::{session::Session, session_builder::SessionBuilder},
19+
errors::{NextPageError, NextRowError, PagerExecutionError, RequestError},
20+
};
1521
use scylla_cql::Consistency;
22+
use scylla_proxy::{
23+
Condition, ProxyError, Reaction as _, RequestOpcode, RequestReaction, RequestRule, WorkerError,
24+
example_db_errors,
25+
};
26+
use tracing::info;
1627

1728
use crate::utils::{
1829
PerformDDL as _, create_new_session_builder, scylla_supports_tablets, setup_tracing,
19-
unique_keyspace_name,
30+
test_with_3_node_cluster, unique_keyspace_name,
2031
};
2132

2233
// Reproduces the problem with execute_iter mentioned in #608.
@@ -220,3 +231,179 @@ async fn test_iter_methods_when_altering_table() {
220231

221232
session.ddl(format!("DROP KEYSPACE {ks}")).await.unwrap();
222233
}
234+
235+
#[tokio::test]
236+
#[cfg_attr(scylla_cloud_tests, ignore)]
237+
async fn test_pager_timeouts() {
238+
setup_tracing();
239+
240+
let res = test_with_3_node_cluster(
241+
scylla_proxy::ShardAwareness::QueryNode,
242+
|proxy_uris, translation_map, mut running_proxy| async move {
243+
/* Prepare phase */
244+
let ks = unique_keyspace_name();
245+
246+
let session: Session = SessionBuilder::new()
247+
.known_node(proxy_uris[0].as_str())
248+
.address_translator(Arc::new(translation_map))
249+
.build()
250+
.await
251+
.unwrap();
252+
253+
session
254+
.ddl(format!(
255+
"CREATE KEYSPACE IF NOT EXISTS {ks} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"
256+
))
257+
.await
258+
.unwrap();
259+
session.use_keyspace(ks.clone(), true).await.unwrap();
260+
261+
session
262+
.ddl("CREATE TABLE IF NOT EXISTS t (a int PRIMARY KEY)")
263+
.await
264+
.unwrap();
265+
266+
for i in 0..5 {
267+
session
268+
.query_unpaged("INSERT INTO t (a) VALUES (?)", (i,))
269+
.await
270+
.unwrap();
271+
}
272+
273+
let mut prepared = session.prepare("SELECT a FROM t").await.unwrap();
274+
// Important to have multiple pages.
275+
prepared.set_page_size(1);
276+
// Important for retries to fire.
277+
prepared.set_is_idempotent(true);
278+
279+
/* Test phase */
280+
281+
// Case 1: the first page fetch times out.
282+
{
283+
let timeout = Duration::from_secs(1);
284+
prepared.set_request_timeout(Some(timeout));
285+
286+
running_proxy.running_nodes.iter_mut().for_each(|node| {
287+
node.change_request_rules(Some(vec![
288+
RequestRule(
289+
Condition::RequestOpcode(RequestOpcode::Execute)
290+
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent)),
291+
RequestReaction::delay(timeout + Duration::from_secs(1))
292+
)
293+
]));
294+
});
295+
296+
let pager_err = session.execute_iter(prepared.clone(), ()).await.unwrap_err();
297+
let PagerExecutionError::NextPageError(NextPageError::RequestFailure(
298+
RequestError::RequestTimeout(got_timeout),
299+
)) = pager_err
300+
else {
301+
panic!("Expected RequestTimeout error, got: {:?}", pager_err);
302+
};
303+
assert_eq!(got_timeout, timeout);
304+
info!("Case 1 passed.");
305+
}
306+
307+
// Case 2: the second page fetch times out.
308+
{
309+
let timeout = Duration::from_secs(1);
310+
prepared.set_request_timeout(Some(timeout));
311+
312+
running_proxy.running_nodes.iter_mut().for_each(|node| {
313+
node.change_request_rules(Some(vec![
314+
// Pass one frame, then delay all subsequent ones.
315+
RequestRule(
316+
Condition::RequestOpcode(RequestOpcode::Execute)
317+
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent))
318+
.and(Condition::TrueForLimitedTimes(1)),
319+
RequestReaction::noop()
320+
),
321+
RequestRule(
322+
Condition::RequestOpcode(RequestOpcode::Execute)
323+
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent)),
324+
RequestReaction::delay(timeout + Duration::from_secs(1))
325+
)
326+
]));
327+
});
328+
329+
let mut row_stream = session
330+
.execute_iter(prepared.clone(), ())
331+
.await
332+
.unwrap()
333+
.rows_stream::<(i32,)>()
334+
.unwrap();
335+
336+
// Observation that is not critical to the test, but good to note:
337+
// at this point, at most two pages have been fetched:
338+
// - the first page, fetched eagerly by execute_iter;
339+
// - possibly the second page, fetched lazily by rows_stream;
340+
// - no more pages may have been fetched yet, because the second page would be
341+
// stuck on channel.send(), waiting for us to consume the first row.
342+
343+
// First page (1 row) must have been fetched successfully.
344+
let (_a,) = row_stream.next().await.unwrap().unwrap();
345+
346+
// The second page fetch must time out.
347+
let row_err = row_stream.next().await.unwrap().unwrap_err();
348+
let NextRowError::NextPageError(NextPageError::RequestFailure(
349+
RequestError::RequestTimeout(got_timeout),
350+
)) = row_err
351+
else {
352+
panic!("Expected RequestTimeout error, got: {:?}", row_err);
353+
};
354+
assert_eq!(got_timeout, timeout);
355+
info!("Case 2 passed.");
356+
}
357+
358+
// Case 3: retries' cumulative duration exceed the timeout.
359+
{
360+
// Here, each retry will be delayed by 200ms.
361+
// With a 500ms timeout, this means that after 3 retries (600ms total delay),
362+
// the timeout will be exceeded.
363+
let per_retry_delay = Duration::from_millis(200);
364+
let timeout = Duration::from_millis(500);
365+
366+
// Set timeout through the execution profile.
367+
{
368+
let profile = ExecutionProfile::builder().request_timeout(Some(timeout)).build();
369+
let handle = profile.into_handle();
370+
prepared.set_execution_profile_handle(Some(handle));
371+
prepared.set_request_timeout(None);
372+
}
373+
374+
running_proxy.running_nodes.iter_mut().for_each(|node| {
375+
node.change_request_rules(Some(vec![
376+
RequestRule(
377+
Condition::RequestOpcode(RequestOpcode::Execute)
378+
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent)),
379+
RequestReaction::forge_with_error_lazy_delay(
380+
Box::new(example_db_errors::overloaded),
381+
Some(per_retry_delay))
382+
)
383+
]));
384+
});
385+
386+
let pager_err = session.execute_iter(prepared, ()).await.unwrap_err();
387+
let PagerExecutionError::NextPageError(NextPageError::RequestFailure(
388+
RequestError::RequestTimeout(got_timeout),
389+
)) = pager_err
390+
else {
391+
panic!("Expected RequestTimeout error, got: {:?}", pager_err);
392+
};
393+
assert_eq!(got_timeout, timeout);
394+
info!("Case 3 passed.");
395+
}
396+
397+
/* Teardown */
398+
session.ddl(format!("DROP KEYSPACE {ks}")).await.unwrap();
399+
400+
running_proxy
401+
},
402+
).await;
403+
404+
match res {
405+
Ok(()) => (),
406+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
407+
Err(err) => panic!("{}", err),
408+
}
409+
}

0 commit comments

Comments
 (0)