Skip to content

Commit ecc7fe6

Browse files
authored
Merge pull request #1128 from smoczy123/implement-timestamp-generator
Implement timestamp generators
2 parents d40e184 + 34315f2 commit ecc7fe6

File tree

9 files changed

+408
-9
lines changed

9 files changed

+408
-9
lines changed

docs/source/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
- [USE keyspace](queries/usekeyspace.md)
2828
- [Schema agreement](queries/schema-agreement.md)
2929
- [Query timeouts](queries/timeouts.md)
30+
- [Timestamp generators](queries/timestamp-generators.md)
3031

3132
- [Execution profiles](execution-profiles/execution-profiles.md)
3233
- [Creating a profile and setting it](execution-profiles/create-and-use.md)

docs/source/queries/queries.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Driver supports all kinds of statements supported by ScyllaDB. The following tables aim to bridge between DB concepts and driver's API.
44
They include recommendations on which API to use in what cases.
55

6-
## Kinds of CQL statements (from the CQL protocol point of view):
6+
## Kinds of CQL statements (from the CQL protocol point of view)
77

88
| Kind of CQL statement | Single | Batch |
99
|-----------------------|---------------------|------------------------------------------|
@@ -59,7 +59,7 @@ This is **NOT** strictly related to content of the CQL query string.
5959
| Load balancing | advanced if prepared, else primitive | advanced if prepared **and ALL** statements in the batch target the same partition, else primitive |
6060
| Suitable operations | most of operations | - a list of operations that needs to be executed atomically (batch LightWeight Transaction)</br> - a batch of operations targetting the same partition (as an advanced optimisation) |
6161

62-
## CQL statements - operations (based on what the CQL string contains):
62+
## CQL statements - operations (based on what the CQL string contains)
6363

6464
| CQL data manipulation statement | Recommended statement kind | Recommended Session operation |
6565
|------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
@@ -86,9 +86,10 @@ This is **NOT** strictly related to content of the CQL query string.
8686

8787
For more detailed comparison and more best practices, see [doc page about paging](paged.md).
8888

89-
### Queries are fully asynchronous - you can run as many of them in parallel as you wish.
89+
### Queries are fully asynchronous - you can run as many of them in parallel as you wish
90+
91+
## `USE KEYSPACE`
9092

91-
## `USE KEYSPACE`:
9293
There is a special functionality to enable [USE keyspace](usekeyspace.md).
9394

9495
```{eval-rst}
@@ -106,4 +107,5 @@ There is a special functionality to enable [USE keyspace](usekeyspace.md).
106107
schema-agreement
107108
lwt
108109
timeouts
110+
timestamp-generators
109111
```
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Timestamp generators
2+
3+
If you want to generate timestamps on the client side you can provide
4+
a TimestampGenerator to a SessionBuilder when creating a Session. Then
5+
every executed statement will have attached a new timestamp generated
6+
by the provided TimestampGenerator.
7+
Timestamps are set according to precendence:
8+
9+
1. ```USING TIMESTAMP``` in the query itself
10+
2. Manually using ```set_timestamp``` on the query
11+
3. Timestamp generated by the generator
12+
13+
## Simple Timestamp Generator
14+
15+
Most basic client-side timestamp generator. Generates timestamp
16+
based on system clock. Provides no guarantees and panics when the system clock
17+
provides timestamp before the unix epoch.
18+
19+
## Monotonic Timestamp Generator
20+
21+
Client-side timestamp generator. Guarantees monotonic timestamps
22+
based on the system clock, with automatic timestamp incrementation
23+
if the system clock timestamp would not be monotonic. If the clock skew
24+
exceeds `warning_threshold` of the generator (can be changed with `with_warning_times`, 1s by default)
25+
user will be warned with timestamp generation with `warning_interval` cooldown period
26+
(can be changed with `with_warning_times`, 1s by default) to not spam the user. If user does not want to
27+
be warned about the clock skew, the warnings can be turned off with `without_warnings` function.
28+
29+
``` rust
30+
# extern crate scylla;
31+
# use std::error::Error;
32+
# async fn check_only_compiles() -> Result<(), Box<dyn std::error::Error>> {
33+
use scylla::client::session::Session;
34+
use scylla::client::session_builder::SessionBuilder;
35+
use scylla::policies::timestamp_generator::MonotonicTimestampGenerator;
36+
use scylla::query::Query;
37+
use std::sync::Arc;
38+
39+
let session: Session = SessionBuilder::new()
40+
.known_node("127.0.0.1:9042")
41+
.timestamp_generator(Arc::new(MonotonicTimestampGenerator::new()))
42+
.build()
43+
.await?;
44+
45+
// This query will have a timestamp generated
46+
// by the monotonic timestamp generator
47+
let my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
48+
let to_insert: i32 = 12345;
49+
session.query_unpaged(my_query, (to_insert,)).await?;
50+
# Ok(())
51+
# }
52+
```
53+
54+

