
Commit da4a247

Lorak-mm and kwprzytula committed
Metadata: Verify that there are no gaps in pk and ck
Previously it was possible for some positions of `partition_key` and `clustering_key` to remain unfilled and thus be left as empty strings. The probability was very low (Scylla would have to return very strange data), but the possibility was there. This commit verifies that no such gaps occur and returns an error if they do.

Co-authored-by: Wojciech Przytuła <[email protected]>
1 parent cdf7f84 commit da4a247
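The fix works by sorting each key's `(position, column_name)` pairs and requiring that the positions form the contiguous range `0..n`. The sketch below mirrors the `validate_key_columns` helper added in the diff further down, but as a self-contained illustration: the `check_contiguous` name and the `main` driver are made up for this example and are not part of the driver's API.

```rust
// Standalone illustration of the gap check: sort the (position, column_name)
// pairs, then require that the positions match the contiguous range 0..n,
// reporting the first index at which they diverge.
fn check_contiguous(mut key_columns: Vec<(i32, String)>) -> Result<Vec<String>, i32> {
    // Metadata rows may arrive in any order, so sort by the declared position first.
    key_columns.sort_unstable_by_key(|(position, _)| *position);

    key_columns
        .into_iter()
        .enumerate()
        .map(|(idx, (position, column_name))| {
            // Keys with more than i32::MAX columns are not a realistic case,
            // so the conversion is assumed to succeed.
            let idx: i32 = idx.try_into().unwrap();
            if idx == position {
                Ok(column_name)
            } else {
                // Any hole or duplicate makes the sorted positions diverge from
                // 0..n; report the first index where they differ.
                Err(idx)
            }
        })
        .collect()
}

fn main() {
    // Complete key: positions 0 and 1 are both present, in any input order.
    assert_eq!(
        check_contiguous(vec![(1, "ck".to_string()), (0, "pk".to_string())]),
        Ok(vec!["pk".to_string(), "ck".to_string()])
    );

    // Gap: position 1 is missing, so the check reports index 1.
    assert_eq!(
        check_contiguous(vec![(0, "pk".to_string()), (2, "ck".to_string())]),
        Err(1)
    );
}
```

Sorting first makes the check independent of the order in which the schema rows are returned; comparing each sorted position against its index then rejects both missing and duplicated positions, and in the commit the offending index becomes the payload of the new `IncompletePartitionKey` / `IncompleteClusteringKey` error variants.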

File tree

1 file changed: +48 -8 lines changed


scylla/src/cluster/metadata.rs

Lines changed: 48 additions & 8 deletions
```diff
@@ -77,6 +77,10 @@ type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;
 pub(crate) enum SingleKeyspaceMetadataError {
     #[error(transparent)]
     MissingUDT(MissingUserDefinedType),
+    #[error("Partition key column with position {0} is missing from metadata")]
+    IncompletePartitionKey(i32),
+    #[error("Clustering key column with position {0} is missing from metadata")]
+    IncompleteClusteringKey(i32),
 }
 
 /// Allows to read current metadata from the cluster
@@ -1606,7 +1610,7 @@ async fn query_tables_schema(
     let mut all_partitioners = query_table_partitioners(conn).await?;
     let mut result = HashMap::new();
 
-    for ((keyspace_name, table_name), table_result) in tables_schema {
+    'tables_loop: for ((keyspace_name, table_name), table_result) in tables_schema {
         let keyspace_and_table_name = (keyspace_name, table_name);
 
         #[allow(clippy::type_complexity)]
@@ -1621,16 +1625,52 @@ async fn query_tables_schema(
                 continue;
             }
         };
-        let mut partition_key = vec!["".to_string(); partition_key_columns.len()];
-        for (position, column_name) in partition_key_columns {
-            partition_key[position as usize] = column_name;
-        }
 
-        let mut clustering_key = vec!["".to_string(); clustering_key_columns.len()];
-        for (position, column_name) in clustering_key_columns {
-            clustering_key[position as usize] = column_name;
+        fn validate_key_columns(mut key_columns: Vec<(i32, String)>) -> Result<Vec<String>, i32> {
+            key_columns.sort_unstable_by_key(|(position, _)| *position);
+
+            key_columns
+                .into_iter()
+                .enumerate()
+                .map(|(idx, (position, column_name))| {
+                    // unwrap: I don't see the point of handling the scenario of fetching over
+                    // 2 * 10^9 columns.
+                    let idx: i32 = idx.try_into().unwrap();
+                    if idx == position {
+                        Ok(column_name)
+                    } else {
+                        Err(idx)
+                    }
+                })
+                .collect::<Result<Vec<_>, _>>()
         }
 
+        let partition_key = match validate_key_columns(partition_key_columns) {
+            Ok(partition_key_columns) => partition_key_columns,
+            Err(position) => {
+                result.insert(
+                    keyspace_and_table_name,
+                    Err(SingleKeyspaceMetadataError::IncompletePartitionKey(
+                        position,
+                    )),
+                );
+                continue 'tables_loop;
+            }
+        };
+
+        let clustering_key = match validate_key_columns(clustering_key_columns) {
+            Ok(clustering_key_columns) => clustering_key_columns,
+            Err(position) => {
+                result.insert(
+                    keyspace_and_table_name,
+                    Err(SingleKeyspaceMetadataError::IncompleteClusteringKey(
+                        position,
+                    )),
+                );
+                continue 'tables_loop;
+            }
+        };
+
         let partitioner = all_partitioners
             .remove(&keyspace_and_table_name)
             .unwrap_or_default();
```
