Skip to content

Commit e619385

Browse files
authored
Merge pull request #768 from avelanarius/session_slimming_1
session: remove estimate_replicas_for_query, get_tracing_info_custom
2 parents 22c4519 + ebfc00d commit e619385

File tree

7 files changed

+174
-149
lines changed

7 files changed

+174
-149
lines changed

examples/compare-tokens.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use anyhow::Result;
22
use scylla::frame::value::ValueList;
3+
use scylla::routing::Token;
34
use scylla::transport::partitioner::{Murmur3Partitioner, Partitioner};
45
use scylla::transport::NodeAddr;
5-
use scylla::{load_balancing, Session, SessionBuilder};
6+
use scylla::{Session, SessionBuilder};
67
use std::env;
78

89
#[tokio::main]
@@ -32,16 +33,11 @@ async fn main() -> Result<()> {
3233
let serialized_pk = (pk,).serialized()?.into_owned();
3334
let t = Murmur3Partitioner::hash(&prepared.compute_partition_key(&serialized_pk)?).value;
3435

35-
let statement_info = load_balancing::RoutingInfo {
36-
token: Some(scylla::routing::Token { value: t }),
37-
keyspace: Some("ks"),
38-
is_confirmed_lwt: false,
39-
..Default::default()
40-
};
4136
println!(
42-
"Estimated replicas for query: {:?}",
37+
"Token endpoints for query: {:?}",
4338
session
44-
.estimate_replicas_for_query(&statement_info)
39+
.get_cluster_data()
40+
.get_token_endpoints("ks", Token { value: t })
4541
.iter()
4642
.map(|n| n.address)
4743
.collect::<Vec<NodeAddr>>()

examples/tracing.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use scylla::batch::Batch;
77
use scylla::statement::{
88
prepared_statement::PreparedStatement, query::Query, Consistency, SerialConsistency,
99
};
10-
use scylla::tracing::{GetTracingConfig, TracingInfo};
10+
use scylla::tracing::TracingInfo;
1111
use scylla::transport::iterator::RowIterator;
1212
use scylla::QueryResult;
1313
use scylla::{Session, SessionBuilder};
@@ -21,7 +21,10 @@ async fn main() -> Result<()> {
2121
let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
2222

2323
println!("Connecting to {} ...", uri);
24-
let session: Session = SessionBuilder::new().known_node(uri).build().await?;
24+
let session: Session = SessionBuilder::new()
25+
.known_node(uri.as_str())
26+
.build()
27+
.await?;
2528

2629
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;
2730

@@ -106,19 +109,18 @@ async fn main() -> Result<()> {
106109
println!("Batch tracing id: {:?}\n", batch_result.tracing_id);
107110

108111
// CUSTOM
109-
// GetTracingConfig allows to specify a custom settings for querying tracing info
112+
// Session configuration allows specifying custom settings for querying tracing info.
110113
// Tracing info might not immediately be available on queried node
111114
// so the driver performs a few attempts with sleeps in between.
112-
113-
let custom_config: GetTracingConfig = GetTracingConfig {
114-
attempts: NonZeroU32::new(8).unwrap(),
115-
interval: Duration::from_millis(100),
116-
consistency: Consistency::One,
117-
};
118-
119-
let _custom_info: TracingInfo = session
120-
.get_tracing_info_custom(&query_tracing_id, &custom_config)
115+
let session: Session = SessionBuilder::new()
116+
.known_node(uri)
117+
.tracing_info_fetch_attempts(NonZeroU32::new(8).unwrap())
118+
.tracing_info_fetch_interval(Duration::from_millis(100))
119+
.tracing_info_fetch_consistency(Consistency::One)
120+
.build()
121121
.await?;
122122

123+
let _custom_info: TracingInfo = session.get_tracing_info(&query_tracing_id).await?;
124+
123125
Ok(())
124126
}

scylla/src/tracing.rs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
use crate::statement::Consistency;
21
use itertools::Itertools;
32
use std::collections::HashMap;
43
use std::net::IpAddr;
5-
use std::num::NonZeroU32;
6-
use std::time::Duration;
74
use uuid::Uuid;
85

96
use crate::cql_to_rust::{FromRow, FromRowError};
@@ -35,20 +32,6 @@ pub struct TracingEvent {
3532
pub thread: Option<String>,
3633
}
3734

38-
/// Used to configure a custom retry strategy when querying tracing info
39-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40-
pub struct GetTracingConfig {
41-
/// Number of attempts to be made before giving up.
42-
/// Default value: 5
43-
pub attempts: NonZeroU32,
44-
/// Interval to wait between each attempt.
45-
/// Default value: 3 milliseconds
46-
pub interval: Duration,
47-
/// Consistency to use in queries that read TracingInfo.
48-
/// Default value: One
49-
pub consistency: Consistency,
50-
}
51-
5235
impl TracingInfo {
5336
/// Returns a list of unique nodes involved in the query
5437
pub fn nodes(&self) -> Vec<IpAddr> {
@@ -60,16 +43,6 @@ impl TracingInfo {
6043
}
6144
}
6245

63-
impl Default for GetTracingConfig {
64-
fn default() -> GetTracingConfig {
65-
GetTracingConfig {
66-
attempts: NonZeroU32::new(5).unwrap(),
67-
interval: Duration::from_millis(3),
68-
consistency: Consistency::One,
69-
}
70-
}
71-
}
72-
7346
// A query used to query TracingInfo from system_traces.sessions
7447
pub(crate) const TRACES_SESSION_QUERY_STR: &str =
7548
"SELECT client, command, coordinator, duration, parameters, request, started_at \

scylla/src/transport/session.rs

Lines changed: 45 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use std::fmt::Display;
2727
use std::future::Future;
2828
use std::io;
2929
use std::net::SocketAddr;
30+
use std::num::NonZeroU32;
3031
use std::str::FromStr;
3132
use std::sync::atomic::AtomicUsize;
3233
use std::sync::atomic::Ordering;
@@ -59,7 +60,7 @@ use crate::prepared_statement::{PartitionKeyError, PreparedStatement};
5960
use crate::query::Query;
6061
use crate::routing::Token;
6162
use crate::statement::{Consistency, SerialConsistency};
62-
use crate::tracing::{GetTracingConfig, TracingEvent, TracingInfo};
63+
use crate::tracing::{TracingEvent, TracingInfo};
6364
use crate::transport::cluster::{Cluster, ClusterData, ClusterNeatDebug};
6465
use crate::transport::connection::{Connection, ConnectionConfig, VerifiedKeyspaceName};
6566
use crate::transport::connection_pool::PoolConfig;
@@ -133,6 +134,9 @@ pub struct Session {
133134
auto_await_schema_agreement_timeout: Option<Duration>,
134135
refresh_metadata_on_auto_schema_agreement: bool,
135136
keyspace_name: ArcSwapOption<String>,
137+
tracing_info_fetch_attempts: NonZeroU32,
138+
tracing_info_fetch_interval: Duration,
139+
tracing_info_fetch_consistency: Consistency,
136140
}
137141

138142
/// This implementation deliberately omits some details from Cluster in order
@@ -238,6 +242,22 @@ pub struct SessionConfig {
238242
/// Please do performance measurements before committing to disabling
239243
/// this option.
240244
pub enable_write_coalescing: bool,
245+
246+
/// Number of attempts to fetch [`TracingInfo`]
247+
/// in [`Session::get_tracing_info`]. Tracing info
248+
/// might not be available immediately on queried node - that's why
249+
/// the driver performs a few attempts with sleeps in between.
250+
pub tracing_info_fetch_attempts: NonZeroU32,
251+
252+
/// Delay between attempts to fetch [`TracingInfo`]
253+
/// in [`Session::get_tracing_info`]. Tracing info
254+
/// might not be available immediately on queried node - that's why
255+
/// the driver performs a few attempts with sleeps in between.
256+
pub tracing_info_fetch_interval: Duration,
257+
258+
/// Consistency level of fetching [`TracingInfo`]
259+
/// in [`Session::get_tracing_info`].
260+
pub tracing_info_fetch_consistency: Consistency,
241261
}
242262

243263
/// Describes database server known on Session startup.
@@ -297,6 +317,9 @@ impl SessionConfig {
297317
#[cfg(feature = "cloud")]
298318
cloud_config: None,
299319
enable_write_coalescing: true,
320+
tracing_info_fetch_attempts: NonZeroU32::new(5).unwrap(),
321+
tracing_info_fetch_interval: Duration::from_millis(3),
322+
tracing_info_fetch_consistency: Consistency::One,
300323
}
301324
}
302325

@@ -547,6 +570,9 @@ impl Session {
547570
refresh_metadata_on_auto_schema_agreement: config
548571
.refresh_metadata_on_auto_schema_agreement,
549572
keyspace_name: ArcSwapOption::default(), // will be set by use_keyspace
573+
tracing_info_fetch_attempts: config.tracing_info_fetch_attempts,
574+
tracing_info_fetch_interval: config.tracing_info_fetch_interval,
575+
tracing_info_fetch_consistency: config.tracing_info_fetch_consistency,
550576
};
551577

552578
if let Some(keyspace_name) = config.used_keyspace {
@@ -1344,8 +1370,24 @@ impl Session {
13441370
/// See [the book](https://rust-driver.docs.scylladb.com/stable/tracing/tracing.html)
13451371
/// for more information about query tracing
13461372
pub async fn get_tracing_info(&self, tracing_id: &Uuid) -> Result<TracingInfo, QueryError> {
1347-
self.get_tracing_info_custom(tracing_id, &GetTracingConfig::default())
1348-
.await
1373+
// tracing_info_fetch_attempts is NonZeroU32 so at least one attempt will be made
1374+
for _ in 0..self.tracing_info_fetch_attempts.get() {
1375+
let current_try: Option<TracingInfo> = self
1376+
.try_getting_tracing_info(tracing_id, Some(self.tracing_info_fetch_consistency))
1377+
.await?;
1378+
1379+
match current_try {
1380+
Some(tracing_info) => return Ok(tracing_info),
1381+
None => tokio::time::sleep(self.tracing_info_fetch_interval).await,
1382+
};
1383+
}
1384+
1385+
Err(QueryError::ProtocolError(
1386+
"All tracing queries returned an empty result, \
1387+
maybe the trace information didn't propagate yet. \
1388+
Consider configuring Session with \
1389+
a longer fetch interval (tracing_info_fetch_interval)",
1390+
))
13491391
}
13501392

13511393
/// Gets the name of the keyspace that is currently set, or `None` if no
@@ -1363,35 +1405,6 @@ impl Session {
13631405
self.keyspace_name.load_full()
13641406
}
13651407

1366-
/// Queries tracing info with custom retry settings.\
1367-
/// Tracing info might not be available immediately on queried node -
1368-
/// that's why the driver performs a few attempts with sleeps in between.
1369-
/// [`GetTracingConfig`] allows to specify a custom querying strategy.
1370-
pub async fn get_tracing_info_custom(
1371-
&self,
1372-
tracing_id: &Uuid,
1373-
config: &GetTracingConfig,
1374-
) -> Result<TracingInfo, QueryError> {
1375-
// config.attempts is NonZeroU32 so at least one attempt will be made
1376-
for _ in 0..config.attempts.get() {
1377-
let current_try: Option<TracingInfo> = self
1378-
.try_getting_tracing_info(tracing_id, Some(config.consistency))
1379-
.await?;
1380-
1381-
match current_try {
1382-
Some(tracing_info) => return Ok(tracing_info),
1383-
None => tokio::time::sleep(config.interval).await,
1384-
};
1385-
}
1386-
1387-
Err(QueryError::ProtocolError(
1388-
"All tracing queries returned an empty result, \
1389-
maybe information didn't reach this node yet. \
1390-
Consider using get_tracing_info_custom with \
1391-
bigger interval in GetTracingConfig",
1392-
))
1393-
}
1394-
13951408
// Tries getting the tracing info
13961409
// If the queries return 0 rows then returns None - the information didn't reach this node yet
13971410
// If there is some other error returns this error
@@ -1458,20 +1471,6 @@ impl Session {
14581471
Ok(Some(tracing_info))
14591472
}
14601473

1461-
// Returns which replicas are likely to take part in handling the query.
1462-
// If a list of replicas cannot be easily narrowed down, all available replicas
1463-
// will be returned.
1464-
pub fn estimate_replicas_for_query(&self, statement: &RoutingInfo) -> Vec<Arc<Node>> {
1465-
let cluster_data = self.cluster.get_data();
1466-
match statement.token {
1467-
Some(token) => {
1468-
let cluster_data = self.cluster.get_data();
1469-
cluster_data.get_token_endpoints(statement.keyspace.unwrap_or(""), token)
1470-
}
1471-
None => cluster_data.get_nodes_info().to_owned(),
1472-
}
1473-
}
1474-
14751474
// This method allows to easily run a query using load balancing, retry policy etc.
14761475
// Requires some information about the query and two closures
14771476
// First closure is used to choose a connection

scylla/src/transport/session_builder.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ use crate::cloud::{CloudConfig, CloudConfigError};
1010
#[cfg(feature = "cloud")]
1111
use crate::ExecutionProfile;
1212

13+
use crate::statement::Consistency;
1314
use crate::transport::connection_pool::PoolSize;
1415
use crate::transport::host_filter::HostFilter;
1516
use std::borrow::Borrow;
1617
use std::marker::PhantomData;
1718
use std::net::SocketAddr;
19+
use std::num::NonZeroU32;
1820
#[cfg(feature = "cloud")]
1921
use std::path::Path;
2022
use std::sync::Arc;
@@ -828,6 +830,77 @@ impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
828830
self.config.refresh_metadata_on_auto_schema_agreement = refresh_metadata;
829831
self
830832
}
833+
834+
/// Set the number of attempts to fetch [TracingInfo](crate::tracing::TracingInfo)
835+
/// in [`Session::get_tracing_info`].
836+
/// The default is 5 attempts.
837+
///
838+
/// Tracing info might not be available immediately on queried node - that's why
839+
/// the driver performs a few attempts with sleeps in between.
840+
///
841+
/// # Example
842+
/// ```
843+
/// # use scylla::{Session, SessionBuilder};
844+
/// # use std::num::NonZeroU32;
845+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
846+
/// let session: Session = SessionBuilder::new()
847+
/// .known_node("127.0.0.1:9042")
848+
/// .tracing_info_fetch_attempts(NonZeroU32::new(10).unwrap())
849+
/// .build()
850+
/// .await?;
851+
/// # Ok(())
852+
/// # }
853+
/// ```
854+
pub fn tracing_info_fetch_attempts(mut self, attempts: NonZeroU32) -> Self {
855+
self.config.tracing_info_fetch_attempts = attempts;
856+
self
857+
}
858+
859+
/// Set the delay between attempts to fetch [TracingInfo](crate::tracing::TracingInfo)
860+
/// in [`Session::get_tracing_info`].
861+
/// The default is 3 milliseconds.
862+
///
863+
/// Tracing info might not be available immediately on queried node - that's why
864+
/// the driver performs a few attempts with sleeps in between.
865+
///
866+
/// # Example
867+
/// ```
868+
/// # use scylla::{Session, SessionBuilder};
869+
/// # use std::time::Duration;
870+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
871+
/// let session: Session = SessionBuilder::new()
872+
/// .known_node("127.0.0.1:9042")
873+
/// .tracing_info_fetch_interval(Duration::from_millis(50))
874+
/// .build()
875+
/// .await?;
876+
/// # Ok(())
877+
/// # }
878+
/// ```
879+
pub fn tracing_info_fetch_interval(mut self, interval: Duration) -> Self {
880+
self.config.tracing_info_fetch_interval = interval;
881+
self
882+
}
883+
884+
/// Set the consistency level of fetching [TracingInfo](crate::tracing::TracingInfo)
885+
/// in [`Session::get_tracing_info`].
886+
/// The default is [`Consistency::One`].
887+
///
888+
/// # Example
889+
/// ```
890+
/// # use scylla::{Session, SessionBuilder, statement::Consistency};
891+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
892+
/// let session: Session = SessionBuilder::new()
893+
/// .known_node("127.0.0.1:9042")
894+
/// .tracing_info_fetch_consistency(Consistency::One)
895+
/// .build()
896+
/// .await?;
897+
/// # Ok(())
898+
/// # }
899+
/// ```
900+
pub fn tracing_info_fetch_consistency(mut self, consistency: Consistency) -> Self {
901+
self.config.tracing_info_fetch_consistency = consistency;
902+
self
903+
}
831904
}
832905

833906
/// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]

0 commit comments

Comments
 (0)