|
1 | 1 | use assert_matches::assert_matches;
|
2 | 2 | use futures::TryStreamExt;
|
3 |
| -use scylla::errors::PagerExecutionError; |
| 3 | +use scylla::errors::{ExecutionError, PagerExecutionError}; |
4 | 4 | use scylla::{
|
5 | 5 | batch::{Batch, BatchType},
|
6 | 6 | client::session::Session,
|
7 | 7 | query::Query,
|
| 8 | + response::query_result::QueryResult, |
8 | 9 | };
|
9 | 10 | use scylla_cql::frame::request::query::{PagingState, PagingStateResponse};
|
10 | 11 |
|
@@ -295,3 +296,171 @@ async fn execute_iter_serialization_error() {
|
295 | 296 | Err(PagerExecutionError::SerializationError(_))
|
296 | 297 | )
|
297 | 298 | }
|
| 299 | + |
| 300 | +async fn create_session(table_name: &str) -> Session { |
| 301 | + let session: Session = create_new_session_builder().build().await.unwrap(); |
| 302 | + let ks = unique_keyspace_name(); |
| 303 | + |
| 304 | + let cql_create_ks = format!( |
| 305 | + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION =\ |
| 306 | + {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", |
| 307 | + ks |
| 308 | + ); |
| 309 | + session.ddl(cql_create_ks).await.unwrap(); |
| 310 | + session.use_keyspace(ks, false).await.unwrap(); |
| 311 | + |
| 312 | + let cql_drop_table = format!("DROP TABLE IF EXISTS {}", table_name); |
| 313 | + session.ddl(cql_drop_table).await.unwrap(); |
| 314 | + |
| 315 | + let cql_create_table = format!( |
| 316 | + "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val int)", |
| 317 | + table_name, |
| 318 | + ); |
| 319 | + session.ddl(cql_create_table).await.unwrap(); |
| 320 | + |
| 321 | + session |
| 322 | +} |
| 323 | + |
| 324 | +async fn check_query_unpaged(insert_num: u64, use_prepared_statements: bool) { |
| 325 | + let table_name = if use_prepared_statements { |
| 326 | + "execute_unpaged" |
| 327 | + } else { |
| 328 | + "query_unpaged" |
| 329 | + }; |
| 330 | + let session: Session = create_session(table_name).await; |
| 331 | + session.await_schema_agreement().await.unwrap(); |
| 332 | + |
| 333 | + for i in 0..insert_num { |
| 334 | + if use_prepared_statements { |
| 335 | + let prepared_statement = session |
| 336 | + .prepare(format!("INSERT INTO {}(id, val) VALUES (?, ?)", table_name)) |
| 337 | + .await |
| 338 | + .unwrap(); |
| 339 | + session |
| 340 | + .execute_unpaged(&prepared_statement, &vec![i as i32, i as i32]) |
| 341 | + .await |
| 342 | + .unwrap(); |
| 343 | + } else { |
| 344 | + let cql = format!("INSERT INTO {}(id, val) VALUES ({}, {})", table_name, i, i); |
| 345 | + session.query_unpaged(cql, &[]).await.unwrap(); |
| 346 | + } |
| 347 | + } |
| 348 | + |
| 349 | + let query_result: QueryResult; |
| 350 | + if use_prepared_statements { |
| 351 | + let prepared_statement = session |
| 352 | + .prepare(format!("SELECT * FROM {}", table_name)) |
| 353 | + .await |
| 354 | + .unwrap(); |
| 355 | + query_result = session |
| 356 | + .execute_unpaged(&prepared_statement, &[]) |
| 357 | + .await |
| 358 | + .unwrap(); |
| 359 | + } else { |
| 360 | + let select_query = Query::new(format!("SELECT * FROM {}", table_name)).with_page_size(5); |
| 361 | + query_result = session.query_unpaged(select_query, &[]).await.unwrap(); |
| 362 | + } |
| 363 | + let rows = query_result.into_rows_result().unwrap(); |
| 364 | + |
| 365 | + // NOTE: check rows number using 'rows_num()' method. |
| 366 | + assert_eq!(rows.rows_num(), insert_num as usize); |
| 367 | + |
| 368 | + // NOTE: check actual rows number. |
| 369 | + let actual_rows = rows |
| 370 | + .rows::<(i32, i32)>() |
| 371 | + .unwrap() |
| 372 | + .collect::<Result<Vec<_>, _>>() |
| 373 | + .unwrap(); |
| 374 | + assert_eq!(actual_rows.len(), insert_num as usize); |
| 375 | +} |
| 376 | + |
| 377 | +async fn unpaged_error(use_prepared_statements: bool) { |
| 378 | + let table_name = if use_prepared_statements { |
| 379 | + "execute_unpaged" |
| 380 | + } else { |
| 381 | + "query_unpaged" |
| 382 | + }; |
| 383 | + let session: Session = create_session(table_name).await; |
| 384 | + session.await_schema_agreement().await.unwrap(); |
| 385 | + |
| 386 | + let query_result: Result<QueryResult, ExecutionError>; |
| 387 | + if use_prepared_statements { |
| 388 | + let prepared_statement = session |
| 389 | + .prepare(format!("SELECT * FROM {}", table_name)) |
| 390 | + .await |
| 391 | + .unwrap(); |
| 392 | + // NOTE: drop table to make the main query return error |
| 393 | + session |
| 394 | + .ddl(format!("DROP TABLE IF EXISTS {}", table_name)) |
| 395 | + .await |
| 396 | + .unwrap(); |
| 397 | + query_result = session.execute_unpaged(&prepared_statement, &[]).await; |
| 398 | + } else { |
| 399 | + let select_query = Query::new(format!("SELECT * FROM fake{}", table_name)); |
| 400 | + query_result = session.query_unpaged(select_query, &[]).await; |
| 401 | + } |
| 402 | + match query_result { |
| 403 | + Ok(_) => panic!("Unexpected success"), |
| 404 | + Err(err) => println!("Table not found as expected: {:?}", err), |
| 405 | + } |
| 406 | +} |
| 407 | + |
| 408 | +#[tokio::test] |
| 409 | +async fn test_query_unpaged_error() { |
| 410 | + unpaged_error(false).await |
| 411 | +} |
| 412 | + |
| 413 | +#[tokio::test] |
| 414 | +async fn test_execute_unpaged_error() { |
| 415 | + unpaged_error(true).await |
| 416 | +} |
| 417 | + |
| 418 | +#[tokio::test] |
| 419 | +async fn test_query_unpaged_no_rows() { |
| 420 | + check_query_unpaged(0, false).await; |
| 421 | +} |
| 422 | + |
| 423 | +#[tokio::test] |
| 424 | +async fn test_query_unpaged_one_row() { |
| 425 | + check_query_unpaged(1, false).await; |
| 426 | +} |
| 427 | + |
| 428 | +#[tokio::test] |
| 429 | +async fn test_query_unpaged_ten_rows() { |
| 430 | + check_query_unpaged(10, false).await; |
| 431 | +} |
| 432 | + |
| 433 | +#[tokio::test] |
| 434 | +async fn test_query_unpaged_hundred_rows() { |
| 435 | + check_query_unpaged(100, false).await; |
| 436 | +} |
| 437 | + |
| 438 | +#[tokio::test] |
| 439 | +async fn test_query_unpaged_thousand_rows() { |
| 440 | + check_query_unpaged(1000, false).await; |
| 441 | +} |
| 442 | + |
| 443 | +#[tokio::test] |
| 444 | +async fn test_execute_unpaged_no_rows() { |
| 445 | + check_query_unpaged(0, true).await; |
| 446 | +} |
| 447 | + |
| 448 | +#[tokio::test] |
| 449 | +async fn test_execute_unpaged_one_row() { |
| 450 | + check_query_unpaged(1, true).await; |
| 451 | +} |
| 452 | + |
| 453 | +#[tokio::test] |
| 454 | +async fn test_execute_unpaged_ten_rows() { |
| 455 | + check_query_unpaged(10, true).await; |
| 456 | +} |
| 457 | + |
| 458 | +#[tokio::test] |
| 459 | +async fn test_execute_unpaged_hundred_rows() { |
| 460 | + check_query_unpaged(100, true).await; |
| 461 | +} |
| 462 | + |
| 463 | +#[tokio::test] |
| 464 | +async fn test_execute_unpaged_thousand_rows() { |
| 465 | + check_query_unpaged(1000, true).await; |
| 466 | +} |
0 commit comments