scylla/src/client/session.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::policies::host_filter::HostFilter;
3131
use crate::policies::load_balancing::{self, RoutingInfo};
3232
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
3333
use crate::policies::speculative_execution;
34+
use crate::policies::timestamp_generator::TimestampGenerator;
3435
use crate::prepared_statement::{PartitionKeyError, PreparedStatement};
3536
use crate::query::Query;
3637
#[allow(deprecated)]
@@ -181,6 +182,10 @@ pub struct SessionConfig {
181182
/// Generally, this options is best left as default (false).
182183
pub disallow_shard_aware_port: bool,
183184

185+
/// Timestamp generator used for generating timestamps on the client-side
186+
/// If None, server-side timestamps are used.
187+
pub timestamp_generator: Option<Arc<dyn TimestampGenerator>>,
188+
184189
/// If empty, fetch all keyspaces
185190
pub keyspaces_to_fetch: Vec<String>,
186191

@@ -293,6 +298,7 @@ impl SessionConfig {
293298
connect_timeout: Duration::from_secs(5),
294299
connection_pool_size: Default::default(),
295300
disallow_shard_aware_port: false,
301+
timestamp_generator: None,
296302
keyspaces_to_fetch: Vec::new(),
297303
fetch_schema_metadata: true,
298304
keepalive_interval: Some(Duration::from_secs(30)),
@@ -982,6 +988,7 @@ where
982988
compression: config.compression,
983989
tcp_nodelay: config.tcp_nodelay,
984990
tcp_keepalive_interval: config.tcp_keepalive_interval,
991+
timestamp_generator: config.timestamp_generator,
985992
#[cfg(feature = "ssl")]
986993
ssl_config: config.ssl_context.map(SslConfig::new_with_global_context),
987994
authenticator: config.authenticator.clone(),

scylla/src/client/session_builder.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::cloud::{CloudConfig, CloudConfigError};
1414
use crate::errors::NewSessionError;
1515
use crate::policies::address_translator::AddressTranslator;
1616
use crate::policies::host_filter::HostFilter;
17+
use crate::policies::timestamp_generator::TimestampGenerator;
1718
use crate::statement::Consistency;
1819
#[cfg(feature = "ssl")]
1920
use openssl::ssl::SslContext;
@@ -684,6 +685,28 @@ impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
684685
self
685686
}
686687

688+
/// Set the timestamp generator that will generate timestamps on the client-side.
689+
///
690+
/// # Example
691+
/// ```
692+
/// # use scylla::client::session::Session;
693+
/// # use scylla::client::session_builder::SessionBuilder;
694+
/// # use scylla::policies::timestamp_generator::SimpleTimestampGenerator;
695+
/// # use std::sync::Arc;
696+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
697+
/// let session: Session = SessionBuilder::new()
698+
/// .known_node("127.0.0.1:9042")
699+
/// .timestamp_generator(Arc::new(SimpleTimestampGenerator::new()))
700+
/// .build()
701+
/// .await?;
702+
/// # Ok(())
703+
/// # }
704+
/// ```
705+
pub fn timestamp_generator(mut self, timestamp_generator: Arc<dyn TimestampGenerator>) -> Self {
706+
self.config.timestamp_generator = Some(timestamp_generator);
707+
self
708+
}
709+
687710
/// Set the keyspaces to be fetched, to retrieve their strategy, and schema metadata if enabled
688711
/// No keyspaces, the default value, means all the keyspaces will be fetched.
689712
///

scylla/src/client/session_test.rs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use super::execution_profile::ExecutionProfile;
33
use super::session::Session;
44
use super::session_builder::SessionBuilder;
55
use crate as scylla;
6-
use crate::batch::{Batch, BatchStatement};
6+
use crate::batch::{Batch, BatchStatement, BatchType};
77
use crate::cluster::metadata::Strategy::NetworkTopologyStrategy;
88
use crate::cluster::metadata::{CollectionType, ColumnKind, CqlType, NativeType, UserDefinedType};
99
use crate::deserialize::DeserializeOwnedValue;
@@ -32,7 +32,7 @@ use scylla_cql::types::serialize::value::SerializeValue;
3232
use std::collections::{BTreeMap, HashMap};
3333
use std::collections::{BTreeSet, HashSet};
3434
use std::sync::atomic::{AtomicBool, Ordering};
35-
use std::sync::Arc;
35+
use std::sync::{Arc, Mutex};
3636
use tokio::net::TcpListener;
3737
use uuid::Uuid;
3838

@@ -1327,6 +1327,88 @@ async fn test_timestamp() {
13271327
assert_eq!(results, expected_results);
13281328
}
13291329

1330+
#[tokio::test]
1331+
async fn test_timestamp_generator() {
1332+
use crate::policies::timestamp_generator::TimestampGenerator;
1333+
use rand::random;
1334+
1335+
setup_tracing();
1336+
struct LocalTimestampGenerator {
1337+
generated_timestamps: Arc<Mutex<HashSet<i64>>>,
1338+
}
1339+
1340+
impl TimestampGenerator for LocalTimestampGenerator {
1341+
fn next_timestamp(&self) -> i64 {
1342+
let timestamp = random::<i64>().abs();
1343+
self.generated_timestamps.lock().unwrap().insert(timestamp);
1344+
timestamp
1345+
}
1346+
}
1347+
1348+
let timestamps = Arc::new(Mutex::new(HashSet::new()));
1349+
let generator = LocalTimestampGenerator {
1350+
generated_timestamps: timestamps.clone(),
1351+
};
1352+
1353+
let session = create_new_session_builder()
1354+
.timestamp_generator(Arc::new(generator))
1355+
.build()
1356+
.await
1357+
.unwrap();
1358+
let ks = unique_keyspace_name();
1359+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
1360+
session
1361+
.ddl(format!(
1362+
"CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)",
1363+
ks
1364+
))
1365+
.await
1366+
.unwrap();
1367+
1368+
let prepared = session
1369+
.prepare(format!(
1370+
"INSERT INTO {}.t_generator (a, b) VALUES (1, 1)",
1371+
ks
1372+
))
1373+
.await
1374+
.unwrap();
1375+
session.execute_unpaged(&prepared, []).await.unwrap();
1376+
1377+
let unprepared = Query::new(format!(
1378+
"INSERT INTO {}.t_generator (a, b) VALUES (2, 2)",
1379+
ks
1380+
));
1381+
session.query_unpaged(unprepared, []).await.unwrap();
1382+
1383+
let mut batch = Batch::new(BatchType::Unlogged);
1384+
let stmt = session
1385+
.prepare(format!(
1386+
"INSERT INTO {}.t_generator (a, b) VALUES (3, 3)",
1387+
ks
1388+
))
1389+
.await
1390+
.unwrap();
1391+
batch.append_statement(stmt);
1392+
session.batch(&batch, &((),)).await.unwrap();
1393+
1394+
let query_rows_result = session
1395+
.query_unpaged(
1396+
format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks),
1397+
&[],
1398+
)
1399+
.await
1400+
.unwrap()
1401+
.into_rows_result()
1402+
.unwrap();
1403+
1404+
let timestamps_locked = timestamps.lock().unwrap();
1405+
assert!(query_rows_result
1406+
.rows::<(i32, i32, i64)>()
1407+
.unwrap()
1408+
.map(|row_result| row_result.unwrap())
1409+
.all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime)));
1410+
}
1411+
13301412
#[ignore = "works on remote Scylla instances only (local ones are too fast)"]
13311413
#[tokio::test]
13321414
async fn test_request_timeout() {

scylla/src/network/connection.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::frame::{
2222
FrameParams, SerializedRequest,
2323
};
2424
use crate::policies::address_translator::AddressTranslator;
25+
use crate::policies::timestamp_generator::TimestampGenerator;
2526
use crate::query::Query;
2627
use crate::response::query_result::QueryResult;
2728
use crate::response::{
@@ -324,6 +325,7 @@ pub(crate) struct ConnectionConfig {
324325
pub(crate) compression: Option<Compression>,
325326
pub(crate) tcp_nodelay: bool,
326327
pub(crate) tcp_keepalive_interval: Option<Duration>,
328+
pub(crate) timestamp_generator: Option<Arc<dyn TimestampGenerator>>,
327329
#[cfg(feature = "ssl")]
328330
pub(crate) ssl_config: Option<SslConfig>,
329331
pub(crate) connect_timeout: std::time::Duration,
@@ -349,6 +351,7 @@ impl Default for ConnectionConfig {
349351
compression: None,
350352
tcp_nodelay: true,
351353
tcp_keepalive_interval: None,
354+
timestamp_generator: None,
352355
event_sender: None,
353356
#[cfg(feature = "ssl")]
354357
ssl_config: None,
@@ -853,6 +856,14 @@ impl Connection {
853856
page_size: Option<PageSize>,
854857
paging_state: PagingState,
855858
) -> Result<QueryResponse, RequestAttemptError> {
859+
let get_timestamp_from_gen = || {
860+
self.config
861+
.timestamp_generator
862+
.as_ref()
863+
.map(|gen| gen.next_timestamp())
864+
};
865+
let timestamp = query.get_timestamp().or_else(get_timestamp_from_gen);
866+
856867
let query_frame = query::Query {
857868
contents: Cow::Borrowed(&query.contents),
858869
parameters: query::QueryParameters {
@@ -862,7 +873,7 @@ impl Connection {
862873
page_size: page_size.map(Into::into),
863874
paging_state,
864875
skip_metadata: false,
865-
timestamp: query.get_timestamp(),
876+
timestamp,
866877
},
867878
};
868879

@@ -915,14 +926,24 @@ impl Connection {
915926
page_size: Option<PageSize>,
916927
paging_state: PagingState,
917928
) -> Result<QueryResponse, RequestAttemptError> {
929+
let get_timestamp_from_gen = || {
930+
self.config
931+
.timestamp_generator
932+
.as_ref()
933+
.map(|gen| gen.next_timestamp())
934+
};
935+
let timestamp = prepared_statement
936+
.get_timestamp()
937+
.or_else(get_timestamp_from_gen);
938+
918939
let execute_frame = execute::Execute {
919940
id: prepared_statement.get_id().to_owned(),
920941
parameters: query::QueryParameters {
921942
consistency,
922943
serial_consistency,
923944
values: Cow::Borrowed(values),
924945
page_size: page_size.map(Into::into),
925-
timestamp: prepared_statement.get_timestamp(),
946+
timestamp,
926947
skip_metadata: prepared_statement.get_use_cached_result_metadata(),
927948
paging_state,
928949
},
@@ -1057,13 +1078,21 @@ impl Connection {
10571078

10581079
let values = RawBatchValuesAdapter::new(values, contexts);
10591080

1081+
let get_timestamp_from_gen = || {
1082+
self.config
1083+
.timestamp_generator
1084+
.as_ref()
1085+
.map(|gen| gen.next_timestamp())
1086+
};
1087+
let timestamp = batch.get_timestamp().or_else(get_timestamp_from_gen);
1088+
10601089
let batch_frame = batch::Batch {
10611090
statements: Cow::Borrowed(&batch.statements),
10621091
values,
10631092
batch_type: batch.get_type(),
10641093
consistency,
10651094
serial_consistency,
1066-
timestamp: batch.get_timestamp(),
1095+
timestamp,
10671096
};
10681097

10691098
loop {

scylla/src/policies/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ pub mod host_filter;
2020
pub mod load_balancing;
2121
pub mod retry;
2222
pub mod speculative_execution;
23+
pub mod timestamp_generator;

0 commit comments

Comments
 (0)