diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs
index ce6ae430983..2edd4d2d593 100644
--- a/linera-views/src/backends/scylla_db.rs
+++ b/linera-views/src/backends/scylla_db.rs
@@ -10,7 +10,11 @@
 use std::{
     collections::{BTreeSet, HashMap},
     ops::Deref,
-    sync::Arc,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    time::Duration,
 };
 
 use async_lock::{Semaphore, SemaphoreGuard};
@@ -23,17 +27,24 @@
 use scylla::{
     client::{
         session::Session,
         session_builder::SessionBuilder,
     },
+    cluster::{ClusterState, Node, NodeRef},
     deserialize::{DeserializationError, TypeCheckError},
     errors::{
-        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
-        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
+        ClusterStateTokenError, DbError, ExecutionError, IntoRowsResultError, MetadataError,
+        NewSessionError, NextPageError, NextRowError, PagerExecutionError, PrepareError,
+        RequestAttemptError, RequestError, RowsError,
     },
     policies::{
-        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
+        load_balancing::{DefaultPolicy, FallbackPlan, LoadBalancingPolicy, RoutingInfo},
         retry::DefaultRetryPolicy,
     },
     response::PagingState,
-    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
+    routing::{Shard, Token},
+    statement::{
+        batch::{Batch, BatchType},
+        prepared::PreparedStatement,
+        Consistency,
+    },
 };
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
@@ -225,15 +236,17 @@ impl ScyllaDbClient {
     async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
         // This explicitly sets a lot of default parameters for clarity and for making future changes
         // easier.
-        SessionBuilder::new()
+        let session = SessionBuilder::new()
             .known_node(uri)
+            .cluster_metadata_refresh_interval(Duration::from_secs(10))
             .default_execution_profile_handle(Self::build_default_execution_profile_handle(
                 Self::build_default_policy(),
             ))
             .build()
             .boxed_sync()
-            .await
-            .map_err(Into::into)
+            .await?;
+        session.refresh_metadata().await?;
+        Ok(session)
     }
 
     async fn get_multi_key_values_statement(
@@ -243,6 +256,7 @@ impl ScyllaDbClient {
         if let Some(prepared_statement) = self.multi_key_values.get(&num_markers) {
             return Ok(prepared_statement.clone());
         }
+
         let markers = std::iter::repeat_n("?", num_markers)
             .collect::<Vec<_>>()
             .join(",");
@@ -265,6 +279,7 @@ impl ScyllaDbClient {
         if let Some(prepared_statement) = self.multi_keys.get(&num_markers) {
             return Ok(prepared_statement.clone());
         };
+
         let markers = std::iter::repeat_n("?", num_markers)
             .collect::<Vec<_>>()
             .join(",");
@@ -405,46 +420,74 @@ impl ScyllaDbClient {
         Ok(rows.next().is_some())
     }
 
+    fn get_sticky_shard_policy_or_default(
+        &self,
+        partition_key: &[u8],
+    ) -> Arc<dyn LoadBalancingPolicy> {
+        StickyShardPolicy::new(
+            &self.session,
+            &self.namespace,
+            partition_key,
+            ScyllaDbClient::build_default_policy(),
+        )
+        .map(|policy| Arc::new(policy) as Arc<dyn LoadBalancingPolicy>)
+        .unwrap_or_else(|_| ScyllaDbClient::build_default_policy())
+    }
+
+    // Returns a batch query with a sticky shard policy that always tries to route to the same
+    // ScyllaDB shard.
+    // Should be used only on batches where all statements are to the same partition key.
+    fn get_sticky_batch_query(
+        &self,
+        partition_key: &[u8],
+    ) -> Result<Batch, ScyllaDbStoreInternalError> {
+        // Since we assume this is all to the same partition key, we can use an unlogged batch.
+        // We could use a logged batch to get atomicity across different partitions, but that
+        // comes with a huge performance penalty (it seems to double write latency).
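+        // The sticky policy only changes which node and shard act as coordinator for the
+        // request; it does not change replication or consistency, so falling back to the
+        // default policy (see `get_sticky_shard_policy_or_default`) only loses the routing
+        // optimization, not correctness.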
+        let mut batch_query = Batch::new(BatchType::Unlogged);
+        let policy = self.get_sticky_shard_policy_or_default(partition_key);
+        let handle = Self::build_default_execution_profile_handle(policy);
+        batch_query.set_execution_profile_handle(Some(handle));
+
+        Ok(batch_query)
+    }
+
+    // Batches should always be to the same partition key. Batches across different partitions
+    // will not be atomic. If the caller wants atomicity, it is the caller's responsibility to
+    // make sure that the batch only has statements to the same partition key.
     async fn write_batch_internal(
         &self,
         root_key: &[u8],
         batch: UnorderedBatch,
     ) -> Result<(), ScyllaDbStoreInternalError> {
-        let session = &self.session;
-        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
-        let mut batch_values = Vec::new();
-        let query1 = &self.write_batch_delete_prefix_unbounded;
-        let query2 = &self.write_batch_delete_prefix_bounded;
         Self::check_batch_len(&batch)?;
+        let session = &self.session;
+        let mut batch_query = self.get_sticky_batch_query(root_key)?;
+        let mut batch_values = Vec::with_capacity(batch.len());
+
         for key_prefix in batch.key_prefix_deletions {
             Self::check_key_size(&key_prefix)?;
             match get_upper_bound_option(&key_prefix) {
                 None => {
-                    let values = vec![root_key.to_vec(), key_prefix];
-                    batch_values.push(values);
-                    batch_query.append_statement(query1.clone());
+                    batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone());
+                    batch_values.push(vec![root_key.to_vec(), key_prefix]);
                 }
                 Some(upper_bound) => {
-                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
-                    batch_values.push(values);
-                    batch_query.append_statement(query2.clone());
+                    batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone());
+                    batch_values.push(vec![root_key.to_vec(), key_prefix, upper_bound]);
                 }
             }
         }
-        let query3 = &self.write_batch_deletion;
         for key in batch.simple_unordered_batch.deletions {
             Self::check_key_size(&key)?;
-            let values = vec![root_key.to_vec(), key];
-            batch_values.push(values);
-            batch_query.append_statement(query3.clone());
+            batch_query.append_statement(self.write_batch_deletion.clone());
+            batch_values.push(vec![root_key.to_vec(), key]);
         }
-        let query4 = &self.write_batch_insertion;
         for (key, value) in batch.simple_unordered_batch.insertions {
             Self::check_key_size(&key)?;
             Self::check_value_size(&value)?;
-            let values = vec![root_key.to_vec(), key, value];
-            batch_values.push(values);
-            batch_query.append_statement(query4.clone());
+            batch_query.append_statement(self.write_batch_insertion.clone());
+            batch_values.push(vec![root_key.to_vec(), key, value]);
         }
         session.batch(&batch_query, batch_values).await?;
         Ok(())
@@ -517,6 +560,69 @@ impl ScyllaDbClient {
     }
 }
 
+// Batch statements in ScyllaDB are currently not token-aware. The batch gets sent to a random
+// node: https://rust-driver.docs.scylladb.com/stable/statements/batch.html#performance
+// However, for batches where all statements are to the same partition key, we can use a sticky
+// shard policy to route to the same shard, making batches token-aware.
+//
+// This is a policy that always tries to route to the ScyllaDB shards that contain the token, in a
+// round-robin fashion.
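+//
+// The replica set is computed once, from the cluster metadata known when the policy is built, and
+// the round-robin index is a shared atomic counter, so successive picks (including retries) rotate
+// over the replicas owning the token. If no replica information is available, the policy defers to
+// the wrapped default policy.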
+#[derive(Debug)]
+struct StickyShardPolicy {
+    replicas: Vec<(Arc<Node>, Shard)>,
+    current_replica_index: AtomicUsize,
+    fallback: Arc<dyn LoadBalancingPolicy>,
+}
+
+impl StickyShardPolicy {
+    fn new(
+        session: &Session,
+        namespace: &str,
+        partition_key: &[u8],
+        fallback: Arc<dyn LoadBalancingPolicy>,
+    ) -> Result<Self, ScyllaDbStoreInternalError> {
+        let cluster = session.get_cluster_state();
+        let token = cluster.compute_token(KEYSPACE, namespace, &(partition_key,))?;
+        let replicas = cluster.get_token_endpoints(KEYSPACE, namespace, token);
+        Ok(Self {
+            replicas,
+            current_replica_index: AtomicUsize::new(0),
+            fallback,
+        })
+    }
+}
+
+impl LoadBalancingPolicy for StickyShardPolicy {
+    fn name(&self) -> String {
+        "StickyShardPolicy".to_string()
+    }
+
+    // Always try first to route to the sticky shard.
+    fn pick<'a>(
+        &'a self,
+        request: &'a RoutingInfo<'a>,
+        cluster: &'a ClusterState,
+    ) -> Option<(NodeRef<'a>, Option<Shard>)> {
+        if self.replicas.is_empty() {
+            return self.fallback.pick(request, cluster);
+        }
+        // fetch_add wraps around on overflow, so it is fine to keep incrementing forever here.
+        let new_replica_index =
+            self.current_replica_index.fetch_add(1, Ordering::Relaxed) % self.replicas.len();
+        let (node, shard) = &self.replicas[new_replica_index];
+        Some((node, Some(*shard)))
+    }
+
+    // Fall back to the default policy.
+    fn fallback<'a>(
+        &'a self,
+        request: &'a RoutingInfo,
+        cluster: &'a ClusterState,
+    ) -> FallbackPlan<'a> {
+        self.fallback.fallback(request, cluster)
+    }
+}
+
 /// The client itself and the keeping of the count of active connections.
 #[derive(Clone)]
 pub struct ScyllaDbStoreInternal {
@@ -588,6 +694,22 @@ pub enum ScyllaDbStoreInternalError {
     /// A next row error in ScyllaDB
     #[error(transparent)]
     NextRowError(#[from] NextRowError),
+
+    /// A token error in ScyllaDB
+    #[error(transparent)]
+    ClusterStateTokenError(#[from] ClusterStateTokenError),
+
+    /// The token endpoint information is currently missing from the driver
+    #[error("The token endpoint information is currently missing from the driver")]
+    MissingTokenEndpoints(Token),
+
+    /// The mutex is poisoned
+    #[error("The mutex is poisoned")]
+    PoisonedMutex,
+
+    /// A metadata error in ScyllaDB
+    #[error(transparent)]
+    MetadataError(#[from] MetadataError),
 }
 
 impl KeyValueStoreError for ScyllaDbStoreInternalError {
@@ -699,6 +821,9 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
     // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
     type Batch = UnorderedBatch;
 
+    // Batches should always be to the same partition key. Batches across different partitions
+    // will not be atomic. If the caller wants atomicity, it is the caller's responsibility to
+    // make sure that the batch only has statements to the same partition key.
     async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
         let store = self.store.deref();
         let _guard = self.acquire().await;