9 changes: 1 addition & 8 deletions scylla-cql/src/serialize/row.rs
@@ -52,13 +52,6 @@ impl<'a> RowSerializationContext<'a> {
pub fn columns(&self) -> &'a [ColumnSpec] {
self.columns
}

/// Looks up and returns a column/bind marker by name.
// TODO: change RowSerializationContext to make this faster
#[inline]
pub fn column_by_name(&self, target: &str) -> Option<&ColumnSpec> {
self.columns.iter().find(|&c| c.name() == target)
}
}

/// Represents a set of values that can be sent along a CQL statement.
@@ -514,7 +507,7 @@ impl SerializedValues {
pub const EMPTY: &'static SerializedValues = &SerializedValues::new();

/// Constructs `SerializedValues` from a given [`SerializeRow`] object.
pub fn from_serializable<T: SerializeRow>(
pub fn from_serializable<T: SerializeRow + ?Sized>(
ctx: &RowSerializationContext,
row: &T,
) -> Result<Self, SerializationError> {
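The relaxed `T: SerializeRow + ?Sized` bound lets `T` be a trait object. A rough sketch of what that enables (the helper name is hypothetical, and it assumes `SerializeRow` is dyn-compatible, which is not stated in this diff):

```rust
use scylla_cql::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
use scylla_cql::serialize::SerializationError;

// Hypothetical helper: with `?Sized`, `T` may be `dyn SerializeRow`, so code that only
// holds a type-erased row can still build `SerializedValues` from it.
fn serialize_erased(
    ctx: &RowSerializationContext<'_>,
    row: &dyn SerializeRow,
) -> Result<SerializedValues, SerializationError> {
    // Before the bound change, `T = dyn SerializeRow` would not satisfy the implicit `T: Sized`.
    SerializedValues::from_serializable(ctx, row)
}
```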
2 changes: 2 additions & 0 deletions scylla/Cargo.toml
@@ -39,6 +39,7 @@ full-serialization = [
"num-bigint-04",
"bigdecimal-04",
]
unstable-testing = []

[dependencies]
scylla-macros = { version = "0.7.0", path = "../scylla-macros" }
@@ -96,6 +97,7 @@ time = "0.3"
[[bench]]
name = "benchmark"
harness = false
required-features = ["unstable-testing"]

[lints.rust]
unnameable_types = "warn"
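Since the `benchmark` target now declares `required-features = ["unstable-testing"]`, Cargo only builds it when that feature is enabled; presumably the intended invocation is `cargo bench --features unstable-testing`.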
3 changes: 2 additions & 1 deletion scylla/benches/benchmark.rs
@@ -1,7 +1,8 @@
use criterion::{criterion_group, criterion_main, Criterion};

use bytes::BytesMut;
use scylla::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
use scylla::internal_testing::calculate_token_for_partition_key;
use scylla::routing::partitioner::PartitionerName;
use scylla_cql::frame::response::result::{ColumnType, NativeType};
use scylla_cql::frame::types;
use scylla_cql::serialize::row::SerializedValues;
13 changes: 3 additions & 10 deletions scylla/src/client/session_test.rs
@@ -218,9 +218,6 @@ async fn test_prepared_statement() {
.unwrap();

let values = (17_i32, 16_i32, "I'm prepared!!!");
let serialized_values_complex_pk = prepared_complex_pk_statement
.serialize_values(&values)
.unwrap();

session
.execute_unpaged(&prepared_statement, &values)
@@ -245,12 +242,9 @@ async fn test_prepared_statement() {
let prepared_token = Murmur3Partitioner
.hash_one(&prepared_statement.compute_partition_key(&values).unwrap());
assert_eq!(token, prepared_token);
let mut pk = SerializedValues::new();
pk.add_value(&17_i32, &ColumnType::Native(NativeType::Int))
.unwrap();
let cluster_state_token = session
.get_cluster_state()
.compute_token(&ks, "t2", &pk)
.compute_token(&ks, "t2", &(values.0,))
.unwrap();
assert_eq!(token, cluster_state_token);
}
@@ -272,7 +266,7 @@ async fn test_prepared_statement() {
assert_eq!(token, prepared_token);
let cluster_state_token = session
.get_cluster_state()
.compute_token(&ks, "complex_pk", &serialized_values_complex_pk)
.compute_token(&ks, "complex_pk", &values)
.unwrap();
assert_eq!(token, cluster_state_token);
}
@@ -608,7 +602,6 @@ async fn test_token_calculation() {
s.push('a');
}
let values = (&s,);
let serialized_values = prepared_statement.serialize_values(&values).unwrap();
session
.execute_unpaged(&prepared_statement, &values)
.await
@@ -631,7 +624,7 @@ async fn test_token_calculation() {
assert_eq!(token, prepared_token);
let cluster_state_token = session
.get_cluster_state()
.compute_token(&ks, "t3", &serialized_values)
.compute_token(&ks, "t3", &values)
.unwrap();
assert_eq!(token, cluster_state_token);
}
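The updated assertions hand the same Rust values used to execute the statements directly to `compute_token`, instead of hand-built `SerializedValues`. A minimal sketch of that call shape (assuming the current module layout, a connected `Session`, and a table `t3 (a text primary key)` in keyspace `ks`):

```rust
use scylla::client::session::Session;

// Sketch only: mirrors the test above, passing the partition key as a plain tuple.
async fn token_of_t3(session: &Session, ks: &str) -> Result<(), Box<dyn std::error::Error>> {
    let values = ("some partition key",);
    let token = session.get_cluster_state().compute_token(ks, "t3", &values)?;
    println!("token for t3: {token:?}");
    Ok(())
}
```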
146 changes: 119 additions & 27 deletions scylla/src/cluster/metadata.rs
@@ -35,6 +35,7 @@ use futures::Stream;
use itertools::Itertools;
use rand::seq::{IndexedRandom, SliceRandom};
use rand::{rng, Rng};
use scylla_cql::frame::response::result::{ColumnSpec, TableSpec};
use scylla_macros::DeserializeRow;
use std::borrow::BorrowMut;
use std::cell::Cell;
@@ -67,6 +68,22 @@ type PerTable<T> = HashMap<String, T>;
type PerKsTable<T> = HashMap<(String, String), T>;
type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;

/// Indicates that reading metadata failed, but in a way
/// that we can handle, by throwing out data for a keyspace.
/// It is possible that some of the errors could be handled in an even
/// more granular way (e.g. throwing out a single table), but keyspace
/// granularity seems like a good choice given how independent keyspaces
/// are from each other.
#[derive(Clone, Debug, Error)]
pub(crate) enum SingleKeyspaceMetadataError {
#[error(transparent)]
MissingUDT(MissingUserDefinedType),
#[error("Partition key column with position {0} is missing from metadata")]
IncompletePartitionKey(i32),
#[error("Clustering key column with position {0} is missing from metadata")]
IncompleteClusteringKey(i32),
}

/// Allows reading the current metadata from the cluster
pub(crate) struct MetadataReader {
control_connection_pool_config: PoolConfig,
@@ -92,7 +109,7 @@ pub(crate) struct MetadataReader {
/// Describes all metadata retrieved from the cluster
pub(crate) struct Metadata {
pub(crate) peers: Vec<Peer>,
pub(crate) keyspaces: HashMap<String, Result<Keyspace, MissingUserDefinedType>>,
pub(crate) keyspaces: HashMap<String, Result<Keyspace, SingleKeyspaceMetadataError>>,
}

#[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
@@ -181,9 +198,14 @@ pub struct Keyspace {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Table {
pub columns: HashMap<String, Column>,
/// Names of the columns of the partition key.
/// All of these names are guaranteed to be present in the `columns` field.
pub partition_key: Vec<String>,
/// Names of the columns of the clustering key.
/// All of these names are guaranteed to be present in the `columns` field.
pub clustering_key: Vec<String>,
pub partitioner: Option<String>,
pub(crate) pk_column_specs: Vec<ColumnSpec<'static>>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
@@ -902,7 +924,7 @@ async fn query_keyspaces(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
fetch_schema: bool,
) -> Result<PerKeyspaceResult<Keyspace, MissingUserDefinedType>, MetadataError> {
) -> Result<PerKeyspaceResult<Keyspace, SingleKeyspaceMetadataError>, MetadataError> {
let rows = query_filter_keyspace_name::<(String, HashMap<String, String>)>(
conn,
"select keyspace_name, replication from system_schema.keyspaces",
@@ -953,15 +975,20 @@

// As you can notice, in this file we generally operate on two layers of errors:
// - Outer (MetadataError) if something went wrong with querying the cluster.
// - Inner (currently MissingUserDefinedType, possibly other variants in the future) if the fetched metadata
// turned out to not be fully consistent.
// - Inner (SingleKeyspaceMetadataError) if the fetched metadata turned out to not be fully consistent.
// If there is an inner error, we want to drop metadata for the whole keyspace.
// This logic checks if either tables views or UDTs have such inner error, and returns it if so.
// This logic checks if either tables, views, or UDTs have such inner error, and returns it if so.
// Notice that in the error branch, the return value is wrapped in `Ok` - but this is the
// outer error, so it just means there was no error while querying the cluster.
let (tables, views, user_defined_types) = match (tables, views, user_defined_types) {
(Ok(t), Ok(v), Ok(u)) => (t, v, u),
(Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => return Ok((keyspace_name, Err(e))),
(Err(e), _, _) | (_, Err(e), _) => return Ok((keyspace_name, Err(e))),
(_, _, Err(e)) => {
return Ok((
keyspace_name,
Err(SingleKeyspaceMetadataError::MissingUDT(e)),
))
}
};

let keyspace = Keyspace {
@@ -1364,8 +1391,8 @@ mod toposort_tests {
async fn query_tables(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
) -> Result<PerKeyspaceResult<PerTable<Table>, MissingUserDefinedType>, MetadataError> {
tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
) -> Result<PerKeyspaceResult<PerTable<Table>, SingleKeyspaceMetadataError>, MetadataError> {
let rows = query_filter_keyspace_name::<(String, String)>(
conn,
"SELECT keyspace_name, table_name FROM system_schema.tables",
@@ -1385,6 +1412,7 @@ async fn query_tables(
partition_key: vec![],
clustering_key: vec![],
partitioner: None,
pk_column_specs: vec![],
}));

let mut entry = result
@@ -1409,8 +1437,9 @@
async fn query_views(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, MissingUserDefinedType>, MetadataError> {
tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, SingleKeyspaceMetadataError>, MetadataError>
{
let rows = query_filter_keyspace_name::<(String, String, String)>(
conn,
"SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
@@ -1435,6 +1464,7 @@ async fn query_views(
partition_key: vec![],
clustering_key: vec![],
partitioner: None,
pk_column_specs: vec![],
}))
.map(|table| MaterializedView {
view_metadata: table,
@@ -1465,7 +1495,7 @@ async fn query_tables_schema(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
udts: &PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
) -> Result<PerKsTableResult<Table, MissingUserDefinedType>, MetadataError> {
) -> Result<PerKsTableResult<Table, SingleKeyspaceMetadataError>, MetadataError> {
// Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of
// type EmptyType for dense tables. This resolves into this CQL type name.
// This column shouldn't be exposed to the user but is currently exposed in system tables.
@@ -1484,7 +1514,7 @@ async fn query_tables_schema(

let empty_ok_map = Ok(HashMap::new());

let mut tables_schema: HashMap<_, Result<_, MissingUserDefinedType>> = HashMap::new();
let mut tables_schema: HashMap<_, Result<_, SingleKeyspaceMetadataError>> = HashMap::new();

rows.map(|row_result| {
let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;
@@ -1518,7 +1548,10 @@ async fn query_tables_schema(
// is minor enough to ignore. Note that the first issue also applies to
// solution 1: but the keyspace won't be present in the result at all,
// which is arguably worse.
tables_schema.insert((keyspace_name, table_name), Err(e.clone()));
tables_schema.insert(
(keyspace_name, table_name),
Err(SingleKeyspaceMetadataError::MissingUDT(e.clone())),
);
return Ok::<_, MetadataError>(());
}
};
@@ -1532,7 +1565,10 @@ async fn query_tables_schema(
let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) {
Ok(t) => t,
Err(e) => {
tables_schema.insert((keyspace_name, table_name), Err(e));
tables_schema.insert(
(keyspace_name, table_name),
Err(SingleKeyspaceMetadataError::MissingUDT(e)),
);
return Ok::<_, MetadataError>(());
}
};
@@ -1549,21 +1585,21 @@ async fn query_tables_schema(
.entry((keyspace_name, table_name))
.or_insert(Ok((
HashMap::new(), // columns
HashMap::new(), // partition key
HashMap::new(), // clustering key
Vec::new(), // partition key
Vec::new(), // clustering key
)))
else {
// This table was previously marked as broken, no way to insert anything.
return Ok::<_, MetadataError>(());
};

if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering {
let key_map = if kind == ColumnKind::PartitionKey {
let key_list: &mut Vec<(i32, String)> = if kind == ColumnKind::PartitionKey {
entry.1.borrow_mut()
} else {
entry.2.borrow_mut()
};
key_map.insert(position, column_name.clone());
key_list.push((position, column_name.clone()));
}

entry.0.insert(
@@ -1582,37 +1618,93 @@ async fn query_tables_schema(
let mut all_partitioners = query_table_partitioners(conn).await?;
let mut result = HashMap::new();

for ((keyspace_name, table_name), table_result) in tables_schema {
'tables_loop: for ((keyspace_name, table_name), table_result) in tables_schema {
let keyspace_and_table_name = (keyspace_name, table_name);

let (columns, partition_key_columns, clustering_key_columns) = match table_result {
#[allow(clippy::type_complexity)]
let (columns, partition_key_columns, clustering_key_columns): (
HashMap<String, Column>,
Vec<(i32, String)>,
Vec<(i32, String)>,
) = match table_result {
Ok(table) => table,
Err(e) => {
let _ = result.insert(keyspace_and_table_name, Err(e));
continue;
}
};
let mut partition_key = vec!["".to_string(); partition_key_columns.len()];
for (position, column_name) in partition_key_columns {
partition_key[position as usize] = column_name;
}

let mut clustering_key = vec!["".to_string(); clustering_key_columns.len()];
for (position, column_name) in clustering_key_columns {
clustering_key[position as usize] = column_name;
fn validate_key_columns(mut key_columns: Vec<(i32, String)>) -> Result<Vec<String>, i32> {
key_columns.sort_unstable_by_key(|(position, _)| *position);

key_columns
.into_iter()
.enumerate()
.map(|(idx, (position, column_name))| {
// unwrap: I don't see the point of handling the scenario of fetching over
// 2 * 10^9 columns.
let idx: i32 = idx.try_into().unwrap();
if idx == position {
Ok(column_name)
} else {
Err(idx)
}
})
.collect::<Result<Vec<_>, _>>()
}

let partition_key = match validate_key_columns(partition_key_columns) {
Ok(partition_key_columns) => partition_key_columns,
Err(position) => {
result.insert(
keyspace_and_table_name,
Err(SingleKeyspaceMetadataError::IncompletePartitionKey(
position,
)),
);
continue 'tables_loop;
}
};

let clustering_key = match validate_key_columns(clustering_key_columns) {
Ok(clustering_key_columns) => clustering_key_columns,
Err(position) => {
result.insert(
keyspace_and_table_name,
Err(SingleKeyspaceMetadataError::IncompleteClusteringKey(
position,
)),
);
continue 'tables_loop;
}
};

let partitioner = all_partitioners
.remove(&keyspace_and_table_name)
.unwrap_or_default();

// unwrap of get() result: all column names in `partition_key` are at this
// point guaranteed to be present in `columns`. See the construction of `partition_key`
let pk_column_specs = partition_key
.iter()
.map(|column_name| (column_name, columns.get(column_name).unwrap().clone().typ))
.map(|(name, typ)| {
let table_spec = TableSpec::owned(
keyspace_and_table_name.0.clone(),
keyspace_and_table_name.1.clone(),
);
ColumnSpec::owned(name.to_owned(), typ, table_spec)
})
.collect();

result.insert(
keyspace_and_table_name,
Ok(Table {
columns,
partition_key,
clustering_key,
partitioner,
pk_column_specs,
}),
);
}
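For reference, the position check performed by `validate_key_columns` above can be exercised on its own. A self-contained sketch of the same logic with illustrative inputs (the example values are mine, not taken from the tests):

```rust
// Same shape as the helper in the diff: sort the fetched (position, name) pairs and
// require the positions to form a gapless 0..n sequence, reporting the first gap.
fn validate_key_columns(mut key_columns: Vec<(i32, String)>) -> Result<Vec<String>, i32> {
    key_columns.sort_unstable_by_key(|(position, _)| *position);
    key_columns
        .into_iter()
        .enumerate()
        .map(|(idx, (position, column_name))| {
            let idx = idx as i32;
            if idx == position {
                Ok(column_name)
            } else {
                Err(idx)
            }
        })
        .collect()
}

fn main() {
    // Positions 0 and 1 are both present (out of order): accepted and ordered by position.
    assert_eq!(
        validate_key_columns(vec![(1, "b".into()), (0, "a".into())]),
        Ok(vec!["a".to_string(), "b".to_string()])
    );
    // Position 1 is missing from the fetched rows: the first gap (index 1) is reported.
    assert_eq!(
        validate_key_columns(vec![(0, "a".into()), (2, "c".into())]),
        Err(1)
    );
}
```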