diff --git a/scylla-cql/src/serialize/row.rs b/scylla-cql/src/serialize/row.rs
index d586370913..d4f266113c 100644
--- a/scylla-cql/src/serialize/row.rs
+++ b/scylla-cql/src/serialize/row.rs
@@ -52,13 +52,6 @@ impl<'a> RowSerializationContext<'a> {
     pub fn columns(&self) -> &'a [ColumnSpec] {
         self.columns
     }
-
-    /// Looks up and returns a column/bind marker by name.
-    // TODO: change RowSerializationContext to make this faster
-    #[inline]
-    pub fn column_by_name(&self, target: &str) -> Option<&ColumnSpec> {
-        self.columns.iter().find(|&c| c.name() == target)
-    }
 }
 
 /// Represents a set of values that can be sent along a CQL statement.
@@ -514,7 +507,7 @@ impl SerializedValues {
     pub const EMPTY: &'static SerializedValues = &SerializedValues::new();
 
     /// Constructs `SerializedValues` from given [`SerializeRow`] object.
-    pub fn from_serializable<T: SerializeRow>(
+    pub fn from_serializable<T: SerializeRow + ?Sized>(
         ctx: &RowSerializationContext,
         row: &T,
     ) -> Result<Self, SerializationError> {
diff --git a/scylla/Cargo.toml b/scylla/Cargo.toml
index 18a35efec0..32f70be0ba 100644
--- a/scylla/Cargo.toml
+++ b/scylla/Cargo.toml
@@ -39,6 +39,7 @@ full-serialization = [
     "num-bigint-04",
    "bigdecimal-04",
 ]
+unstable-testing = []
 
 [dependencies]
 scylla-macros = { version = "0.7.0", path = "../scylla-macros" }
@@ -96,6 +97,7 @@ time = "0.3"
 [[bench]]
 name = "benchmark"
 harness = false
+required-features = ["unstable-testing"]
 
 [lints.rust]
 unnameable_types = "warn"
diff --git a/scylla/benches/benchmark.rs b/scylla/benches/benchmark.rs
index 0bb83df7fe..322f832d14 100644
--- a/scylla/benches/benchmark.rs
+++ b/scylla/benches/benchmark.rs
@@ -1,7 +1,8 @@
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use bytes::BytesMut;
-use scylla::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
+use scylla::internal_testing::calculate_token_for_partition_key;
+use scylla::routing::partitioner::PartitionerName;
 use scylla_cql::frame::response::result::{ColumnType, NativeType};
 use scylla_cql::frame::types;
 use scylla_cql::serialize::row::SerializedValues;
diff --git a/scylla/src/client/session_test.rs b/scylla/src/client/session_test.rs
index a8227727b6..483016e7df 100644
--- a/scylla/src/client/session_test.rs
+++ b/scylla/src/client/session_test.rs
@@ -218,9 +218,6 @@ async fn test_prepared_statement() {
         .unwrap();
     let values = (17_i32, 16_i32, "I'm prepared!!!");
-    let serialized_values_complex_pk = prepared_complex_pk_statement
-        .serialize_values(&values)
-        .unwrap();
 
     session
         .execute_unpaged(&prepared_statement, &values)
         .await
@@ -245,12 +242,9 @@ async fn test_prepared_statement() {
         let prepared_token = Murmur3Partitioner
             .hash_one(&prepared_statement.compute_partition_key(&values).unwrap());
         assert_eq!(token, prepared_token);
-        let mut pk = SerializedValues::new();
-        pk.add_value(&17_i32, &ColumnType::Native(NativeType::Int))
-            .unwrap();
         let cluster_state_token = session
             .get_cluster_state()
-            .compute_token(&ks, "t2", &pk)
+            .compute_token(&ks, "t2", &(values.0,))
             .unwrap();
         assert_eq!(token, cluster_state_token);
     }
@@ -272,7 +266,7 @@ async fn test_prepared_statement() {
         assert_eq!(token, prepared_token);
         let cluster_state_token = session
             .get_cluster_state()
-            .compute_token(&ks, "complex_pk", &serialized_values_complex_pk)
+            .compute_token(&ks, "complex_pk", &values)
             .unwrap();
         assert_eq!(token, cluster_state_token);
     }
@@ -608,7 +602,6 @@ async fn test_token_calculation() {
             s.push('a');
         }
         let values = (&s,);
-        let serialized_values = prepared_statement.serialize_values(&values).unwrap();
         session
             .execute_unpaged(&prepared_statement, &values)
             .await
             .unwrap();
@@ -631,7 +624,7 @@ async fn test_token_calculation() {
         assert_eq!(token, prepared_token);
         let cluster_state_token = session
             .get_cluster_state()
-            .compute_token(&ks, "t3", &serialized_values)
+            .compute_token(&ks, "t3", &values)
             .unwrap();
         assert_eq!(token, cluster_state_token);
     }
diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs
index b62d605d07..242102e25e 100644
--- a/scylla/src/cluster/metadata.rs
+++ b/scylla/src/cluster/metadata.rs
@@ -35,6 +35,7 @@ use futures::Stream;
 use itertools::Itertools;
 use rand::seq::{IndexedRandom, SliceRandom};
 use rand::{rng, Rng};
+use scylla_cql::frame::response::result::{ColumnSpec, TableSpec};
 use scylla_macros::DeserializeRow;
 use std::borrow::BorrowMut;
 use std::cell::Cell;
@@ -67,6 +68,22 @@ type PerTable<T> = HashMap<String, T>;
 type PerKsTable<T> = HashMap<(String, String), T>;
 type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;
 
+/// Indicates that reading metadata failed, but in a way
+/// that we can handle, by throwing out data for a keyspace.
+/// It is possible that some of the errors could be handled in an even
+/// more granular way (e.g. throwing out a single table), but keyspace
+/// granularity seems like a good choice given how independent keyspaces
+/// are from each other.
+#[derive(Clone, Debug, Error)]
+pub(crate) enum SingleKeyspaceMetadataError {
+    #[error(transparent)]
+    MissingUDT(MissingUserDefinedType),
+    #[error("Partition key column with position {0} is missing from metadata")]
+    IncompletePartitionKey(i32),
+    #[error("Clustering key column with position {0} is missing from metadata")]
+    IncompleteClusteringKey(i32),
+}
+
 /// Allows to read current metadata from the cluster
 pub(crate) struct MetadataReader {
     control_connection_pool_config: PoolConfig,
@@ -92,7 +109,7 @@ pub(crate) struct MetadataReader {
 /// Describes all metadata retrieved from the cluster
 pub(crate) struct Metadata {
     pub(crate) peers: Vec<Peer>,
-    pub(crate) keyspaces: HashMap<String, Result<Keyspace, MissingUserDefinedType>>,
+    pub(crate) keyspaces: HashMap<String, Result<Keyspace, SingleKeyspaceMetadataError>>,
 }
 
 #[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
@@ -181,9 +198,14 @@ pub struct Keyspace {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct Table {
     pub columns: HashMap<String, Column>,
+    /// Names of the columns of the partition key.
+    /// All of the names are guaranteed to be present in the `columns` field.
     pub partition_key: Vec<String>,
+    /// Names of the columns of the clustering key.
+    /// All of the names are guaranteed to be present in the `columns` field.
     pub clustering_key: Vec<String>,
     pub partitioner: Option<String>,
+    pub(crate) pk_column_specs: Vec<ColumnSpec<'static>>,
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -902,7 +924,7 @@ async fn query_keyspaces(
     conn: &Arc<Connection>,
     keyspaces_to_fetch: &[String],
     fetch_schema: bool,
-) -> Result<PerKeyspaceResult<Keyspace, MissingUserDefinedType>, MetadataError> {
+) -> Result<PerKeyspaceResult<Keyspace, SingleKeyspaceMetadataError>, MetadataError> {
     let rows = query_filter_keyspace_name::<(String, HashMap<String, String>)>(
         conn,
         "select keyspace_name, replication from system_schema.keyspaces",
@@ -953,15 +975,20 @@ async fn query_keyspaces(
 
         // As you can notice, in this file we generally operate on two layers of errors:
         // - Outer (MetadataError) if something went wrong with querying the cluster.
-        // - Inner (currently MissingUserDefinedType, possibly other variants in the future) if the fetched metadata
-        //   turned out to not be fully consistent.
+        // - Inner (SingleKeyspaceMetadataError) if the fetched metadata turned out to not be fully consistent.
         // If there is an inner error, we want to drop metadata for the whole keyspace.
-        // This logic checks if either tables views or UDTs have such inner error, and returns it if so.
+        // This logic checks if either tables, views, or UDTs have such inner error, and returns it if so.
         // Notice that in the error branch, return value is wrapped in `Ok` - but this is the
         // outer error, so it just means there was no error while querying the cluster.
         let (tables, views, user_defined_types) = match (tables, views, user_defined_types) {
             (Ok(t), Ok(v), Ok(u)) => (t, v, u),
-            (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => return Ok((keyspace_name, Err(e))),
+            (Err(e), _, _) | (_, Err(e), _) => return Ok((keyspace_name, Err(e))),
+            (_, _, Err(e)) => {
+                return Ok((
+                    keyspace_name,
+                    Err(SingleKeyspaceMetadataError::MissingUDT(e)),
+                ))
+            }
         };
 
         let keyspace = Keyspace {
@@ -1364,8 +1391,8 @@ mod toposort_tests {
 async fn query_tables(
     conn: &Arc<Connection>,
     keyspaces_to_fetch: &[String],
-    tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
-) -> Result<PerKeyspaceResult<PerTable<Table>, MissingUserDefinedType>, MetadataError> {
+    tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
+) -> Result<PerKeyspaceResult<PerTable<Table>, SingleKeyspaceMetadataError>, MetadataError> {
     let rows = query_filter_keyspace_name::<(String, String)>(
         conn,
         "SELECT keyspace_name, table_name FROM system_schema.tables",
@@ -1385,6 +1412,7 @@ async fn query_tables(
                 partition_key: vec![],
                 clustering_key: vec![],
                 partitioner: None,
+                pk_column_specs: vec![],
             }));
 
         let mut entry = result
@@ -1409,8 +1437,9 @@ async fn query_views(
     conn: &Arc<Connection>,
     keyspaces_to_fetch: &[String],
-    tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
-) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, MissingUserDefinedType>, MetadataError> {
+    tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
+) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, SingleKeyspaceMetadataError>, MetadataError>
+{
     let rows = query_filter_keyspace_name::<(String, String, String)>(
         conn,
         "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
@@ -1435,6 +1464,7 @@ async fn query_views(
                 partition_key: vec![],
                 clustering_key: vec![],
                 partitioner: None,
+                pk_column_specs: vec![],
             }))
             .map(|table| MaterializedView {
                 view_metadata: table,
@@ -1465,7 +1495,7 @@ async fn query_tables_schema(
     conn: &Arc<Connection>,
     keyspaces_to_fetch: &[String],
     udts: &PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
-) -> Result<PerKsTableResult<Table, MissingUserDefinedType>, MetadataError> {
+) -> Result<PerKsTableResult<Table, SingleKeyspaceMetadataError>, MetadataError> {
     // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of
     // type EmptyType for dense tables. This resolves into this CQL type name.
     // This column shouldn't be exposed to the user but is currently exposed in system tables.
@@ -1484,7 +1514,7 @@ async fn query_tables_schema(
 
     let empty_ok_map = Ok(HashMap::new());
 
-    let mut tables_schema: HashMap<_, Result<_, MissingUserDefinedType>> = HashMap::new();
+    let mut tables_schema: HashMap<_, Result<_, SingleKeyspaceMetadataError>> = HashMap::new();
     rows.map(|row_result| {
         let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;
@@ -1518,7 +1548,10 @@ async fn query_tables_schema(
                 // is minor enough to ignore. Note that the first issue also applies to
                 // solution 1: but the keyspace won't be present in the result at all,
                 // which is arguably worse.
-                tables_schema.insert((keyspace_name, table_name), Err(e.clone()));
+                tables_schema.insert(
+                    (keyspace_name, table_name),
+                    Err(SingleKeyspaceMetadataError::MissingUDT(e.clone())),
+                );
                 return Ok::<_, MetadataError>(());
             }
         };
@@ -1532,7 +1565,10 @@ async fn query_tables_schema(
         let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) {
             Ok(t) => t,
             Err(e) => {
-                tables_schema.insert((keyspace_name, table_name), Err(e));
+                tables_schema.insert(
+                    (keyspace_name, table_name),
+                    Err(SingleKeyspaceMetadataError::MissingUDT(e)),
+                );
                 return Ok::<_, MetadataError>(());
             }
         };
@@ -1549,8 +1585,8 @@ async fn query_tables_schema(
             .entry((keyspace_name, table_name))
             .or_insert(Ok((
                 HashMap::new(), // columns
-                HashMap::new(), // partition key
-                HashMap::new(), // clustering key
+                Vec::new(), // partition key
+                Vec::new(), // clustering key
             )))
         else {
             // This table was previously marked as broken, no way to insert anything.
@@ -1558,12 +1594,12 @@ async fn query_tables_schema(
 
         if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering {
-            let key_map = if kind == ColumnKind::PartitionKey {
+            let key_list: &mut Vec<(i32, String)> = if kind == ColumnKind::PartitionKey {
                 entry.1.borrow_mut()
             } else {
                 entry.2.borrow_mut()
             };
-            key_map.insert(position, column_name.clone());
+            key_list.push((position, column_name.clone()));
         }
 
         entry.0.insert(
@@ -1582,30 +1618,85 @@ async fn query_tables_schema(
     let mut all_partitioners = query_table_partitioners(conn).await?;
     let mut result = HashMap::new();
 
-    for ((keyspace_name, table_name), table_result) in tables_schema {
+    'tables_loop: for ((keyspace_name, table_name), table_result) in tables_schema {
         let keyspace_and_table_name = (keyspace_name, table_name);
 
-        let (columns, partition_key_columns, clustering_key_columns) = match table_result {
+        #[allow(clippy::type_complexity)]
+        let (columns, partition_key_columns, clustering_key_columns): (
+            HashMap<String, Column>,
+            Vec<(i32, String)>,
+            Vec<(i32, String)>,
+        ) = match table_result {
             Ok(table) => table,
             Err(e) => {
                 let _ = result.insert(keyspace_and_table_name, Err(e));
                 continue;
             }
         };
 
-        let mut partition_key = vec!["".to_string(); partition_key_columns.len()];
-        for (position, column_name) in partition_key_columns {
-            partition_key[position as usize] = column_name;
-        }
-
-        let mut clustering_key = vec!["".to_string(); clustering_key_columns.len()];
-        for (position, column_name) in clustering_key_columns {
-            clustering_key[position as usize] = column_name;
+        fn validate_key_columns(mut key_columns: Vec<(i32, String)>) -> Result<Vec<String>, i32> {
+            key_columns.sort_unstable_by_key(|(position, _)| *position);
+
+            key_columns
+                .into_iter()
+                .enumerate()
+                .map(|(idx, (position, column_name))| {
+                    // unwrap: I don't see the point of handling the scenario of fetching over
+                    // 2 * 10^9 columns.
+                    let idx: i32 = idx.try_into().unwrap();
+                    if idx == position {
+                        Ok(column_name)
+                    } else {
+                        Err(idx)
+                    }
+                })
+                .collect::<Result<Vec<_>, _>>()
         }
 
+        let partition_key = match validate_key_columns(partition_key_columns) {
+            Ok(partition_key_columns) => partition_key_columns,
+            Err(position) => {
+                result.insert(
+                    keyspace_and_table_name,
+                    Err(SingleKeyspaceMetadataError::IncompletePartitionKey(
+                        position,
+                    )),
+                );
+                continue 'tables_loop;
+            }
+        };
+
+        let clustering_key = match validate_key_columns(clustering_key_columns) {
+            Ok(clustering_key_columns) => clustering_key_columns,
+            Err(position) => {
+                result.insert(
+                    keyspace_and_table_name,
+                    Err(SingleKeyspaceMetadataError::IncompleteClusteringKey(
+                        position,
+                    )),
+                );
+                continue 'tables_loop;
+            }
+        };
+
         let partitioner = all_partitioners
             .remove(&keyspace_and_table_name)
             .unwrap_or_default();
 
+        // unwrap of get() result: all column names in `partition_key` are at this
+        // point guaranteed to be present in `columns`. See the construction of `partition_key`.
+        let pk_column_specs = partition_key
+            .iter()
+            .map(|column_name| (column_name, columns.get(column_name).unwrap().clone().typ))
+            .map(|(name, typ)| {
+                let table_spec = TableSpec::owned(
+                    keyspace_and_table_name.0.clone(),
+                    keyspace_and_table_name.1.clone(),
+                );
+                ColumnSpec::owned(name.to_owned(), typ, table_spec)
+            })
+            .collect();
+
         result.insert(
             keyspace_and_table_name,
             Ok(Table {
@@ -1613,6 +1704,7 @@ async fn query_tables_schema(
                 partition_key,
                 clustering_key,
                 partitioner,
+                pk_column_specs,
             }),
         );
     }
diff --git a/scylla/src/cluster/state.rs b/scylla/src/cluster/state.rs
index 97d55301ab..a754a8a9d9 100644
--- a/scylla/src/cluster/state.rs
+++ b/scylla/src/cluster/state.rs
@@ -1,15 +1,14 @@
-use crate::errors::ConnectionPoolError;
+use crate::errors::{ClusterStateTokenError, ConnectionPoolError};
 use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
 use crate::policies::host_filter::HostFilter;
 use crate::routing::locator::tablets::{RawTablet, Tablet, TabletsInfo};
 use crate::routing::locator::ReplicaLocator;
 use crate::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
 use crate::routing::{Shard, Token};
-use crate::statement::prepared::TokenCalculationError;
 
 use itertools::Itertools;
 use scylla_cql::frame::response::result::TableSpec;
-use scylla_cql::serialize::row::SerializedValues;
+use scylla_cql::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tracing::{debug, warn};
@@ -199,21 +198,39 @@ impl ClusterState {
     }
 
     /// Compute token of a table partition key
+    ///
+    /// The `partition_key` argument contains the values of all partition key
+    /// columns. You can use either unnamed values, like a tuple (e.g. `(1, 5, 5)`),
+    /// or named values (e.g. a struct that derives `SerializeRow`), just as you
+    /// would when executing a request. No additional values are allowed besides
+    /// values for the partition key columns.
     pub fn compute_token(
         &self,
         keyspace: &str,
         table: &str,
-        partition_key: &SerializedValues,
-    ) -> Result<Token, TokenCalculationError> {
-        let partitioner = self
+        partition_key: &dyn SerializeRow,
+    ) -> Result<Token, ClusterStateTokenError> {
+        let Some(table) = self
             .keyspaces
             .get(keyspace)
             .and_then(|k| k.tables.get(table))
-            .and_then(|t| t.partitioner.as_deref())
+        else {
+            return Err(ClusterStateTokenError::UnknownTable {
+                keyspace: keyspace.to_owned(),
+                table: table.to_owned(),
+            });
+        };
+        let values = SerializedValues::from_serializable(
+            &RowSerializationContext::from_specs(table.pk_column_specs.as_slice()),
+            partition_key,
+        )?;
+        let partitioner = table
+            .partitioner
+            .as_deref()
             .and_then(PartitionerName::from_str)
             .unwrap_or_default();
-
-        calculate_token_for_partition_key(partition_key, &partitioner)
+        calculate_token_for_partition_key(&values, &partitioner)
+            .map_err(ClusterStateTokenError::TokenCalculation)
     }
 
     /// Access to replicas owning a given token
@@ -246,12 +263,18 @@ impl ClusterState {
     }
 
     /// Access to replicas owning a given partition key (similar to `nodetool getendpoints`)
+    ///
+    /// The `partition_key` argument contains the values of all partition key
+    /// columns. You can use either unnamed values, like a tuple (e.g. `(1, 5, 5)`),
+    /// or named values (e.g. a struct that derives `SerializeRow`), just as you
+    /// would when executing a request. No additional values are allowed besides
+    /// values for the partition key columns.
     pub fn get_endpoints(
         &self,
         keyspace: &str,
         table: &str,
-        partition_key: &SerializedValues,
-    ) -> Result<Vec<(Arc<Node>, Shard)>, TokenCalculationError> {
+        partition_key: &dyn SerializeRow,
+    ) -> Result<Vec<(Arc<Node>, Shard)>, ClusterStateTokenError> {
         let token = self.compute_token(keyspace, table, partition_key)?;
         Ok(self.get_token_endpoints(keyspace, table, token))
     }
diff --git a/scylla/src/errors.rs b/scylla/src/errors.rs
index 366016480d..bb36fc55a5 100644
--- a/scylla/src/errors.rs
+++ b/scylla/src/errors.rs
@@ -12,6 +12,7 @@ use crate::frame::response;
 // Re-export error types from pager module.
 pub use crate::client::pager::{NextPageError, NextRowError};
+use crate::statement::prepared::TokenCalculationError;
 
 // Re-export error types from query_result module.
 pub use crate::response::query_result::{
     FirstRowError, IntoRowsResultError, MaybeFirstRowError, ResultNotRowsError, RowsError,
@@ -934,6 +935,23 @@ pub(crate) enum ResponseParseError {
     CqlResponseParseError(#[from] CqlResponseParseError),
 }
 
+/// Error returned from [ClusterState](crate::cluster::ClusterState) APIs.
+#[derive(Clone, Debug, Error)]
+#[non_exhaustive]
+pub enum ClusterStateTokenError {
+    /// Failed to calculate token.
+    #[error(transparent)]
+    TokenCalculation(#[from] TokenCalculationError),
+
+    /// Failed to serialize values required to compute partition key.
+    #[error(transparent)]
+    Serialization(#[from] SerializationError),
+
+    /// ClusterState doesn't currently have metadata for the requested table.
+    #[error("Can't find metadata for requested table ({keyspace}.{table}).")]
+    UnknownTable { keyspace: String, table: String },
+}
+
 #[cfg(test)]
 mod tests {
     use scylla_cql::Consistency;
diff --git a/scylla/src/lib.rs b/scylla/src/lib.rs
index bc66116504..5f4e9207ab 100644
--- a/scylla/src/lib.rs
+++ b/scylla/src/lib.rs
@@ -163,10 +163,6 @@ pub mod serialize {
             BuiltinSerializationError, BuiltinSerializationErrorKind, BuiltinTypeCheckError,
             BuiltinTypeCheckErrorKind,
         };
-
-        // Not part of the old framework, but something that we should
-        // still aim to remove from public API.
-        pub use scylla_cql::serialize::row::{SerializedValues, SerializedValuesIterator};
     }
 
     /// Contains the [SerializeValue][value::SerializeValue] trait and its implementations.
@@ -257,3 +253,22 @@ pub(crate) mod utils;
 
 #[cfg(test)]
 pub(crate) use utils::test_utils;
+
+#[cfg(feature = "unstable-testing")]
+pub mod internal_testing {
+    use scylla_cql::serialize::row::SerializedValues;
+
+    use crate::routing::partitioner::PartitionerName;
+    use crate::routing::Token;
+    use crate::statement::prepared::TokenCalculationError;
+
+    pub fn calculate_token_for_partition_key(
+        serialized_partition_key_values: &SerializedValues,
+        partitioner: &PartitionerName,
+    ) -> Result<Token, TokenCalculationError> {
+        crate::routing::partitioner::calculate_token_for_partition_key(
+            serialized_partition_key_values,
+            partitioner,
+        )
+    }
+}
diff --git a/scylla/src/routing/partitioner.rs b/scylla/src/routing/partitioner.rs
index 0778c83065..a4deb9096f 100644
--- a/scylla/src/routing/partitioner.rs
+++ b/scylla/src/routing/partitioner.rs
@@ -349,7 +349,7 @@ impl PartitionerHasher for CDCPartitionerHasher {
 ///
 /// NOTE: the provided values must completely constitute partition key
 /// and be in the order defined in CREATE TABLE statement.
-pub fn calculate_token_for_partition_key(
+pub(crate) fn calculate_token_for_partition_key(
     serialized_partition_key_values: &SerializedValues,
     partitioner: &PartitionerName,
 ) -> Result<Token, TokenCalculationError> {
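
Usage note (not part of the patch): a minimal sketch of how the reworked `ClusterState::compute_token` / `get_endpoints` API is meant to be called after this change. The keyspace, table, and single-`int` partition key below are assumptions made purely for illustration.

```rust
use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
use scylla::errors::ClusterStateTokenError;

async fn show_token_and_replicas() -> Result<(), Box<dyn std::error::Error>> {
    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .build()
        .await?;
    let state = session.get_cluster_state();

    // Unnamed values: a tuple covering exactly the partition key columns,
    // in CREATE TABLE order (here: a hypothetical `my_ks.t2` with a single
    // `int` partition key column).
    let token = state.compute_token("my_ks", "t2", &(17_i32,))?;

    // Replicas owning that partition key (similar to `nodetool getendpoints`).
    let endpoints = state.get_endpoints("my_ks", "t2", &(17_i32,))?;
    println!("token = {token:?}, replicas = {}", endpoints.len());

    // A table missing from the metadata is now reported as a dedicated error variant.
    match state.compute_token("my_ks", "no_such_table", &(17_i32,)) {
        Err(ClusterStateTokenError::UnknownTable { keyspace, table }) => {
            println!("no metadata for {keyspace}.{table}");
        }
        other => println!("unexpected result: {other:?}"),
    }
    Ok(())
}
```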
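Named values go through the same entry point. Below is a purely illustrative sketch: the `SensorPk` struct, table, and column names are hypothetical, and the derive is assumed to be re-exported as `scylla::SerializeRow`.

```rust
use scylla::cluster::ClusterState;
use scylla::errors::ClusterStateTokenError;
use scylla::routing::Token;
use scylla::SerializeRow;

// Hypothetical table:
// CREATE TABLE my_ks.sensor_readings (device_id bigint, bucket int, ts timestamp, value double,
//     PRIMARY KEY ((device_id, bucket), ts));
#[derive(SerializeRow)]
struct SensorPk {
    device_id: i64,
    bucket: i32,
}

fn sensor_token(state: &ClusterState) -> Result<Token, ClusterStateTokenError> {
    // Field names must match the partition key column names; no extra values are allowed.
    let pk = SensorPk {
        device_id: 42,
        bucket: 7,
    };
    state.compute_token("my_ks", "sensor_readings", &pk)
}
```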