From ae3e14cbbfc23965241df8f982d60de05972e516 Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Wed, 12 Feb 2025 13:20:12 +0100 Subject: [PATCH 1/6] wip Signed-off-by: Dusan Malusev --- scylla/tests/integration/main.rs | 1 + scylla/tests/integration/paging_query.rs | 240 +++++++++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 scylla/tests/integration/paging_query.rs diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index d69c0aa0ca..bbd866f56d 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -11,6 +11,7 @@ mod hygiene; mod large_batch_statements; mod lwt_optimisation; mod new_session; +mod paging_query; mod query_result; mod retries; mod self_identity; diff --git a/scylla/tests/integration/paging_query.rs b/scylla/tests/integration/paging_query.rs new file mode 100644 index 0000000000..b3387a769c --- /dev/null +++ b/scylla/tests/integration/paging_query.rs @@ -0,0 +1,240 @@ +use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; +use scylla::client::session::Session; +use scylla::client::session_builder::SessionBuilder; +use scylla::query::Query; +use scylla_cql::frame::request::query::PagingState; +use scylla_cql::Consistency; +use scylla_proxy::{ProxyError, ShardAwareness, WorkerError}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::ops::ControlFlow; +use std::sync::Arc; + +const PAGE_SIZE: usize = 10; +const ITEMS: usize = 20; + +async fn prepare_data( + proxy_uris: [String; 3], + translation_map: HashMap, +) -> (String, Session) { + let session = SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(Arc::new(translation_map)) + .build() + .await + .unwrap(); + + let ks = unique_keyspace_name(); + + session.ddl( + format!("CREATE KEYSPACE IF NOT EXISTS {ks} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}") + ) + .await. + unwrap(); + session + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t (a int, primary key (a))", + ks + )) + .await + .unwrap(); + + let mut prepared_insert = session + .prepare(format!("INSERT INTO {ks}.t (a) VALUES (?)")) + .await + .unwrap(); + + prepared_insert.set_consistency(Consistency::Quorum); + + for i in 0..ITEMS as i32 { + session + .execute_unpaged(&prepared_insert, (i,)) + .await + .unwrap(); + } + + (ks, session) +} + +#[tokio::test] +async fn test_paging_single_page_result() { + setup_tracing(); + let result = test_with_3_node_cluster( + ShardAwareness::QueryNode, + |proxy_uris, translation_map, running_proxy| async move { + let (ks, session) = prepare_data(proxy_uris, translation_map).await; + + let mut query = Query::from(format!("SELECT a FROM {}.t WHERE a = ?", ks)); + query.set_consistency(Consistency::Quorum); + query.set_page_size(10); + + let state = PagingState::default(); + let result = session.query_single_page(query, (0,), state).await; + + assert!(result.is_ok()); + let (query_result, paging_state_response) = result.unwrap(); + + assert_eq!(query_result.into_rows_result().unwrap().rows_num(), 1); + assert!(paging_state_response.finished()); + + running_proxy + }, + ) + .await; + + match result { + Ok(_) => {} + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {} + Err(err) => panic!("{err:?}"), + } +} +#[tokio::test] +async fn test_paging_single_page_result_prepared() { + setup_tracing(); + let result = test_with_3_node_cluster( + ShardAwareness::QueryNode, + |proxy_uris, translation_map, running_proxy| async move { + let (ks, session) = prepare_data(proxy_uris, translation_map).await; + + let mut query = session + .prepare(format!("SELECT a FROM {}.t WHERE a = ?", ks)) + .await + .unwrap(); + query.set_consistency(Consistency::Quorum); + query.set_page_size(10); + + let state = PagingState::default(); + let result = session.execute_single_page(&query, (0,), state).await; + + assert!(result.is_ok()); + let (query_result, paging_state_response) = result.unwrap(); + + assert_eq!(query_result.into_rows_result().unwrap().rows_num(), 1); + assert!(paging_state_response.finished()); + + running_proxy + }, + ) + .await; + + match result { + Ok(_) => {} + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {} + Err(err) => panic!("{err:?}"), + } +} + +#[tokio::test] +async fn test_paging_multiple_no_errors() { + setup_tracing(); + let result = test_with_3_node_cluster( + ShardAwareness::QueryNode, + |proxy_uris, translation_map, running_proxy| async move { + let (ks, session) = prepare_data(proxy_uris, translation_map).await; + + let mut query = session + .prepare(format!("SELECT a FROM {}.t WHERE a = ?", ks)) + .await + .unwrap(); + query.set_consistency(Consistency::Quorum); + query.set_page_size(10); + + let mut state = PagingState::default(); + + let mut counter = 0; + loop { + let result = session + .execute_single_page(&query, &(), state.clone()) + .await; + let (query_result, paging_state_response) = result.unwrap(); + match paging_state_response.into_paging_control_flow() { + ControlFlow::Break(()) => { + break; + } + ControlFlow::Continue(new_paging_state) => { + assert_eq!(query_result.into_rows_result().unwrap().rows_num(), 10); + state = new_paging_state; + } + } + + counter += 1; + } + + assert_eq!(counter, ITEMS / PAGE_SIZE); + + running_proxy + }, + ) + .await; + + match result { + Ok(_) => {} + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {} + Err(err) => panic!("{err:?}"), + } +} + +#[tokio::test] +async fn test_paging_multiple_no_errors_prepared() { + setup_tracing(); + let result = test_with_3_node_cluster( + ShardAwareness::QueryNode, + |proxy_uris, translation_map, running_proxy| async move { + let (ks, session) = prepare_data(proxy_uris, translation_map).await; + + let mut query = Query::from(format!("SELECT a FROM {}.t", ks)); + query.set_consistency(Consistency::Quorum); + query.set_page_size(PAGE_SIZE as i32); + + let mut state = PagingState::default(); + + let mut counter = 0; + loop { + let result = session + .query_single_page(query.clone(), &(), state.clone()) + .await; + let (query_result, paging_state_response) = result.unwrap(); + match paging_state_response.into_paging_control_flow() { + ControlFlow::Break(()) => { + break; + } + ControlFlow::Continue(new_paging_state) => { + assert_eq!(query_result.into_rows_result().unwrap().rows_num(), 10); + state = new_paging_state; + } + } + + counter += 1; + } + + assert_eq!(counter, ITEMS / PAGE_SIZE); + + running_proxy + }, + ) + .await; + + match result { + Ok(_) => {} + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {} + Err(err) => panic!("{err:?}"), + } +} + +#[tokio::test] +async fn test_paging_error() {} + +#[tokio::test] +async fn test_paging_error_prepared() {} + +#[tokio::test] +async fn test_paging_error_on_next_page() {} + +#[tokio::test] +async fn test_paging_error_on_next_page_prepared() {} + +#[tokio::test] +async fn test_paging_wrong_page_state() {} + +#[tokio::test] +async fn test_paging_wrong_page_state_prepared() {} From a40432f1e49518376eb4c2dedcafc6d6acd96ff5 Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Wed, 12 Feb 2025 19:23:24 +0100 Subject: [PATCH 2/6] wip Signed-off-by: Dusan Malusev --- scylla/tests/integration/paging_query.rs | 350 ++++++++++++++--------- 1 file changed, 221 insertions(+), 129 deletions(-) diff --git a/scylla/tests/integration/paging_query.rs b/scylla/tests/integration/paging_query.rs index b3387a769c..a5f2467c24 100644 --- a/scylla/tests/integration/paging_query.rs +++ b/scylla/tests/integration/paging_query.rs @@ -1,17 +1,33 @@ use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; +use assert_matches::assert_matches; use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; +use scylla::errors::{BadQuery, ExecutionError, RequestAttemptError}; +use scylla::prepared_statement::PreparedStatement; use scylla::query::Query; -use scylla_cql::frame::request::query::PagingState; +use scylla::response::query_result::QueryResult; +use scylla_cql::frame::request::query::{PagingState, PagingStateResponse}; +use scylla_cql::frame::request::RequestOpcode; +use scylla_cql::frame::response::error::DbError; use scylla_cql::Consistency; -use scylla_proxy::{ProxyError, ShardAwareness, WorkerError}; +use scylla_cql::_macro_internal::SerializeRow; +use scylla_proxy::{ + Condition, ProxyError, RequestReaction, RequestRule, RunningProxy, ShardAwareness, WorkerError, +}; use std::collections::HashMap; +use std::error::Error; use std::net::SocketAddr; use std::ops::ControlFlow; use std::sync::Arc; -const PAGE_SIZE: usize = 10; -const ITEMS: usize = 20; +const PAGE_SIZE: i32 = 10; +const ITEMS: i32 = 20; + +#[derive(Clone)] +enum Statement { + Prepared(PreparedStatement), + Simple(Query), +} async fn prepare_data( proxy_uris: [String; 3], @@ -46,7 +62,7 @@ async fn prepare_data( prepared_insert.set_consistency(Consistency::Quorum); - for i in 0..ITEMS as i32 { + for i in 0..ITEMS { session .execute_unpaged(&prepared_insert, (i,)) .await @@ -56,28 +72,71 @@ async fn prepare_data( (ks, session) } -#[tokio::test] -async fn test_paging_single_page_result() { +async fn execute_statement( + session: impl AsRef, + statement: Statement, + args: impl SerializeRow, + paging_state: PagingState, +) -> Result<(QueryResult, PagingStateResponse), ExecutionError> { + match statement { + Statement::Simple(query) => { + session + .as_ref() + .query_single_page(query, args, paging_state) + .await + } + Statement::Prepared(prepared) => { + session + .as_ref() + .execute_single_page(&prepared, args, paging_state) + .await + } + } +} + +async fn execute_test(query: impl AsRef, callback: F) +where + F: Fn(Arc, RunningProxy, Statement) -> Fut, + Fut: std::future::Future>>, +{ setup_tracing(); + let result = test_with_3_node_cluster( ShardAwareness::QueryNode, |proxy_uris, translation_map, running_proxy| async move { let (ks, session) = prepare_data(proxy_uris, translation_map).await; + let session = Arc::new(session); + + let query = query.as_ref().to_string().replace("%keyspace%", &ks); - let mut query = Query::from(format!("SELECT a FROM {}.t WHERE a = ?", ks)); + let mut query = Query::from(query.clone()); query.set_consistency(Consistency::Quorum); - query.set_page_size(10); + query.set_page_size(PAGE_SIZE); - let state = PagingState::default(); - let result = session.query_single_page(query, (0,), state).await; + let mut prepared = session.prepare(query.clone()).await.unwrap(); + prepared.set_consistency(Consistency::Quorum); + prepared.set_page_size(PAGE_SIZE); - assert!(result.is_ok()); - let (query_result, paging_state_response) = result.unwrap(); + let mut running_proxy = callback( + Arc::clone(&session), + running_proxy, + Statement::Simple(query), + ) + .await + .unwrap(); - assert_eq!(query_result.into_rows_result().unwrap().rows_num(), 1); - assert!(paging_state_response.finished()); + running_proxy.running_nodes.iter_mut().for_each(|node| { + node.change_response_rules(None); + node.change_request_rules(None); + }); - running_proxy + callback( + Arc::clone(&session), + running_proxy, + Statement::Prepared(prepared), + ) + .await + .unwrap() }, ) .await; @@ -88,153 +147,186 @@ async fn test_paging_single_page_result() { Err(err) => panic!("{err:?}"), } } -#[tokio::test] -async fn test_paging_single_page_result_prepared() { - setup_tracing(); - let result = test_with_3_node_cluster( - ShardAwareness::QueryNode, - |proxy_uris, translation_map, running_proxy| async move { - let (ks, session) = prepare_data(proxy_uris, translation_map).await; - let mut query = session - .prepare(format!("SELECT a FROM {}.t WHERE a = ?", ks)) - .await - .unwrap(); - query.set_consistency(Consistency::Quorum); - query.set_page_size(10); - - let state = PagingState::default(); - let result = session.execute_single_page(&query, (0,), state).await; - - assert!(result.is_ok()); - let (query_result, paging_state_response) = result.unwrap(); +#[tokio::test] +async fn test_paging_single_page_result() { + execute_test( + "SELECT a FROM %keyspace%.t WHERE a = ?", + |session, running_proxy, statement| async move { + let (query_result, paging_state_response) = + execute_statement(&session, statement, (0,), PagingState::start()).await?; - assert_eq!(query_result.into_rows_result().unwrap().rows_num(), 1); + assert_eq!(query_result.into_rows_result()?.rows_num(), 1); assert!(paging_state_response.finished()); - running_proxy + Ok(running_proxy) }, ) .await; - - match result { - Ok(_) => {} - Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {} - Err(err) => panic!("{err:?}"), - } } #[tokio::test] async fn test_paging_multiple_no_errors() { - setup_tracing(); - let result = test_with_3_node_cluster( - ShardAwareness::QueryNode, - |proxy_uris, translation_map, running_proxy| async move { - let (ks, session) = prepare_data(proxy_uris, translation_map).await; - - let mut query = session - .prepare(format!("SELECT a FROM {}.t WHERE a = ?", ks)) - .await - .unwrap(); - query.set_consistency(Consistency::Quorum); - query.set_page_size(10); - - let mut state = PagingState::default(); - - let mut counter = 0; - loop { - let result = session - .execute_single_page(&query, &(), state.clone()) - .await; - let (query_result, paging_state_response) = result.unwrap(); + execute_test( + "SELECT a FROM %keyspace%.t", + |session, running_proxy, statement| async move { + let mut state = PagingState::start(); + + for _ in 0..ITEMS / PAGE_SIZE { + let (query_result, paging_state_response) = + execute_statement(&session, statement.clone(), &[], state.clone()).await?; match paging_state_response.into_paging_control_flow() { - ControlFlow::Break(()) => { - break; + ControlFlow::Break(_) => { + panic!("Unexpected break"); } ControlFlow::Continue(new_paging_state) => { - assert_eq!(query_result.into_rows_result().unwrap().rows_num(), 10); + assert_eq!( + query_result.into_rows_result()?.rows_num(), + PAGE_SIZE as usize + ); state = new_paging_state; } } - - counter += 1; } - assert_eq!(counter, ITEMS / PAGE_SIZE); - - running_proxy + Ok(running_proxy) }, ) .await; - - match result { - Ok(_) => {} - Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {} - Err(err) => panic!("{err:?}"), - } } #[tokio::test] -async fn test_paging_multiple_no_errors_prepared() { - setup_tracing(); - let result = test_with_3_node_cluster( - ShardAwareness::QueryNode, - |proxy_uris, translation_map, running_proxy| async move { - let (ks, session) = prepare_data(proxy_uris, translation_map).await; - - let mut query = Query::from(format!("SELECT a FROM {}.t", ks)); - query.set_consistency(Consistency::Quorum); - query.set_page_size(PAGE_SIZE as i32); - - let mut state = PagingState::default(); - - let mut counter = 0; - loop { - let result = session - .query_single_page(query.clone(), &(), state.clone()) - .await; - let (query_result, paging_state_response) = result.unwrap(); - match paging_state_response.into_paging_control_flow() { - ControlFlow::Break(()) => { - break; - } - ControlFlow::Continue(new_paging_state) => { - assert_eq!(query_result.into_rows_result().unwrap().rows_num(), 10); - state = new_paging_state; - } +async fn test_paging_error() { + execute_test( + "SELECT a FROM %keyspace%.t WHERE a = ?", + |session, running_proxy, statement| async move { + let result = execute_statement( + &session, + statement.clone(), + ("hello world",), + PagingState::start(), + ) + .await + .unwrap_err(); + + match statement { + Statement::Simple(_) => { + assert_matches!( + result, + ExecutionError::LastAttemptError(RequestAttemptError::SerializationError( + _ + )) + ); + } + Statement::Prepared(_) => { + assert_matches!( + result, + ExecutionError::BadQuery(BadQuery::SerializationError(_)) + ); } - - counter += 1; } - assert_eq!(counter, ITEMS / PAGE_SIZE); - - running_proxy + Ok(running_proxy) }, ) .await; - - match result { - Ok(_) => {} - Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {} - Err(err) => panic!("{err:?}"), - } } #[tokio::test] -async fn test_paging_error() {} - -#[tokio::test] -async fn test_paging_error_prepared() {} - -#[tokio::test] -async fn test_paging_error_on_next_page() {} +async fn test_paging_error_on_next_page() { + execute_test( + "SELECT a FROM %keyspace%.t", + |session, mut running_proxy, statement| async move { + let mut state = PagingState::start(); + let (_, paging_state_resp) = + execute_statement(&session, statement.clone(), (), state.clone()).await?; + + state = paging_state_resp + .into_paging_control_flow() + .continue_value() + .unwrap(); -#[tokio::test] -async fn test_paging_error_on_next_page_prepared() {} + running_proxy.running_nodes.iter_mut().for_each(|node| { + node.change_request_rules(Some(vec![ + RequestRule( + Condition::RequestOpcode(RequestOpcode::Execute), + RequestReaction::forge_with_error(DbError::ServerError), + ), + RequestRule( + Condition::RequestOpcode(RequestOpcode::Query), + RequestReaction::forge_with_error(DbError::ServerError), + ), + ])) + }); + + let result = execute_statement(&session, statement.clone(), (), state.clone()) + .await + .unwrap_err(); -#[tokio::test] -async fn test_paging_wrong_page_state() {} + assert_matches!( + result, + ExecutionError::LastAttemptError(RequestAttemptError::DbError( + DbError::ServerError, + .. + )) + ); -#[tokio::test] -async fn test_paging_wrong_page_state_prepared() {} + Ok(running_proxy) + }, + ) + .await; +} +// +// #[tokio::test] +// async fn test_paging_wrong_page_state() { +// execute_test( +// "SELECT a FROM %keyspace%.t", +// |session, mut running_proxy, statement| async move { +// let mut state = PagingState::start(); +// let (_, paging_state_resp) = +// execute_statement(&session, statement.clone(), (), state.clone()).await?; +// +// state = paging_state_resp +// .into_paging_control_flow() +// .continue_value() +// .unwrap(); +// +// running_proxy.running_nodes.iter_mut().for_each(|node| { +// let response = +// RequestReaction::forge_response(Arc::new(|RequestFrame { params, .. }| { +// ResponseFrame { +// params: params.for_response(), +// opcode: ResponseOpcode::Event, +// body: Bytes::new(), +// } +// })); +// +// node.change_request_rules(Some(vec![ +// RequestRule( +// Condition::RequestOpcode(RequestOpcode::Execute), +// response.clone(), +// ), +// RequestRule( +// Condition::RequestOpcode(RequestOpcode::Query), +// response.clone(), +// ), +// ])); +// }); +// +// let result = execute_statement(&session, statement, (), state.clone()) +// .await +// .unwrap_err(); +// +// assert_matches!( +// result, +// ExecutionError::LastAttemptError(RequestAttemptError::DbError( +// DbError::ServerError, +// .. +// ),) +// ); +// +// Ok(running_proxy) +// }, +// ) +// .await; +// } From 6251b9501dda367a390ace4a858c2808cb35754d Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Thu, 13 Feb 2025 15:38:33 +0100 Subject: [PATCH 3/6] wip Signed-off-by: Dusan Malusev --- scylla/tests/integration/paging_query.rs | 232 ++++++++++++----------- 1 file changed, 117 insertions(+), 115 deletions(-) diff --git a/scylla/tests/integration/paging_query.rs b/scylla/tests/integration/paging_query.rs index a5f2467c24..d24e59ffbf 100644 --- a/scylla/tests/integration/paging_query.rs +++ b/scylla/tests/integration/paging_query.rs @@ -1,4 +1,7 @@ -use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; +use crate::utils::{ + create_new_session_builder, setup_tracing, test_with_3_node_cluster, unique_keyspace_name, + PerformDDL, +}; use assert_matches::assert_matches; use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; @@ -14,9 +17,7 @@ use scylla_cql::_macro_internal::SerializeRow; use scylla_proxy::{ Condition, ProxyError, RequestReaction, RequestRule, RunningProxy, ShardAwareness, WorkerError, }; -use std::collections::HashMap; use std::error::Error; -use std::net::SocketAddr; use std::ops::ControlFlow; use std::sync::Arc; @@ -29,18 +30,9 @@ enum Statement { Simple(Query), } -async fn prepare_data( - proxy_uris: [String; 3], - translation_map: HashMap, -) -> (String, Session) { - let session = SessionBuilder::new() - .known_node(proxy_uris[0].as_str()) - .address_translator(Arc::new(translation_map)) - .build() - .await - .unwrap(); - +async fn prepare_data(session: impl AsRef) -> String { let ks = unique_keyspace_name(); + let session = session.as_ref(); session.ddl( format!("CREATE KEYSPACE IF NOT EXISTS {ks} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}") @@ -69,7 +61,7 @@ async fn prepare_data( .unwrap(); } - (ks, session) + ks } async fn execute_statement( @@ -94,49 +86,81 @@ async fn execute_statement( } } -async fn execute_test(query: impl AsRef, callback: F) +async fn test_callback( + session: Arc, + query: impl AsRef, + callback: F, + running_proxy: Option, +) -> Option where - F: Fn(Arc, RunningProxy, Statement) -> Fut, - Fut: std::future::Future>>, + F: Fn(Arc, Option, Statement) -> Fut, + Fut: std::future::Future, Box>>, { - setup_tracing(); + let ks = prepare_data(Arc::clone(&session)).await; - let result = test_with_3_node_cluster( - ShardAwareness::QueryNode, - |proxy_uris, translation_map, running_proxy| async move { - let (ks, session) = prepare_data(proxy_uris, translation_map).await; - let session = Arc::new(session); + let query = query.as_ref().to_string().replace("%keyspace%", &ks); - let query = query.as_ref().to_string().replace("%keyspace%", &ks); + let mut query = Query::from(query.clone()); + query.set_consistency(Consistency::Quorum); + query.set_page_size(PAGE_SIZE); - let mut query = Query::from(query.clone()); - query.set_consistency(Consistency::Quorum); - query.set_page_size(PAGE_SIZE); + let mut prepared = session.prepare(query.clone()).await.unwrap(); + prepared.set_consistency(Consistency::Quorum); + prepared.set_page_size(PAGE_SIZE); - let mut prepared = session.prepare(query.clone()).await.unwrap(); - prepared.set_consistency(Consistency::Quorum); - prepared.set_page_size(PAGE_SIZE); + let mut running_proxy = callback( + Arc::clone(&session), + running_proxy, + Statement::Simple(query), + ) + .await + .unwrap(); + + if let Some(ref mut running_proxy) = running_proxy { + running_proxy.running_nodes.iter_mut().for_each(|node| { + node.change_response_rules(None); + node.change_request_rules(None); + }); + } - let mut running_proxy = callback( - Arc::clone(&session), - running_proxy, - Statement::Simple(query), - ) - .await - .unwrap(); + callback( + Arc::clone(&session), + running_proxy, + Statement::Prepared(prepared), + ) + .await + .unwrap() +} - running_proxy.running_nodes.iter_mut().for_each(|node| { - node.change_response_rules(None); - node.change_request_rules(None); - }); +async fn execute_test(query: impl AsRef, callback: F, init_cluster: bool) +where + F: Fn(Arc, Option, Statement) -> Fut, + Fut: std::future::Future, Box>>, +{ + setup_tracing(); - callback( - Arc::clone(&session), - running_proxy, - Statement::Prepared(prepared), - ) - .await - .unwrap() + if !init_cluster { + let session = Arc::new(create_new_session_builder().build().await.unwrap()); + + test_callback(session, query, callback, None).await; + return; + } + + let result = test_with_3_node_cluster( + ShardAwareness::QueryNode, + |proxy_uris, translation_map, running_proxy| async move { + let session = Arc::new( + SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(Arc::new(translation_map)) + .build() + .await + .unwrap(), + ); + + test_callback(session, query, callback, Some(running_proxy)) + .await + .unwrap() }, ) .await; @@ -161,6 +185,28 @@ async fn test_paging_single_page_result() { Ok(running_proxy) }, + false, + ) + .await; +} +#[tokio::test] +async fn test_paging_single_page_single_result() { + execute_test( + "SELECT a FROM %keyspace%.t WHERE a = ?", + |session, running_proxy, statement| async move { + let (query_result, paging_state_response) = + execute_statement(&session, statement, (0,), PagingState::start()).await?; + + let results = query_result.into_rows_result()?; + assert_eq!(results.rows_num(), 1); + assert!(paging_state_response.finished()); + + let (a,) = results.single_row::<(i32,)>()?; + assert_eq!(a, 0); + + Ok(running_proxy) + }, + false, ) .await; } @@ -191,11 +237,13 @@ async fn test_paging_multiple_no_errors() { Ok(running_proxy) }, + false, ) .await; } #[tokio::test] +#[cfg(not(scylla_cloud_tests))] async fn test_paging_error() { execute_test( "SELECT a FROM %keyspace%.t WHERE a = ?", @@ -228,11 +276,13 @@ async fn test_paging_error() { Ok(running_proxy) }, + true, ) .await; } #[tokio::test] +#[cfg(not(scylla_cloud_tests))] async fn test_paging_error_on_next_page() { execute_test( "SELECT a FROM %keyspace%.t", @@ -246,18 +296,23 @@ async fn test_paging_error_on_next_page() { .continue_value() .unwrap(); - running_proxy.running_nodes.iter_mut().for_each(|node| { - node.change_request_rules(Some(vec![ - RequestRule( - Condition::RequestOpcode(RequestOpcode::Execute), - RequestReaction::forge_with_error(DbError::ServerError), - ), - RequestRule( - Condition::RequestOpcode(RequestOpcode::Query), - RequestReaction::forge_with_error(DbError::ServerError), - ), - ])) - }); + running_proxy + .as_mut() + .unwrap() + .running_nodes + .iter_mut() + .for_each(|node| { + node.change_request_rules(Some(vec![ + RequestRule( + Condition::RequestOpcode(RequestOpcode::Execute), + RequestReaction::forge_with_error(DbError::ServerError), + ), + RequestRule( + Condition::RequestOpcode(RequestOpcode::Query), + RequestReaction::forge_with_error(DbError::ServerError), + ), + ])) + }); let result = execute_statement(&session, statement.clone(), (), state.clone()) .await @@ -273,60 +328,7 @@ async fn test_paging_error_on_next_page() { Ok(running_proxy) }, + true, ) .await; } -// -// #[tokio::test] -// async fn test_paging_wrong_page_state() { -// execute_test( -// "SELECT a FROM %keyspace%.t", -// |session, mut running_proxy, statement| async move { -// let mut state = PagingState::start(); -// let (_, paging_state_resp) = -// execute_statement(&session, statement.clone(), (), state.clone()).await?; -// -// state = paging_state_resp -// .into_paging_control_flow() -// .continue_value() -// .unwrap(); -// -// running_proxy.running_nodes.iter_mut().for_each(|node| { -// let response = -// RequestReaction::forge_response(Arc::new(|RequestFrame { params, .. }| { -// ResponseFrame { -// params: params.for_response(), -// opcode: ResponseOpcode::Event, -// body: Bytes::new(), -// } -// })); -// -// node.change_request_rules(Some(vec![ -// RequestRule( -// Condition::RequestOpcode(RequestOpcode::Execute), -// response.clone(), -// ), -// RequestRule( -// Condition::RequestOpcode(RequestOpcode::Query), -// response.clone(), -// ), -// ])); -// }); -// -// let result = execute_statement(&session, statement, (), state.clone()) -// .await -// .unwrap_err(); -// -// assert_matches!( -// result, -// ExecutionError::LastAttemptError(RequestAttemptError::DbError( -// DbError::ServerError, -// .. -// ),) -// ); -// -// Ok(running_proxy) -// }, -// ) -// .await; -// } From 6d77a917c25fcd4ba881aaf10761cc395e94a57e Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Thu, 13 Feb 2025 15:56:28 +0100 Subject: [PATCH 4/6] wip Signed-off-by: Dusan Malusev --- scylla/tests/integration/paging_query.rs | 32 +++++++++++------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/scylla/tests/integration/paging_query.rs b/scylla/tests/integration/paging_query.rs index d24e59ffbf..614ae216b5 100644 --- a/scylla/tests/integration/paging_query.rs +++ b/scylla/tests/integration/paging_query.rs @@ -1,7 +1,6 @@ -use crate::utils::{ - create_new_session_builder, setup_tracing, test_with_3_node_cluster, unique_keyspace_name, - PerformDDL, -}; +#[cfg(scylla_cloud_tests)] +use crate::utils::create_new_session_builder; +use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; use assert_matches::assert_matches; use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; @@ -52,6 +51,7 @@ async fn prepare_data(session: impl AsRef) -> String { .await .unwrap(); + #[cfg(not(scylla_cloud_tests))] prepared_insert.set_consistency(Consistency::Quorum); for i in 0..ITEMS { @@ -101,10 +101,12 @@ where let query = query.as_ref().to_string().replace("%keyspace%", &ks); let mut query = Query::from(query.clone()); + #[cfg(not(scylla_cloud_tests))] query.set_consistency(Consistency::Quorum); query.set_page_size(PAGE_SIZE); let mut prepared = session.prepare(query.clone()).await.unwrap(); + #[cfg(not(scylla_cloud_tests))] prepared.set_consistency(Consistency::Quorum); prepared.set_page_size(PAGE_SIZE); @@ -132,20 +134,20 @@ where .unwrap() } -async fn execute_test(query: impl AsRef, callback: F, init_cluster: bool) +async fn execute_test(query: impl AsRef, callback: F) where F: Fn(Arc, Option, Statement) -> Fut, Fut: std::future::Future, Box>>, { setup_tracing(); - if !init_cluster { + #[cfg(scylla_cloud_tests)] + { let session = Arc::new(create_new_session_builder().build().await.unwrap()); - test_callback(session, query, callback, None).await; - return; } + #[cfg(not(scylla_cloud_tests))] let result = test_with_3_node_cluster( ShardAwareness::QueryNode, |proxy_uris, translation_map, running_proxy| async move { @@ -165,6 +167,7 @@ where ) .await; + #[cfg(not(scylla_cloud_tests))] match result { Ok(_) => {} Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {} @@ -185,7 +188,6 @@ async fn test_paging_single_page_result() { Ok(running_proxy) }, - false, ) .await; } @@ -206,7 +208,6 @@ async fn test_paging_single_page_single_result() { Ok(running_proxy) }, - false, ) .await; } @@ -237,7 +238,6 @@ async fn test_paging_multiple_no_errors() { Ok(running_proxy) }, - false, ) .await; } @@ -276,7 +276,6 @@ async fn test_paging_error() { Ok(running_proxy) }, - true, ) .await; } @@ -291,10 +290,10 @@ async fn test_paging_error_on_next_page() { let (_, paging_state_resp) = execute_statement(&session, statement.clone(), (), state.clone()).await?; - state = paging_state_resp - .into_paging_control_flow() - .continue_value() - .unwrap(); + state = match paging_state_resp.into_paging_control_flow() { + ControlFlow::Continue(x) => x, + ControlFlow::Break(..) => panic!("Unexpected break"), + }; running_proxy .as_mut() @@ -328,7 +327,6 @@ async fn test_paging_error_on_next_page() { Ok(running_proxy) }, - true, ) .await; } From 227d18ba90d9f7b1e14d66f5d06dac3b0fa6869a Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Thu, 13 Feb 2025 16:17:28 +0100 Subject: [PATCH 5/6] wip Signed-off-by: Dusan Malusev --- scylla/tests/integration/paging_query.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/scylla/tests/integration/paging_query.rs b/scylla/tests/integration/paging_query.rs index 614ae216b5..a38e0647e2 100644 --- a/scylla/tests/integration/paging_query.rs +++ b/scylla/tests/integration/paging_query.rs @@ -53,7 +53,9 @@ async fn prepare_data(session: impl AsRef) -> String { #[cfg(not(scylla_cloud_tests))] prepared_insert.set_consistency(Consistency::Quorum); - + #[cfg(scylla_cloud_tests)] + prepared_insert.set_consistency(Consistency::One); + for i in 0..ITEMS { session .execute_unpaged(&prepared_insert, (i,)) @@ -103,11 +105,16 @@ where let mut query = Query::from(query.clone()); #[cfg(not(scylla_cloud_tests))] query.set_consistency(Consistency::Quorum); + #[cfg(scylla_cloud_tests)] + query.set_consistency(Consistency::One); + query.set_page_size(PAGE_SIZE); let mut prepared = session.prepare(query.clone()).await.unwrap(); #[cfg(not(scylla_cloud_tests))] prepared.set_consistency(Consistency::Quorum); + #[cfg(scylla_cloud_tests)] + prepared.set_consistency(Consistency::One); prepared.set_page_size(PAGE_SIZE); let mut running_proxy = callback( From 3cd709b66d4c1224442de29fe954ae5aad515ef2 Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Thu, 13 Feb 2025 16:21:41 +0100 Subject: [PATCH 6/6] wip Signed-off-by: Dusan Malusev --- scylla/tests/integration/paging_query.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scylla/tests/integration/paging_query.rs b/scylla/tests/integration/paging_query.rs index a38e0647e2..4f4dfccccf 100644 --- a/scylla/tests/integration/paging_query.rs +++ b/scylla/tests/integration/paging_query.rs @@ -55,7 +55,7 @@ async fn prepare_data(session: impl AsRef) -> String { prepared_insert.set_consistency(Consistency::Quorum); #[cfg(scylla_cloud_tests)] prepared_insert.set_consistency(Consistency::One); - + for i in 0..ITEMS { session .execute_unpaged(&prepared_insert, (i,))