Skip to content

Commit d65e7cf

Browse files
authored
Merge pull request #1252 from Lorak-mmk/cluster-state-api-fixes
Remove SerializedValues from public API of `scylla` crate.
2 parents e808345 + 473cc84 commit d65e7cf

File tree

9 files changed

+199
-62
lines changed

9 files changed

+199
-62
lines changed

scylla-cql/src/serialize/row.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,6 @@ impl<'a> RowSerializationContext<'a> {
5252
pub fn columns(&self) -> &'a [ColumnSpec] {
5353
self.columns
5454
}
55-
56-
/// Looks up and returns a column/bind marker by name.
57-
// TODO: change RowSerializationContext to make this faster
58-
#[inline]
59-
pub fn column_by_name(&self, target: &str) -> Option<&ColumnSpec> {
60-
self.columns.iter().find(|&c| c.name() == target)
61-
}
6255
}
6356

6457
/// Represents a set of values that can be sent along a CQL statement.
@@ -514,7 +507,7 @@ impl SerializedValues {
514507
pub const EMPTY: &'static SerializedValues = &SerializedValues::new();
515508

516509
/// Constructs `SerializedValues` from given [`SerializeRow`] object.
517-
pub fn from_serializable<T: SerializeRow>(
510+
pub fn from_serializable<T: SerializeRow + ?Sized>(
518511
ctx: &RowSerializationContext,
519512
row: &T,
520513
) -> Result<Self, SerializationError> {

scylla/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ full-serialization = [
3939
"num-bigint-04",
4040
"bigdecimal-04",
4141
]
42+
unstable-testing = []
4243

4344
[dependencies]
4445
scylla-macros = { version = "0.7.0", path = "../scylla-macros" }
@@ -96,6 +97,7 @@ time = "0.3"
9697
[[bench]]
9798
name = "benchmark"
9899
harness = false
100+
required-features = ["unstable-testing"]
99101

100102
[lints.rust]
101103
unnameable_types = "warn"

scylla/benches/benchmark.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use criterion::{criterion_group, criterion_main, Criterion};
22

33
use bytes::BytesMut;
4-
use scylla::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
4+
use scylla::internal_testing::calculate_token_for_partition_key;
5+
use scylla::routing::partitioner::PartitionerName;
56
use scylla_cql::frame::response::result::{ColumnType, NativeType};
67
use scylla_cql::frame::types;
78
use scylla_cql::serialize::row::SerializedValues;

scylla/src/client/session_test.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,6 @@ async fn test_prepared_statement() {
218218
.unwrap();
219219

220220
let values = (17_i32, 16_i32, "I'm prepared!!!");
221-
let serialized_values_complex_pk = prepared_complex_pk_statement
222-
.serialize_values(&values)
223-
.unwrap();
224221

225222
session
226223
.execute_unpaged(&prepared_statement, &values)
@@ -245,12 +242,9 @@ async fn test_prepared_statement() {
245242
let prepared_token = Murmur3Partitioner
246243
.hash_one(&prepared_statement.compute_partition_key(&values).unwrap());
247244
assert_eq!(token, prepared_token);
248-
let mut pk = SerializedValues::new();
249-
pk.add_value(&17_i32, &ColumnType::Native(NativeType::Int))
250-
.unwrap();
251245
let cluster_state_token = session
252246
.get_cluster_state()
253-
.compute_token(&ks, "t2", &pk)
247+
.compute_token(&ks, "t2", &(values.0,))
254248
.unwrap();
255249
assert_eq!(token, cluster_state_token);
256250
}
@@ -272,7 +266,7 @@ async fn test_prepared_statement() {
272266
assert_eq!(token, prepared_token);
273267
let cluster_state_token = session
274268
.get_cluster_state()
275-
.compute_token(&ks, "complex_pk", &serialized_values_complex_pk)
269+
.compute_token(&ks, "complex_pk", &values)
276270
.unwrap();
277271
assert_eq!(token, cluster_state_token);
278272
}
@@ -608,7 +602,6 @@ async fn test_token_calculation() {
608602
s.push('a');
609603
}
610604
let values = (&s,);
611-
let serialized_values = prepared_statement.serialize_values(&values).unwrap();
612605
session
613606
.execute_unpaged(&prepared_statement, &values)
614607
.await
@@ -631,7 +624,7 @@ async fn test_token_calculation() {
631624
assert_eq!(token, prepared_token);
632625
let cluster_state_token = session
633626
.get_cluster_state()
634-
.compute_token(&ks, "t3", &serialized_values)
627+
.compute_token(&ks, "t3", &values)
635628
.unwrap();
636629
assert_eq!(token, cluster_state_token);
637630
}

scylla/src/cluster/metadata.rs

Lines changed: 119 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use futures::Stream;
3535
use itertools::Itertools;
3636
use rand::seq::{IndexedRandom, SliceRandom};
3737
use rand::{rng, Rng};
38+
use scylla_cql::frame::response::result::{ColumnSpec, TableSpec};
3839
use scylla_macros::DeserializeRow;
3940
use std::borrow::BorrowMut;
4041
use std::cell::Cell;
@@ -67,6 +68,22 @@ type PerTable<T> = HashMap<String, T>;
6768
type PerKsTable<T> = HashMap<(String, String), T>;
6869
type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;
6970

71+
/// Indicates that reading metadata failed, but in a way
72+
/// that we can handle, by throwing out data for a keyspace.
73+
/// It is possible that some of the errors could be handled in even
74+
/// more granular way (e.g. throwing out a single table), but keyspace
75+
/// granularity seems like a good choice given how independent keyspaces
76+
/// are from each other.
77+
#[derive(Clone, Debug, Error)]
78+
pub(crate) enum SingleKeyspaceMetadataError {
79+
#[error(transparent)]
80+
MissingUDT(MissingUserDefinedType),
81+
#[error("Partition key column with position {0} is missing from metadata")]
82+
IncompletePartitionKey(i32),
83+
#[error("Clustering key column with position {0} is missing from metadata")]
84+
IncompleteClusteringKey(i32),
85+
}
86+
7087
/// Allows to read current metadata from the cluster
7188
pub(crate) struct MetadataReader {
7289
control_connection_pool_config: PoolConfig,
@@ -92,7 +109,7 @@ pub(crate) struct MetadataReader {
92109
/// Describes all metadata retrieved from the cluster
93110
pub(crate) struct Metadata {
94111
pub(crate) peers: Vec<Peer>,
95-
pub(crate) keyspaces: HashMap<String, Result<Keyspace, MissingUserDefinedType>>,
112+
pub(crate) keyspaces: HashMap<String, Result<Keyspace, SingleKeyspaceMetadataError>>,
96113
}
97114

98115
#[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
@@ -181,9 +198,14 @@ pub struct Keyspace {
181198
#[derive(Clone, Debug, PartialEq, Eq)]
182199
pub struct Table {
183200
pub columns: HashMap<String, Column>,
201+
/// Names of the columns of the partition key.
202+
/// All of the names are guaranteed to be present in `columns` field.
184203
pub partition_key: Vec<String>,
204+
/// Names of the columns of the clustering key.
205+
/// All of the names are guaranteed to be present in `columns` field.
185206
pub clustering_key: Vec<String>,
186207
pub partitioner: Option<String>,
208+
pub(crate) pk_column_specs: Vec<ColumnSpec<'static>>,
187209
}
188210

189211
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -902,7 +924,7 @@ async fn query_keyspaces(
902924
conn: &Arc<Connection>,
903925
keyspaces_to_fetch: &[String],
904926
fetch_schema: bool,
905-
) -> Result<PerKeyspaceResult<Keyspace, MissingUserDefinedType>, MetadataError> {
927+
) -> Result<PerKeyspaceResult<Keyspace, SingleKeyspaceMetadataError>, MetadataError> {
906928
let rows = query_filter_keyspace_name::<(String, HashMap<String, String>)>(
907929
conn,
908930
"select keyspace_name, replication from system_schema.keyspaces",
@@ -953,15 +975,20 @@ async fn query_keyspaces(
953975

954976
// As you can notice, in this file we generally operate on two layers of errors:
955977
// - Outer (MetadataError) if something went wrong with querying the cluster.
956-
// - Inner (currently MissingUserDefinedType, possibly other variants in the future) if the fetched metadata
957-
// turned out to not be fully consistent.
978+
// - Inner (SingleKeyspaceMetadataError) if the fetched metadata turned out to not be fully consistent.
958979
// If there is an inner error, we want to drop metadata for the whole keyspace.
959-
// This logic checks if either tables views or UDTs have such inner error, and returns it if so.
980+
// This logic checks if either tables, views, or UDTs have such inner error, and returns it if so.
960981
// Notice that in the error branch, return value is wrapped in `Ok` - but this is the
961982
// outer error, so it just means there was no error while querying the cluster.
962983
let (tables, views, user_defined_types) = match (tables, views, user_defined_types) {
963984
(Ok(t), Ok(v), Ok(u)) => (t, v, u),
964-
(Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => return Ok((keyspace_name, Err(e))),
985+
(Err(e), _, _) | (_, Err(e), _) => return Ok((keyspace_name, Err(e))),
986+
(_, _, Err(e)) => {
987+
return Ok((
988+
keyspace_name,
989+
Err(SingleKeyspaceMetadataError::MissingUDT(e)),
990+
))
991+
}
965992
};
966993

967994
let keyspace = Keyspace {
@@ -1364,8 +1391,8 @@ mod toposort_tests {
13641391
async fn query_tables(
13651392
conn: &Arc<Connection>,
13661393
keyspaces_to_fetch: &[String],
1367-
tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
1368-
) -> Result<PerKeyspaceResult<PerTable<Table>, MissingUserDefinedType>, MetadataError> {
1394+
tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
1395+
) -> Result<PerKeyspaceResult<PerTable<Table>, SingleKeyspaceMetadataError>, MetadataError> {
13691396
let rows = query_filter_keyspace_name::<(String, String)>(
13701397
conn,
13711398
"SELECT keyspace_name, table_name FROM system_schema.tables",
@@ -1385,6 +1412,7 @@ async fn query_tables(
13851412
partition_key: vec![],
13861413
clustering_key: vec![],
13871414
partitioner: None,
1415+
pk_column_specs: vec![],
13881416
}));
13891417

13901418
let mut entry = result
@@ -1409,8 +1437,9 @@ async fn query_tables(
14091437
async fn query_views(
14101438
conn: &Arc<Connection>,
14111439
keyspaces_to_fetch: &[String],
1412-
tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
1413-
) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, MissingUserDefinedType>, MetadataError> {
1440+
tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
1441+
) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, SingleKeyspaceMetadataError>, MetadataError>
1442+
{
14141443
let rows = query_filter_keyspace_name::<(String, String, String)>(
14151444
conn,
14161445
"SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
@@ -1435,6 +1464,7 @@ async fn query_views(
14351464
partition_key: vec![],
14361465
clustering_key: vec![],
14371466
partitioner: None,
1467+
pk_column_specs: vec![],
14381468
}))
14391469
.map(|table| MaterializedView {
14401470
view_metadata: table,
@@ -1465,7 +1495,7 @@ async fn query_tables_schema(
14651495
conn: &Arc<Connection>,
14661496
keyspaces_to_fetch: &[String],
14671497
udts: &PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
1468-
) -> Result<PerKsTableResult<Table, MissingUserDefinedType>, MetadataError> {
1498+
) -> Result<PerKsTableResult<Table, SingleKeyspaceMetadataError>, MetadataError> {
14691499
// Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of
14701500
// type EmptyType for dense tables. This resolves into this CQL type name.
14711501
// This column shouldn't be exposed to the user but is currently exposed in system tables.
@@ -1484,7 +1514,7 @@ async fn query_tables_schema(
14841514

14851515
let empty_ok_map = Ok(HashMap::new());
14861516

1487-
let mut tables_schema: HashMap<_, Result<_, MissingUserDefinedType>> = HashMap::new();
1517+
let mut tables_schema: HashMap<_, Result<_, SingleKeyspaceMetadataError>> = HashMap::new();
14881518

14891519
rows.map(|row_result| {
14901520
let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;
@@ -1518,7 +1548,10 @@ async fn query_tables_schema(
15181548
// is minor enough to ignore. Note that the first issue also applies to
15191549
// solution 1: but the keyspace won't be present in the result at all,
15201550
// which is arguably worse.
1521-
tables_schema.insert((keyspace_name, table_name), Err(e.clone()));
1551+
tables_schema.insert(
1552+
(keyspace_name, table_name),
1553+
Err(SingleKeyspaceMetadataError::MissingUDT(e.clone())),
1554+
);
15221555
return Ok::<_, MetadataError>(());
15231556
}
15241557
};
@@ -1532,7 +1565,10 @@ async fn query_tables_schema(
15321565
let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) {
15331566
Ok(t) => t,
15341567
Err(e) => {
1535-
tables_schema.insert((keyspace_name, table_name), Err(e));
1568+
tables_schema.insert(
1569+
(keyspace_name, table_name),
1570+
Err(SingleKeyspaceMetadataError::MissingUDT(e)),
1571+
);
15361572
return Ok::<_, MetadataError>(());
15371573
}
15381574
};
@@ -1549,21 +1585,21 @@ async fn query_tables_schema(
15491585
.entry((keyspace_name, table_name))
15501586
.or_insert(Ok((
15511587
HashMap::new(), // columns
1552-
HashMap::new(), // partition key
1553-
HashMap::new(), // clustering key
1588+
Vec::new(), // partition key
1589+
Vec::new(), // clustering key
15541590
)))
15551591
else {
15561592
// This table was previously marked as broken, no way to insert anything.
15571593
return Ok::<_, MetadataError>(());
15581594
};
15591595

15601596
if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering {
1561-
let key_map = if kind == ColumnKind::PartitionKey {
1597+
let key_list: &mut Vec<(i32, String)> = if kind == ColumnKind::PartitionKey {
15621598
entry.1.borrow_mut()
15631599
} else {
15641600
entry.2.borrow_mut()
15651601
};
1566-
key_map.insert(position, column_name.clone());
1602+
key_list.push((position, column_name.clone()));
15671603
}
15681604

15691605
entry.0.insert(
@@ -1582,37 +1618,93 @@ async fn query_tables_schema(
15821618
let mut all_partitioners = query_table_partitioners(conn).await?;
15831619
let mut result = HashMap::new();
15841620

1585-
for ((keyspace_name, table_name), table_result) in tables_schema {
1621+
'tables_loop: for ((keyspace_name, table_name), table_result) in tables_schema {
15861622
let keyspace_and_table_name = (keyspace_name, table_name);
15871623

1588-
let (columns, partition_key_columns, clustering_key_columns) = match table_result {
1624+
#[allow(clippy::type_complexity)]
1625+
let (columns, partition_key_columns, clustering_key_columns): (
1626+
HashMap<String, Column>,
1627+
Vec<(i32, String)>,
1628+
Vec<(i32, String)>,
1629+
) = match table_result {
15891630
Ok(table) => table,
15901631
Err(e) => {
15911632
let _ = result.insert(keyspace_and_table_name, Err(e));
15921633
continue;
15931634
}
15941635
};
1595-
let mut partition_key = vec!["".to_string(); partition_key_columns.len()];
1596-
for (position, column_name) in partition_key_columns {
1597-
partition_key[position as usize] = column_name;
1598-
}
15991636

1600-
let mut clustering_key = vec!["".to_string(); clustering_key_columns.len()];
1601-
for (position, column_name) in clustering_key_columns {
1602-
clustering_key[position as usize] = column_name;
1637+
fn validate_key_columns(mut key_columns: Vec<(i32, String)>) -> Result<Vec<String>, i32> {
1638+
key_columns.sort_unstable_by_key(|(position, _)| *position);
1639+
1640+
key_columns
1641+
.into_iter()
1642+
.enumerate()
1643+
.map(|(idx, (position, column_name))| {
1644+
// unwrap: I don't see the point of handling the scenario of fetching over
1645+
// 2 * 10^9 columns.
1646+
let idx: i32 = idx.try_into().unwrap();
1647+
if idx == position {
1648+
Ok(column_name)
1649+
} else {
1650+
Err(idx)
1651+
}
1652+
})
1653+
.collect::<Result<Vec<_>, _>>()
16031654
}
16041655

1656+
let partition_key = match validate_key_columns(partition_key_columns) {
1657+
Ok(partition_key_columns) => partition_key_columns,
1658+
Err(position) => {
1659+
result.insert(
1660+
keyspace_and_table_name,
1661+
Err(SingleKeyspaceMetadataError::IncompletePartitionKey(
1662+
position,
1663+
)),
1664+
);
1665+
continue 'tables_loop;
1666+
}
1667+
};
1668+
1669+
let clustering_key = match validate_key_columns(clustering_key_columns) {
1670+
Ok(clustering_key_columns) => clustering_key_columns,
1671+
Err(position) => {
1672+
result.insert(
1673+
keyspace_and_table_name,
1674+
Err(SingleKeyspaceMetadataError::IncompleteClusteringKey(
1675+
position,
1676+
)),
1677+
);
1678+
continue 'tables_loop;
1679+
}
1680+
};
1681+
16051682
let partitioner = all_partitioners
16061683
.remove(&keyspace_and_table_name)
16071684
.unwrap_or_default();
16081685

1686+
// unwrap of get() result: all column names in `partition_key` are at this
1687+
// point guaranteed to be present in `columns`. See the construction of `partition_key`
1688+
let pk_column_specs = partition_key
1689+
.iter()
1690+
.map(|column_name| (column_name, columns.get(column_name).unwrap().clone().typ))
1691+
.map(|(name, typ)| {
1692+
let table_spec = TableSpec::owned(
1693+
keyspace_and_table_name.0.clone(),
1694+
keyspace_and_table_name.1.clone(),
1695+
);
1696+
ColumnSpec::owned(name.to_owned(), typ, table_spec)
1697+
})
1698+
.collect();
1699+
16091700
result.insert(
16101701
keyspace_and_table_name,
16111702
Ok(Table {
16121703
columns,
16131704
partition_key,
16141705
clustering_key,
16151706
partitioner,
1707+
pk_column_specs,
16161708
}),
16171709
);
16181710
}

0 commit comments

Comments
 (0)