Skip to content

Commit 11b2d67

Browse files
committed
ClusterState: Accepts &dyn SerializeRow instead of SerializedValues
There is no easy way for the users to create SerializedValues, which makes the current APIs cumbersome to use. Instead they should accept `&dyn SerializeRow` and perform serialization based on table metadata. This change means that those methods can now also return SerializationError, and also need to handle a table missing from metadata, preferably also returning an error in this case. No existing error type fits here, so either we need to extend some existing one, or create new one. First idea was extending PartitionKeyError, but it needs to be convertible to ExecutionError, in which we don't need those new variants. For that reason I introduced a new error type for those methods, called ClusterStateTokenError.
1 parent b75dbb9 commit 11b2d67

File tree

3 files changed

+43
-21
lines changed

3 files changed

+43
-21
lines changed

scylla/src/client/session_test.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,6 @@ async fn test_prepared_statement() {
218218
.unwrap();
219219

220220
let values = (17_i32, 16_i32, "I'm prepared!!!");
221-
let serialized_values_complex_pk = prepared_complex_pk_statement
222-
.serialize_values(&values)
223-
.unwrap();
224221

225222
session
226223
.execute_unpaged(&prepared_statement, &values)
@@ -245,12 +242,9 @@ async fn test_prepared_statement() {
245242
let prepared_token = Murmur3Partitioner
246243
.hash_one(&prepared_statement.compute_partition_key(&values).unwrap());
247244
assert_eq!(token, prepared_token);
248-
let mut pk = SerializedValues::new();
249-
pk.add_value(&17_i32, &ColumnType::Native(NativeType::Int))
250-
.unwrap();
251245
let cluster_state_token = session
252246
.get_cluster_state()
253-
.compute_token(&ks, "t2", &pk)
247+
.compute_token(&ks, "t2", &(values.0,))
254248
.unwrap();
255249
assert_eq!(token, cluster_state_token);
256250
}
@@ -272,7 +266,7 @@ async fn test_prepared_statement() {
272266
assert_eq!(token, prepared_token);
273267
let cluster_state_token = session
274268
.get_cluster_state()
275-
.compute_token(&ks, "complex_pk", &serialized_values_complex_pk)
269+
.compute_token(&ks, "complex_pk", &values)
276270
.unwrap();
277271
assert_eq!(token, cluster_state_token);
278272
}
@@ -608,7 +602,6 @@ async fn test_token_calculation() {
608602
s.push('a');
609603
}
610604
let values = (&s,);
611-
let serialized_values = prepared_statement.serialize_values(&values).unwrap();
612605
session
613606
.execute_unpaged(&prepared_statement, &values)
614607
.await
@@ -631,7 +624,7 @@ async fn test_token_calculation() {
631624
assert_eq!(token, prepared_token);
632625
let cluster_state_token = session
633626
.get_cluster_state()
634-
.compute_token(&ks, "t3", &serialized_values)
627+
.compute_token(&ks, "t3", &values)
635628
.unwrap();
636629
assert_eq!(token, cluster_state_token);
637630
}

scylla/src/cluster/state.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
use crate::errors::ConnectionPoolError;
1+
use crate::errors::{ClusterStateTokenError, ConnectionPoolError};
22
use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
33
use crate::policies::host_filter::HostFilter;
44
use crate::routing::locator::tablets::{RawTablet, Tablet, TabletsInfo};
55
use crate::routing::locator::ReplicaLocator;
66
use crate::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
77
use crate::routing::{Shard, Token};
8-
use crate::statement::prepared::TokenCalculationError;
98

109
use itertools::Itertools;
1110
use scylla_cql::frame::response::result::TableSpec;
12-
use scylla_cql::serialize::row::SerializedValues;
11+
use scylla_cql::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
1312
use std::collections::{HashMap, HashSet};
1413
use std::sync::Arc;
1514
use tracing::{debug, warn};
@@ -203,17 +202,29 @@ impl ClusterState {
203202
&self,
204203
keyspace: &str,
205204
table: &str,
206-
partition_key: &SerializedValues,
207-
) -> Result<Token, TokenCalculationError> {
208-
let partitioner = self
205+
partition_key: &dyn SerializeRow,
206+
) -> Result<Token, ClusterStateTokenError> {
207+
let Some(table) = self
209208
.keyspaces
210209
.get(keyspace)
211210
.and_then(|k| k.tables.get(table))
212-
.and_then(|t| t.partitioner.as_deref())
211+
else {
212+
return Err(ClusterStateTokenError::UnknownTable {
213+
keyspace: keyspace.to_owned(),
214+
table: table.to_owned(),
215+
});
216+
};
217+
let values = SerializedValues::from_serializable(
218+
&RowSerializationContext::from_specs(table.pk_column_specs.as_slice()),
219+
partition_key,
220+
)?;
221+
let partitioner = table
222+
.partitioner
223+
.as_deref()
213224
.and_then(PartitionerName::from_str)
214225
.unwrap_or_default();
215-
216-
calculate_token_for_partition_key(partition_key, &partitioner)
226+
calculate_token_for_partition_key(&values, &partitioner)
227+
.map_err(ClusterStateTokenError::TokenCalculation)
217228
}
218229

219230
/// Access to replicas owning a given token
@@ -250,8 +261,8 @@ impl ClusterState {
250261
&self,
251262
keyspace: &str,
252263
table: &str,
253-
partition_key: &SerializedValues,
254-
) -> Result<Vec<(Arc<Node>, Shard)>, TokenCalculationError> {
264+
partition_key: &dyn SerializeRow,
265+
) -> Result<Vec<(Arc<Node>, Shard)>, ClusterStateTokenError> {
255266
let token = self.compute_token(keyspace, table, partition_key)?;
256267
Ok(self.get_token_endpoints(keyspace, table, token))
257268
}

scylla/src/errors.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::frame::response;
1212
// Re-export error types from pager module.
1313
pub use crate::client::pager::{NextPageError, NextRowError};
1414

15+
use crate::statement::prepared::TokenCalculationError;
1516
// Re-export error types from query_result module.
1617
pub use crate::response::query_result::{
1718
FirstRowError, IntoRowsResultError, MaybeFirstRowError, ResultNotRowsError, RowsError,
@@ -934,6 +935,23 @@ pub(crate) enum ResponseParseError {
934935
CqlResponseParseError(#[from] CqlResponseParseError),
935936
}
936937

938+
/// Error returned from [ClusterState](crate::cluster::ClusterState) APIs.
939+
#[derive(Clone, Debug, Error)]
940+
#[non_exhaustive]
941+
pub enum ClusterStateTokenError {
942+
/// Failed to calculate token.
943+
#[error(transparent)]
944+
TokenCalculation(#[from] TokenCalculationError),
945+
946+
/// Failed to serialize values required to compute partition key.
947+
#[error(transparent)]
948+
Serialization(#[from] SerializationError),
949+
950+
/// ClusterState doesn't currently have metadata for the requested table.
951+
#[error("Can't find metadata for requested table ({keyspace}.{table}).")]
952+
UnknownTable { keyspace: String, table: String },
953+
}
954+
937955
#[cfg(test)]
938956
mod tests {
939957
use scylla_cql::Consistency;

0 commit comments

Comments
 (0)