1 | | -use std::sync::{ |
2 | | - Arc, |
3 | | - atomic::{AtomicBool, Ordering}, |
| 1 | +use std::{ |
| 2 | + sync::{ |
| 3 | + Arc, |
| 4 | + atomic::{AtomicBool, Ordering}, |
| 5 | + }, |
| 6 | + time::Duration, |
4 | 7 | }; |
5 | 8 |
6 | 9 | use assert_matches::assert_matches; |
7 | 10 | use futures::{StreamExt as _, TryStreamExt as _}; |
8 | | -use scylla::errors::{NextPageError, NextRowError}; |
9 | 11 | use scylla::{ |
10 | 12 | client::execution_profile::ExecutionProfile, |
11 | 13 | policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession}, |
12 | 14 | statement::Statement, |
13 | 15 | value::Row, |
14 | 16 | }; |
| 17 | +use scylla::{ |
| 18 | + client::{session::Session, session_builder::SessionBuilder}, |
| 19 | + errors::{NextPageError, NextRowError, PagerExecutionError, RequestError}, |
| 20 | +}; |
15 | 21 | use scylla_cql::Consistency; |
| 22 | +use scylla_proxy::{ |
| 23 | + Condition, ProxyError, Reaction as _, RequestOpcode, RequestReaction, RequestRule, WorkerError, |
| 24 | + example_db_errors, |
| 25 | +}; |
| 26 | +use tracing::info; |
16 | 27 |
17 | 28 | use crate::utils::{ |
18 | 29 | PerformDDL as _, create_new_session_builder, scylla_supports_tablets, setup_tracing, |
19 | | - unique_keyspace_name, |
| 30 | + test_with_3_node_cluster, unique_keyspace_name, |
20 | 31 | }; |
21 | 32 |
22 | 33 | // Reproduces the problem with execute_iter mentioned in #608. |
@@ -220,3 +231,179 @@ async fn test_iter_methods_when_altering_table() { |
220 | 231 |
221 | 232 | session.ddl(format!("DROP KEYSPACE {ks}")).await.unwrap(); |
222 | 233 | } |
| 234 | + |
| 235 | +#[tokio::test] |
| 236 | +#[cfg_attr(scylla_cloud_tests, ignore)] |
| 237 | +async fn test_pager_timeouts() { |
| 238 | + setup_tracing(); |
| 239 | + |
| 240 | + let res = test_with_3_node_cluster( |
| 241 | + scylla_proxy::ShardAwareness::QueryNode, |
| 242 | + |proxy_uris, translation_map, mut running_proxy| async move { |
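| | +        // Each node of the 3-node cluster sits behind a proxy, which lets us
| | +        // delay requests or forge error responses on a per-frame basis.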
| 243 | + /* Prepare phase */ |
| 244 | + let ks = unique_keyspace_name(); |
| 245 | + |
| 246 | + let session: Session = SessionBuilder::new() |
| 247 | + .known_node(proxy_uris[0].as_str()) |
| 248 | + .address_translator(Arc::new(translation_map)) |
| 249 | + .build() |
| 250 | + .await |
| 251 | + .unwrap(); |
| 252 | + |
| 253 | + session |
| 254 | + .ddl(format!( |
| 255 | + "CREATE KEYSPACE IF NOT EXISTS {ks} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" |
| 256 | + )) |
| 257 | + .await |
| 258 | + .unwrap(); |
| 259 | + session.use_keyspace(ks.clone(), true).await.unwrap(); |
| 260 | + |
| 261 | + session |
| 262 | + .ddl("CREATE TABLE IF NOT EXISTS t (a int PRIMARY KEY)") |
| 263 | + .await |
| 264 | + .unwrap(); |
| 265 | + |
| 266 | + for i in 0..5 { |
| 267 | + session |
| 268 | + .query_unpaged("INSERT INTO t (a) VALUES (?)", (i,)) |
| 269 | + .await |
| 270 | + .unwrap(); |
| 271 | + } |
| 272 | + |
| 273 | + let mut prepared = session.prepare("SELECT a FROM t").await.unwrap(); |
| 274 | + // Important to have multiple pages. |
| 275 | + prepared.set_page_size(1); |
| 276 | +        // Important for retries to fire: the default retry policy retries errors like Overloaded only for idempotent statements.
| 277 | + prepared.set_is_idempotent(true); |
| 278 | + |
| 279 | + /* Test phase */ |
| 280 | + |
| 281 | + // Case 1: the first page fetch times out. |
| 282 | + { |
| 283 | + let timeout = Duration::from_millis(100); |
| 284 | + prepared.set_request_timeout(Some(timeout)); |
| 285 | + |
| 286 | + running_proxy.running_nodes.iter_mut().for_each(|node| { |
| 287 | + node.change_request_rules(Some(vec![ |
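| | +                // Delay every EXECUTE frame for longer than the request timeout,
| | +                // so that the first page fetch cannot complete in time. Excluding
| | +                // ConnectionRegisteredAnyEvent leaves the control connection
| | +                // (the one that sent REGISTER) unaffected.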
| 288 | + RequestRule( |
| 289 | + Condition::RequestOpcode(RequestOpcode::Execute) |
| 290 | + .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)), |
| 291 | + RequestReaction::delay(timeout + Duration::from_millis(10)) |
| 292 | + ) |
| 293 | + ])); |
| 294 | + }); |
| 295 | + |
| 296 | + let pager_err = session.execute_iter(prepared.clone(), ()).await.unwrap_err(); |
| 297 | + let PagerExecutionError::NextPageError(NextPageError::RequestFailure( |
| 298 | + RequestError::RequestTimeout(got_timeout), |
| 299 | + )) = pager_err |
| 300 | + else { |
| 301 | + panic!("Expected RequestTimeout error, got: {:?}", pager_err); |
| 302 | + }; |
| 303 | + assert_eq!(got_timeout, timeout); |
| 304 | + info!("Case 1 passed."); |
| 305 | + } |
| 306 | + |
| 307 | + // Case 2: the second page fetch times out. |
| 308 | + { |
| 309 | + let timeout = Duration::from_millis(100); |
| 310 | + prepared.set_request_timeout(Some(timeout)); |
| 311 | + |
| 312 | + running_proxy.running_nodes.iter_mut().for_each(|node| { |
| 313 | + node.change_request_rules(Some(vec![ |
| 314 | + // Pass one frame, then delay all subsequent ones. |
| 315 | + RequestRule( |
| 316 | + Condition::RequestOpcode(RequestOpcode::Execute) |
| 317 | + .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)) |
| 318 | + .and(Condition::TrueForLimitedTimes(1)), |
| 319 | + RequestReaction::noop() |
| 320 | + ), |
| 321 | + RequestRule( |
| 322 | + Condition::RequestOpcode(RequestOpcode::Execute) |
| 323 | + .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)), |
| 324 | + RequestReaction::delay(timeout + Duration::from_millis(10)) |
| 325 | + ) |
| 326 | + ])); |
| 327 | + }); |
| 328 | + |
| 329 | + let mut row_stream = session |
| 330 | + .execute_iter(prepared.clone(), ()) |
| 331 | + .await |
| 332 | + .unwrap() |
| 333 | + .rows_stream::<(i32,)>() |
| 334 | + .unwrap(); |
| 335 | + |
| 336 | +            // An observation that is not critical to the test, but worth noting:
| 337 | +            // at this point, at most two pages have been fetched:
| 338 | +            // - the first page, fetched eagerly by execute_iter;
| 339 | +            // - possibly the second page, fetched lazily by rows_stream;
| 340 | +            // - no further pages can have been fetched yet, because the second page
| 341 | +            //   would be stuck on channel.send(), waiting for us to consume the first row.
| 342 | + |
| 343 | + // First page (1 row) must have been fetched successfully. |
| 344 | + let (_a,) = row_stream.next().await.unwrap().unwrap(); |
| 345 | + |
| 346 | + // The second page fetch must time out. |
| 347 | + let row_err = row_stream.next().await.unwrap().unwrap_err(); |
| 348 | + let NextRowError::NextPageError(NextPageError::RequestFailure( |
| 349 | + RequestError::RequestTimeout(got_timeout), |
| 350 | + )) = row_err |
| 351 | + else { |
| 352 | + panic!("Expected RequestTimeout error, got: {:?}", row_err); |
| 353 | + }; |
| 354 | + assert_eq!(got_timeout, timeout); |
| 355 | + info!("Case 2 passed."); |
| 356 | + } |
| 357 | + |
| 358 | +        // Case 3: retries' cumulative duration exceeds the timeout.
| 359 | + { |
| 360 | +            // Here, each attempt will be delayed by 20ms.
| 361 | +            // With a 100ms timeout, this means that after 5 attempts (100ms of cumulative delay),
| 362 | +            // the timeout will be exceeded.
| 363 | + let per_retry_delay = Duration::from_millis(20); |
| 364 | + let timeout = Duration::from_millis(100); |
| 365 | + |
| 366 | +            // Set the timeout through the execution profile this time; the per-statement timeout is cleared so that it does not take precedence.
| 367 | + { |
| 368 | + let profile = ExecutionProfile::builder().request_timeout(Some(timeout)).build(); |
| 369 | + let handle = profile.into_handle(); |
| 370 | + prepared.set_execution_profile_handle(Some(handle)); |
| 371 | + prepared.set_request_timeout(None); |
| 372 | + } |
| 373 | + |
| 374 | + running_proxy.running_nodes.iter_mut().for_each(|node| { |
| 375 | + node.change_request_rules(Some(vec![ |
| 376 | + RequestRule( |
| 377 | + Condition::RequestOpcode(RequestOpcode::Execute) |
| 378 | + .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)), |
| 379 | + RequestReaction::forge_with_error_lazy_delay( |
| 380 | + Box::new(example_db_errors::overloaded), |
| 381 | + Some(per_retry_delay)) |
| 382 | + ) |
| 383 | + ])); |
| 384 | + }); |
| 385 | + |
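| | +            // Every page-fetch attempt now fails with a forged Overloaded error after
| | +            // a 20ms delay. Since the statement is idempotent, the driver keeps retrying,
| | +            // so the cumulative delays eventually exceed the 100ms timeout.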
| 386 | + let pager_err = session.execute_iter(prepared, ()).await.unwrap_err(); |
| 387 | + let PagerExecutionError::NextPageError(NextPageError::RequestFailure( |
| 388 | + RequestError::RequestTimeout(got_timeout), |
| 389 | + )) = pager_err |
| 390 | + else { |
| 391 | + panic!("Expected RequestTimeout error, got: {:?}", pager_err); |
| 392 | + }; |
| 393 | + assert_eq!(got_timeout, timeout); |
| 394 | + info!("Case 3 passed."); |
| 395 | + } |
| 396 | + |
| 397 | + /* Teardown */ |
| 398 | + session.ddl(format!("DROP KEYSPACE {ks}")).await.unwrap(); |
| 399 | + |
| 400 | + running_proxy |
| 401 | + }, |
| 402 | + ).await; |
| 403 | + |
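| | +    // The driver may abruptly close its connections to the proxy during teardown;
| | +    // such disconnections are expected and must not fail the test.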
| 404 | + match res { |
| 405 | + Ok(()) => (), |
| 406 | + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), |
| 407 | + Err(err) => panic!("{}", err), |
| 408 | + } |
| 409 | +} |