@@ -3,7 +3,7 @@ use super::execution_profile::ExecutionProfile;
33use super :: session:: Session ;
44use super :: session_builder:: SessionBuilder ;
55use crate as scylla;
6- use crate :: batch:: { Batch , BatchStatement } ;
6+ use crate :: batch:: { Batch , BatchStatement , BatchType } ;
77use crate :: cluster:: metadata:: Strategy :: NetworkTopologyStrategy ;
88use crate :: cluster:: metadata:: { CollectionType , ColumnKind , CqlType , NativeType , UserDefinedType } ;
99use crate :: deserialize:: DeserializeOwnedValue ;
@@ -32,7 +32,7 @@ use scylla_cql::types::serialize::value::SerializeValue;
3232use std:: collections:: { BTreeMap , HashMap } ;
3333use std:: collections:: { BTreeSet , HashSet } ;
3434use std:: sync:: atomic:: { AtomicBool , Ordering } ;
35- use std:: sync:: Arc ;
35+ use std:: sync:: { Arc , Mutex } ;
3636use tokio:: net:: TcpListener ;
3737use uuid:: Uuid ;
3838
@@ -1327,6 +1327,88 @@ async fn test_timestamp() {
13271327 assert_eq ! ( results, expected_results) ;
13281328}
13291329
1330+ #[ tokio:: test]
1331+ async fn test_timestamp_generator ( ) {
1332+ use crate :: policies:: timestamp_generator:: TimestampGenerator ;
1333+ use rand:: random;
1334+
1335+ setup_tracing ( ) ;
1336+ struct LocalTimestampGenerator {
1337+ generated_timestamps : Arc < Mutex < HashSet < i64 > > > ,
1338+ }
1339+
1340+ impl TimestampGenerator for LocalTimestampGenerator {
1341+ fn next_timestamp ( & self ) -> i64 {
1342+ let timestamp = random :: < i64 > ( ) . abs ( ) ;
1343+ self . generated_timestamps . lock ( ) . unwrap ( ) . insert ( timestamp) ;
1344+ timestamp
1345+ }
1346+ }
1347+
1348+ let timestamps = Arc :: new ( Mutex :: new ( HashSet :: new ( ) ) ) ;
1349+ let generator = LocalTimestampGenerator {
1350+ generated_timestamps : timestamps. clone ( ) ,
1351+ } ;
1352+
1353+ let session = create_new_session_builder ( )
1354+ . timestamp_generator ( Arc :: new ( generator) )
1355+ . build ( )
1356+ . await
1357+ . unwrap ( ) ;
1358+ let ks = unique_keyspace_name ( ) ;
1359+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
1360+ session
1361+ . ddl ( format ! (
1362+ "CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)" ,
1363+ ks
1364+ ) )
1365+ . await
1366+ . unwrap ( ) ;
1367+
1368+ let prepared = session
1369+ . prepare ( format ! (
1370+ "INSERT INTO {}.t_generator (a, b) VALUES (1, 1)" ,
1371+ ks
1372+ ) )
1373+ . await
1374+ . unwrap ( ) ;
1375+ session. execute_unpaged ( & prepared, [ ] ) . await . unwrap ( ) ;
1376+
1377+ let unprepared = Query :: new ( format ! (
1378+ "INSERT INTO {}.t_generator (a, b) VALUES (2, 2)" ,
1379+ ks
1380+ ) ) ;
1381+ session. query_unpaged ( unprepared, [ ] ) . await . unwrap ( ) ;
1382+
1383+ let mut batch = Batch :: new ( BatchType :: Unlogged ) ;
1384+ let stmt = session
1385+ . prepare ( format ! (
1386+ "INSERT INTO {}.t_generator (a, b) VALUES (3, 3)" ,
1387+ ks
1388+ ) )
1389+ . await
1390+ . unwrap ( ) ;
1391+ batch. append_statement ( stmt) ;
1392+ session. batch ( & batch, & ( ( ) , ) ) . await . unwrap ( ) ;
1393+
1394+ let query_rows_result = session
1395+ . query_unpaged (
1396+ format ! ( "SELECT a, b, WRITETIME(b) FROM {}.t_generator" , ks) ,
1397+ & [ ] ,
1398+ )
1399+ . await
1400+ . unwrap ( )
1401+ . into_rows_result ( )
1402+ . unwrap ( ) ;
1403+
1404+ let timestamps_locked = timestamps. lock ( ) . unwrap ( ) ;
1405+ assert ! ( query_rows_result
1406+ . rows:: <( i32 , i32 , i64 ) >( )
1407+ . unwrap( )
1408+ . map( |row_result| row_result. unwrap( ) )
1409+ . all( |( _a, _b, writetime) | timestamps_locked. contains( & writetime) ) ) ;
1410+ }
1411+
13301412#[ ignore = "works on remote Scylla instances only (local ones are too fast)" ]
13311413#[ tokio:: test]
13321414async fn test_request_timeout ( ) {
0 commit comments