179 changes: 152 additions & 27 deletions linera-views/src/backends/scylla_db.rs
@@ -10,7 +10,11 @@
use std::{
collections::{BTreeSet, HashMap},
ops::Deref,
-    sync::Arc,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
time::Duration,
};

use async_lock::{Semaphore, SemaphoreGuard};
@@ -23,17 +27,24 @@ use scylla::{
session::Session,
session_builder::SessionBuilder,
},
+    cluster::{ClusterState, Node, NodeRef},
deserialize::{DeserializationError, TypeCheckError},
errors::{
-        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
-        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
+        ClusterStateTokenError, DbError, ExecutionError, IntoRowsResultError, MetadataError,
+        NewSessionError, NextPageError, NextRowError, PagerExecutionError, PrepareError,
+        RequestAttemptError, RequestError, RowsError,
},
policies::{
-        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
+        load_balancing::{DefaultPolicy, FallbackPlan, LoadBalancingPolicy, RoutingInfo},
retry::DefaultRetryPolicy,
},
response::PagingState,
-    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
+    routing::{Shard, Token},
+    statement::{
+        batch::{Batch, BatchType},
+        prepared::PreparedStatement,
+        Consistency,
+    },
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -225,15 +236,17 @@ impl ScyllaDbClient {
async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
// This explicitly sets a lot of default parameters for clarity and for making future changes
// easier.
-        SessionBuilder::new()
+        let session = SessionBuilder::new()
.known_node(uri)
.cluster_metadata_refresh_interval(Duration::from_secs(10))
.default_execution_profile_handle(Self::build_default_execution_profile_handle(
Self::build_default_policy(),
))
.build()
.boxed_sync()
-            .await
-            .map_err(Into::into)
+            .await?;
+        session.refresh_metadata().await?;
+        Ok(session)
}
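    // Illustrative sketch, not part of this change: why the explicit `refresh_metadata`
    // call above matters. `get_cluster_state` returns a locally cached snapshot of the
    // cluster metadata, and token computations can only answer from that snapshot. The
    // keyspace and table names below are hypothetical.
    #[allow(dead_code)]
    async fn sketch_token_lookup(session: &Session) -> Result<(), ScyllaDbStoreInternalError> {
        session.refresh_metadata().await?;
        let cluster = session.get_cluster_state();
        // Compute the token owning this partition key, then look up its replicas.
        let token = cluster.compute_token("ks", "table", &(b"key".as_slice(),))?;
        let _replicas = cluster.get_token_endpoints("ks", "table", token);
        Ok(())
    }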

async fn get_multi_key_values_statement(
@@ -243,6 +256,7 @@ impl ScyllaDbClient {
if let Some(prepared_statement) = self.multi_key_values.get(&num_markers) {
return Ok(prepared_statement.clone());
}

let markers = std::iter::repeat_n("?", num_markers)
.collect::<Vec<_>>()
.join(",");
@@ -265,6 +279,7 @@ impl ScyllaDbClient {
if let Some(prepared_statement) = self.multi_keys.get(&num_markers) {
return Ok(prepared_statement.clone());
};

let markers = std::iter::repeat_n("?", num_markers)
.collect::<Vec<_>>()
.join(",");
@@ -405,46 +420,74 @@ impl ScyllaDbClient {
Ok(rows.next().is_some())
}

fn get_sticky_shard_policy_or_default(
&self,
partition_key: &[u8],
) -> Arc<dyn LoadBalancingPolicy> {
StickyShardPolicy::new(
&self.session,
&self.namespace,
partition_key,
ScyllaDbClient::build_default_policy(),
)
.map(|policy| Arc::new(policy) as Arc<dyn LoadBalancingPolicy>)
.unwrap_or_else(|_| ScyllaDbClient::build_default_policy())
}

    // Returns a batch query with a sticky shard policy that always tries to route to the
    // same ScyllaDB shard. Should only be used on batches where all statements target the
    // same partition key.
fn get_sticky_batch_query(
&self,
partition_key: &[u8],
) -> Result<Batch, ScyllaDbStoreInternalError> {
        // Since we assume this is all to the same partition key, we can use an unlogged batch.
        // We could use a logged batch to get atomicity across different partitions, but that
        // comes with a huge performance penalty (it seems to double write latency); see the
        // sketch after this function.
let mut batch_query = Batch::new(BatchType::Unlogged);
let policy = self.get_sticky_shard_policy_or_default(partition_key);
let handle = Self::build_default_execution_profile_handle(policy);
batch_query.set_execution_profile_handle(Some(handle));

Ok(batch_query)
}
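    // Illustrative sketch, not part of this change: the logged/unlogged trade-off
    // mentioned above, in isolation. A logged batch writes to a batch log first, which
    // buys atomicity across partitions but roughly doubles write latency.
    #[allow(dead_code)]
    fn sketch_batch_types() {
        // Atomic across partitions, but significantly slower.
        let _logged = Batch::new(BatchType::Logged);
        // Not atomic across partitions; the right choice when every statement
        // targets the same partition key.
        let _unlogged = Batch::new(BatchType::Unlogged);
    }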

    // Batches should always target the same partition key. Batches across different
    // partition keys will not be atomic. If the caller needs atomicity, it is the caller's
    // responsibility to ensure that the batch only contains statements for the same
    // partition key; see the usage sketch after this impl block.
async fn write_batch_internal(
&self,
root_key: &[u8],
batch: UnorderedBatch,
) -> Result<(), ScyllaDbStoreInternalError> {
-        let session = &self.session;
-        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
-        let mut batch_values = Vec::new();
-        let query1 = &self.write_batch_delete_prefix_unbounded;
-        let query2 = &self.write_batch_delete_prefix_bounded;
        Self::check_batch_len(&batch)?;
+        let session = &self.session;
+        let mut batch_query = self.get_sticky_batch_query(root_key)?;
+        let mut batch_values = Vec::with_capacity(batch.len());

for key_prefix in batch.key_prefix_deletions {
Self::check_key_size(&key_prefix)?;
match get_upper_bound_option(&key_prefix) {
None => {
-                    let values = vec![root_key.to_vec(), key_prefix];
-                    batch_values.push(values);
-                    batch_query.append_statement(query1.clone());
+                    batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone());
+                    batch_values.push(vec![root_key.to_vec(), key_prefix]);
}
Some(upper_bound) => {
-                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
-                    batch_values.push(values);
-                    batch_query.append_statement(query2.clone());
+                    batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone());
+                    batch_values.push(vec![root_key.to_vec(), key_prefix, upper_bound]);
}
}
}
-        let query3 = &self.write_batch_deletion;
for key in batch.simple_unordered_batch.deletions {
Self::check_key_size(&key)?;
-            let values = vec![root_key.to_vec(), key];
-            batch_values.push(values);
-            batch_query.append_statement(query3.clone());
+            batch_query.append_statement(self.write_batch_deletion.clone());
+            batch_values.push(vec![root_key.to_vec(), key]);
}
-        let query4 = &self.write_batch_insertion;
for (key, value) in batch.simple_unordered_batch.insertions {
Self::check_key_size(&key)?;
Self::check_value_size(&value)?;
-            let values = vec![root_key.to_vec(), key, value];
-            batch_values.push(values);
-            batch_query.append_statement(query4.clone());
+            batch_query.append_statement(self.write_batch_insertion.clone());
+            batch_values.push(vec![root_key.to_vec(), key, value]);
}
session.batch(&batch_query, batch_values).await?;
Ok(())
@@ -517,6 +560,69 @@ impl ScyllaDbClient {
}
}
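// Illustrative sketch, not part of this change: what "same partition key" means for
// callers of `write_batch_internal`. Every key below lives under one `root_key`, so the
// unlogged batch behaves atomically; spreading statements across several root keys would
// silently lose that guarantee. Assumes `UnorderedBatch` can be built via `Default`.
#[allow(dead_code)]
async fn sketch_single_partition_batch(
    client: &ScyllaDbClient,
) -> Result<(), ScyllaDbStoreInternalError> {
    let root_key = b"root".to_vec();
    let mut batch = UnorderedBatch::default();
    batch
        .simple_unordered_batch
        .insertions
        .push((b"k1".to_vec(), b"v1".to_vec()));
    batch.simple_unordered_batch.deletions.push(b"k2".to_vec());
    client.write_batch_internal(&root_key, batch).await
}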

// Batch statements in ScyllaDB are currently not token-aware: the batch gets sent to a random
// node (https://rust-driver.docs.scylladb.com/stable/statements/batch.html#performance).
// However, for batches where all statements target the same partition key, we can use a sticky
// shard policy to route them to the owning shard, making such batches token-aware.
//
// This policy always tries first to route to the ScyllaDB shards that own the token,
// iterating over the replicas in a round-robin fashion.
#[derive(Debug)]
struct StickyShardPolicy {
replicas: Vec<(Arc<Node>, Shard)>,
current_replica_index: AtomicUsize,
fallback: Arc<dyn LoadBalancingPolicy>,
}

impl StickyShardPolicy {
fn new(
session: &Session,
namespace: &str,
partition_key: &[u8],
fallback: Arc<dyn LoadBalancingPolicy>,
) -> Result<Self, ScyllaDbStoreInternalError> {
let cluster = session.get_cluster_state();
let token = cluster.compute_token(KEYSPACE, namespace, &(partition_key,))?;
let replicas = cluster.get_token_endpoints(KEYSPACE, namespace, token);
Ok(Self {
replicas,
current_replica_index: AtomicUsize::new(0),
fallback,
})
}
}

impl LoadBalancingPolicy for StickyShardPolicy {
fn name(&self) -> String {
"StickyShardPolicy".to_string()
}

    // Always try to route to one of the sticky replicas first.
fn pick<'a>(
&'a self,
request: &'a RoutingInfo<'a>,
cluster: &'a ClusterState,
) -> Option<(NodeRef<'a>, Option<Shard>)> {
if self.replicas.is_empty() {
return self.fallback.pick(request, cluster);
}
        // `fetch_add` wraps around on overflow, so it is fine to keep incrementing forever
        // here; see the sketch after this impl block.
let new_replica_index =
self.current_replica_index.fetch_add(1, Ordering::Relaxed) % self.replicas.len();
let (node, shard) = &self.replicas[new_replica_index];
Some((node, Some(*shard)))
}

    // Fall back to the wrapped default policy.
fn fallback<'a>(
&'a self,
request: &'a RoutingInfo,
cluster: &'a ClusterState,
) -> FallbackPlan<'a> {
self.fallback.fallback(request, cluster)
}
}
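// Illustrative sketch, not part of this change: the round-robin arithmetic from `pick`
// in isolation. Because `fetch_add` wraps on overflow, the counter can be incremented
// forever; at the wrap point the cycle may repeat one replica once, which is harmless
// for routing. Relies on the `AtomicUsize` and `Ordering` imports at the top of the file.
#[allow(dead_code)]
fn sketch_round_robin(replica_count: usize) -> Vec<usize> {
    assert!(replica_count > 0); // `pick` guards the empty case before taking an index
    let counter = AtomicUsize::new(usize::MAX - 1); // start near the edge to show wrapping
    (0..4)
        .map(|_| counter.fetch_add(1, Ordering::Relaxed) % replica_count)
        .collect()
}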

/// The client itself and the keeping of the count of active connections.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
@@ -588,6 +694,22 @@ pub enum ScyllaDbStoreInternalError {
/// A next row error in ScyllaDB
#[error(transparent)]
NextRowError(#[from] NextRowError),

/// A token error in ScyllaDB
#[error(transparent)]
ClusterStateTokenError(#[from] ClusterStateTokenError),

    /// The driver is currently missing the endpoint information for a token
    #[error("The driver is currently missing the endpoint information for token {0:?}")]
    MissingTokenEndpoints(Token),

/// The mutex is poisoned
#[error("The mutex is poisoned")]
PoisonedMutex,

/// A metadata error in ScyllaDB
#[error(transparent)]
MetadataError(#[from] MetadataError),
}

impl KeyValueStoreError for ScyllaDbStoreInternalError {
@@ -699,6 +821,9 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
// https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
type Batch = UnorderedBatch;

    // Batches should always target the same partition key. Batches across different
    // partition keys will not be atomic. If the caller needs atomicity, it is the caller's
    // responsibility to ensure that the batch only contains statements for the same
    // partition key.
async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
let store = self.store.deref();
let _guard = self.acquire().await;