Skip to content

Commit 72b59c7

Browse files
Move timestamp test to statement.rs and add missing cases
1 parent c223d1a commit 72b59c7

File tree

3 files changed

+276
-194
lines changed

3 files changed

+276
-194
lines changed

scylla/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod shards;
1919
mod silent_prepare_batch;
2020
mod silent_prepare_query;
2121
mod skip_metadata_optimization;
22+
mod statement;
2223
mod tablets;
2324
#[path = "../common/utils.rs"]
2425
mod utils;

scylla/tests/integration/session.rs

Lines changed: 2 additions & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::utils::{
55
use assert_matches::assert_matches;
66
use futures::{FutureExt, StreamExt as _, TryStreamExt};
77
use itertools::Itertools;
8-
use scylla::batch::{Batch, BatchStatement, BatchType};
8+
use scylla::batch::{Batch, BatchStatement};
99
use scylla::client::caching_session::CachingSession;
1010
use scylla::client::execution_profile::ExecutionProfile;
1111
use scylla::client::session::Session;
@@ -31,7 +31,7 @@ use std::collections::BTreeSet;
3131
use std::collections::{BTreeMap, HashMap};
3232
use std::collections::HashSet;
3333
use std::sync::atomic::{AtomicBool, Ordering};
34-
use std::sync::{Arc, Mutex};
34+
use std::sync::Arc;
3535
use tokio::net::TcpListener;
3636
use uuid::Uuid;
3737

@@ -945,198 +945,6 @@ async fn test_await_schema_agreement() {
945945
let _schema_version = session.await_schema_agreement().await.unwrap();
946946
}
947947

948-
#[tokio::test]
949-
async fn test_timestamp() {
950-
setup_tracing();
951-
let session = create_new_session_builder().build().await.unwrap();
952-
let ks = unique_keyspace_name();
953-
954-
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
955-
session
956-
.ddl(format!(
957-
"CREATE TABLE IF NOT EXISTS {}.t_timestamp (a text, b text, primary key (a))",
958-
ks
959-
))
960-
.await
961-
.unwrap();
962-
963-
session.await_schema_agreement().await.unwrap();
964-
965-
let query_str = format!("INSERT INTO {}.t_timestamp (a, b) VALUES (?, ?)", ks);
966-
967-
// test regular query timestamps
968-
969-
let mut regular_query = Query::new(query_str.to_string());
970-
971-
regular_query.set_timestamp(Some(420));
972-
session
973-
.query_unpaged(regular_query.clone(), ("regular query", "higher timestamp"))
974-
.await
975-
.unwrap();
976-
977-
regular_query.set_timestamp(Some(42));
978-
session
979-
.query_unpaged(regular_query.clone(), ("regular query", "lower timestamp"))
980-
.await
981-
.unwrap();
982-
983-
// test prepared statement timestamps
984-
985-
let mut prepared_statement = session.prepare(query_str).await.unwrap();
986-
987-
prepared_statement.set_timestamp(Some(420));
988-
session
989-
.execute_unpaged(&prepared_statement, ("prepared query", "higher timestamp"))
990-
.await
991-
.unwrap();
992-
993-
prepared_statement.set_timestamp(Some(42));
994-
session
995-
.execute_unpaged(&prepared_statement, ("prepared query", "lower timestamp"))
996-
.await
997-
.unwrap();
998-
999-
// test batch statement timestamps
1000-
1001-
let mut batch: Batch = Default::default();
1002-
batch.append_statement(regular_query);
1003-
batch.append_statement(prepared_statement);
1004-
1005-
batch.set_timestamp(Some(420));
1006-
session
1007-
.batch(
1008-
&batch,
1009-
(
1010-
("first query in batch", "higher timestamp"),
1011-
("second query in batch", "higher timestamp"),
1012-
),
1013-
)
1014-
.await
1015-
.unwrap();
1016-
1017-
batch.set_timestamp(Some(42));
1018-
session
1019-
.batch(
1020-
&batch,
1021-
(
1022-
("first query in batch", "lower timestamp"),
1023-
("second query in batch", "lower timestamp"),
1024-
),
1025-
)
1026-
.await
1027-
.unwrap();
1028-
1029-
let query_rows_result = session
1030-
.query_unpaged(
1031-
format!("SELECT a, b, WRITETIME(b) FROM {}.t_timestamp", ks),
1032-
&[],
1033-
)
1034-
.await
1035-
.unwrap()
1036-
.into_rows_result()
1037-
.unwrap();
1038-
1039-
let mut results = query_rows_result
1040-
.rows::<(&str, &str, i64)>()
1041-
.unwrap()
1042-
.map(Result::unwrap)
1043-
.collect::<Vec<_>>();
1044-
results.sort();
1045-
1046-
let expected_results = [
1047-
("first query in batch", "higher timestamp", 420),
1048-
("prepared query", "higher timestamp", 420),
1049-
("regular query", "higher timestamp", 420),
1050-
("second query in batch", "higher timestamp", 420),
1051-
]
1052-
.into_iter()
1053-
.collect::<Vec<_>>();
1054-
1055-
assert_eq!(results, expected_results);
1056-
}
1057-
1058-
#[tokio::test]
1059-
async fn test_timestamp_generator() {
1060-
use rand::random;
1061-
use scylla::policies::timestamp_generator::TimestampGenerator;
1062-
1063-
setup_tracing();
1064-
struct LocalTimestampGenerator {
1065-
generated_timestamps: Arc<Mutex<HashSet<i64>>>,
1066-
}
1067-
1068-
impl TimestampGenerator for LocalTimestampGenerator {
1069-
fn next_timestamp(&self) -> i64 {
1070-
let timestamp = random::<i64>().abs();
1071-
self.generated_timestamps.lock().unwrap().insert(timestamp);
1072-
timestamp
1073-
}
1074-
}
1075-
1076-
let timestamps = Arc::new(Mutex::new(HashSet::new()));
1077-
let generator = LocalTimestampGenerator {
1078-
generated_timestamps: timestamps.clone(),
1079-
};
1080-
1081-
let session = create_new_session_builder()
1082-
.timestamp_generator(Arc::new(generator))
1083-
.build()
1084-
.await
1085-
.unwrap();
1086-
let ks = unique_keyspace_name();
1087-
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
1088-
session
1089-
.ddl(format!(
1090-
"CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)",
1091-
ks
1092-
))
1093-
.await
1094-
.unwrap();
1095-
1096-
let prepared = session
1097-
.prepare(format!(
1098-
"INSERT INTO {}.t_generator (a, b) VALUES (1, 1)",
1099-
ks
1100-
))
1101-
.await
1102-
.unwrap();
1103-
session.execute_unpaged(&prepared, []).await.unwrap();
1104-
1105-
let unprepared = Query::new(format!(
1106-
"INSERT INTO {}.t_generator (a, b) VALUES (2, 2)",
1107-
ks
1108-
));
1109-
session.query_unpaged(unprepared, []).await.unwrap();
1110-
1111-
let mut batch = Batch::new(BatchType::Unlogged);
1112-
let stmt = session
1113-
.prepare(format!(
1114-
"INSERT INTO {}.t_generator (a, b) VALUES (3, 3)",
1115-
ks
1116-
))
1117-
.await
1118-
.unwrap();
1119-
batch.append_statement(stmt);
1120-
session.batch(&batch, &((),)).await.unwrap();
1121-
1122-
let query_rows_result = session
1123-
.query_unpaged(
1124-
format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks),
1125-
&[],
1126-
)
1127-
.await
1128-
.unwrap()
1129-
.into_rows_result()
1130-
.unwrap();
1131-
1132-
let timestamps_locked = timestamps.lock().unwrap();
1133-
assert!(query_rows_result
1134-
.rows::<(i32, i32, i64)>()
1135-
.unwrap()
1136-
.map(|row_result| row_result.unwrap())
1137-
.all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime)));
1138-
}
1139-
1140948
#[ignore = "works on remote Scylla instances only (local ones are too fast)"]
1141949
#[tokio::test]
1142950
async fn test_request_timeout() {

0 commit comments

Comments
 (0)