@@ -1,21 +1,32 @@
-use std::sync::{
-    Arc,
-    atomic::{AtomicBool, Ordering},
+use std::{
+    sync::{
+        Arc,
+        atomic::{AtomicBool, Ordering},
+    },
+    time::Duration,
 };
 
 use assert_matches::assert_matches;
 use futures::{StreamExt as _, TryStreamExt as _};
-use scylla::errors::{NextPageError, NextRowError};
 use scylla::{
     client::execution_profile::ExecutionProfile,
     policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession},
     statement::Statement,
     value::Row,
 };
+use scylla::{
+    client::{session::Session, session_builder::SessionBuilder},
+    errors::{NextPageError, NextRowError, PagerExecutionError, RequestError},
+};
 use scylla_cql::Consistency;
+use scylla_proxy::{
+    Condition, ProxyError, Reaction as _, RequestOpcode, RequestReaction, RequestRule, WorkerError,
+    example_db_errors,
+};
+use tracing::info;
 
 use crate::utils::{
     PerformDDL as _, create_new_session_builder, scylla_supports_tablets, setup_tracing,
-    unique_keyspace_name,
+    test_with_3_node_cluster, unique_keyspace_name,
 };
 
 // Reproduces the problem with execute_iter mentioned in #608.
@@ -220,3 +231,185 @@ async fn test_iter_methods_when_altering_table() {
 
     session.ddl(format!("DROP KEYSPACE {ks}")).await.unwrap();
 }
+
+#[tokio::test]
+#[cfg_attr(scylla_cloud_tests, ignore)]
+async fn test_pager_timeouts() {
+    setup_tracing();
+
+    let res = test_with_3_node_cluster(
+        scylla_proxy::ShardAwareness::QueryNode,
+        |proxy_uris, translation_map, mut running_proxy| async move {
+            /* Prepare phase */
+            let ks = unique_keyspace_name();
+
+            let session: Session = SessionBuilder::new()
+                .known_node(proxy_uris[0].as_str())
+                .address_translator(Arc::new(translation_map))
+                .build()
+                .await
+                .unwrap();
+            session
+                .ddl(format!(
+                    "CREATE KEYSPACE IF NOT EXISTS {ks} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"
+                ))
+                .await
+                .unwrap();
+            session.use_keyspace(ks.clone(), true).await.unwrap();
+
+            session
+                .ddl("CREATE TABLE IF NOT EXISTS t (a int PRIMARY KEY)")
+                .await
+                .unwrap();
+
+            for i in 0..5 {
+                session
+                    .query_unpaged("INSERT INTO t (a) VALUES (?)", (i,))
+                    .await
+                    .unwrap();
+            }
+
+            let mut prepared = session.prepare("SELECT a FROM t").await.unwrap();
+            // Important to have multiple pages.
+            prepared.set_page_size(1);
+            // Important for retries to fire: the default retry policy only retries errors like Overloaded for idempotent statements.
+            prepared.set_is_idempotent(true);
+
+            /* Test phase */
+
+            // Case 1: the first page fetch times out.
+            {
+                let timeout = Duration::from_secs(1);
+                prepared.set_request_timeout(Some(timeout));
+
+                running_proxy.running_nodes.iter_mut().for_each(|node| {
+                    node.change_request_rules(Some(vec![
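+                        // Delay every EXECUTE frame for longer than the client-side timeout.
+                        // Excluding connections registered for events spares the control connection.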
+                        RequestRule(
+                            Condition::RequestOpcode(RequestOpcode::Execute)
+                                .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)),
+                            RequestReaction::delay(timeout + Duration::from_secs(1))
+                        )
+                    ]));
+                });
+
+                let pager_err = session.execute_iter(prepared.clone(), ()).await.unwrap_err();
+                let PagerExecutionError::NextPageError(NextPageError::RequestFailure(
+                    RequestError::RequestTimeout(got_timeout),
+                )) = pager_err
+                else {
+                    panic!("Expected RequestTimeout error, got: {:?}", pager_err);
+                };
+                assert_eq!(got_timeout, timeout);
+                info!("Case 1 passed.");
+            }
+
+            // Case 2: the second page fetch times out.
+            {
+                let timeout = Duration::from_secs(1);
+                prepared.set_request_timeout(Some(timeout));
+
+                running_proxy.running_nodes.iter_mut().for_each(|node| {
+                    node.change_request_rules(Some(vec![
+                        // Pass one frame, then delay all subsequent ones.
+                        RequestRule(
+                            Condition::RequestOpcode(RequestOpcode::Execute)
+                                .and(Condition::not(Condition::ConnectionRegisteredAnyEvent))
+                                .and(Condition::TrueForLimitedTimes(1)),
+                            RequestReaction::noop()
+                        ),
+                        RequestRule(
+                            Condition::RequestOpcode(RequestOpcode::Execute)
+                                .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)),
+                            RequestReaction::delay(timeout + Duration::from_secs(1))
+                        )
+                    ]));
+                });
+
+                let mut row_stream = session
+                    .execute_iter(prepared.clone(), ())
+                    .await
+                    .unwrap()
+                    .rows_stream::<(i32,)>()
+                    .unwrap();
+
+                // An observation that is not critical to the test, but worth noting:
+                // at this point, at most two pages have been fetched:
+                // - the first page, fetched eagerly by execute_iter;
+                // - possibly the second page, fetched lazily by rows_stream;
+                // - no more pages may have been fetched yet, because the second page would be
+                //   stuck on channel.send(), waiting for us to consume the first row.
+
+                // First page (1 row) must have been fetched successfully.
+                let (_a,) = row_stream.next().await.unwrap().unwrap();
+
+                // The second page fetch must time out.
+                let row_err = row_stream.next().await.unwrap().unwrap_err();
+                let NextRowError::NextPageError(NextPageError::RequestFailure(
+                    RequestError::RequestTimeout(got_timeout),
+                )) = row_err
+                else {
+                    panic!("Expected RequestTimeout error, got: {:?}", row_err);
+                };
+                assert_eq!(got_timeout, timeout);
+                info!("Case 2 passed.");
+            }
+
+            // Case 3: the retries' cumulative duration exceeds the timeout.
+            {
+                // Here, each retry will be delayed by 200ms.
+                // With a 500ms timeout, this means that after 3 retries (600ms total delay),
+                // the timeout will be exceeded.
+                let per_retry_delay = Duration::from_millis(200);
+                let timeout = Duration::from_millis(500);
+
+                // Set the timeout through the execution profile instead of on the statement, exercising the profile-based timeout path.
+                {
+                    let profile = ExecutionProfile::builder().request_timeout(Some(timeout)).build();
+                    let handle = profile.into_handle();
+                    prepared.set_execution_profile_handle(Some(handle));
+                    prepared.set_request_timeout(None);
+                }
+
+                running_proxy.running_nodes.iter_mut().for_each(|node| {
+                    node.change_request_rules(Some(vec![
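+                        // Answer every EXECUTE with an Overloaded error after `per_retry_delay`,
+                        // so the retry policy keeps retrying until the cumulative delay exceeds the timeout.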
+                        RequestRule(
+                            Condition::RequestOpcode(RequestOpcode::Execute)
+                                .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)),
+                            RequestReaction::forge_with_error_lazy_delay(
+                                Box::new(example_db_errors::overloaded),
+                                Some(per_retry_delay))
+                        )
+                    ]));
+                });
+
+                let pager_err = session.execute_iter(prepared, ()).await.unwrap_err();
+                let PagerExecutionError::NextPageError(NextPageError::RequestFailure(
+                    RequestError::RequestTimeout(got_timeout),
+                )) = pager_err
+                else {
+                    panic!("Expected RequestTimeout error, got: {:?}", pager_err);
+                };
+                assert_eq!(got_timeout, timeout);
+                info!("Case 3 passed.");
+            }
+
+            /* Teardown */
+            session.ddl(format!("DROP KEYSPACE {ks}")).await.unwrap();
+
+            running_proxy
+        },
+    ).await;
+
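+    // Dropping the session tears down connections through the proxy, so a
+    // DriverDisconnected worker error here is expected rather than a failure.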
+    match res {
+        Ok(()) => (),
+        Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
+        Err(err) => panic!("{}", err),
+    }
+}