|
1 | | -use crate::errors::ConnectionPoolError; |
| 1 | +use crate::errors::{ClusterStateTokenError, ConnectionPoolError}; |
2 | 2 | use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName}; |
3 | 3 | use crate::policies::host_filter::HostFilter; |
4 | 4 | use crate::routing::locator::tablets::{RawTablet, Tablet, TabletsInfo}; |
5 | 5 | use crate::routing::locator::ReplicaLocator; |
6 | 6 | use crate::routing::partitioner::{calculate_token_for_partition_key, PartitionerName}; |
7 | 7 | use crate::routing::{Shard, Token}; |
8 | | -use crate::statement::prepared::TokenCalculationError; |
9 | 8 |
|
10 | 9 | use itertools::Itertools; |
11 | 10 | use scylla_cql::frame::response::result::TableSpec; |
12 | | -use scylla_cql::serialize::row::SerializedValues; |
| 11 | +use scylla_cql::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues}; |
13 | 12 | use std::collections::{HashMap, HashSet}; |
14 | 13 | use std::sync::Arc; |
15 | 14 | use tracing::{debug, warn}; |
@@ -203,17 +202,29 @@ impl ClusterState { |
203 | 202 | &self, |
204 | 203 | keyspace: &str, |
205 | 204 | table: &str, |
206 | | - partition_key: &SerializedValues, |
207 | | - ) -> Result<Token, TokenCalculationError> { |
208 | | - let partitioner = self |
| 205 | + partition_key: &dyn SerializeRow, |
| 206 | + ) -> Result<Token, ClusterStateTokenError> { |
| 207 | + let Some(table) = self |
209 | 208 | .keyspaces |
210 | 209 | .get(keyspace) |
211 | 210 | .and_then(|k| k.tables.get(table)) |
212 | | - .and_then(|t| t.partitioner.as_deref()) |
| 211 | + else { |
| 212 | + return Err(ClusterStateTokenError::UnknownTable { |
| 213 | + keyspace: keyspace.to_owned(), |
| 214 | + table: table.to_owned(), |
| 215 | + }); |
| 216 | + }; |
| 217 | + let values = SerializedValues::from_serializable( |
| 218 | + &RowSerializationContext::from_specs(table.pk_column_specs.as_slice()), |
| 219 | + partition_key, |
| 220 | + )?; |
| 221 | + let partitioner = table |
| 222 | + .partitioner |
| 223 | + .as_deref() |
213 | 224 | .and_then(PartitionerName::from_str) |
214 | 225 | .unwrap_or_default(); |
215 | | - |
216 | | - calculate_token_for_partition_key(partition_key, &partitioner) |
| 226 | + calculate_token_for_partition_key(&values, &partitioner) |
| 227 | + .map_err(ClusterStateTokenError::TokenCalculation) |
217 | 228 | } |
218 | 229 |
|
219 | 230 | /// Access to replicas owning a given token |
@@ -250,8 +261,8 @@ impl ClusterState { |
250 | 261 | &self, |
251 | 262 | keyspace: &str, |
252 | 263 | table: &str, |
253 | | - partition_key: &SerializedValues, |
254 | | - ) -> Result<Vec<(Arc<Node>, Shard)>, TokenCalculationError> { |
| 264 | + partition_key: &dyn SerializeRow, |
| 265 | + ) -> Result<Vec<(Arc<Node>, Shard)>, ClusterStateTokenError> { |
255 | 266 | let token = self.compute_token(keyspace, table, partition_key)?; |
256 | 267 | Ok(self.get_token_endpoints(keyspace, table, token)) |
257 | 268 | } |
|
0 commit comments