From 6106db7cbdef423608da11f617600c60456d66fa Mon Sep 17 00:00:00 2001 From: Andre da Silva Date: Mon, 2 Jun 2025 11:00:25 -0300 Subject: [PATCH] Optimize ScyllaDB usage --- Cargo.lock | 1 + examples/Cargo.lock | 1 + .../templates/scylla-config.yaml | 1 + linera-views/Cargo.toml | 1 + linera-views/src/backends/scylla_db.rs | 545 ++++++++++-------- 5 files changed, 319 insertions(+), 230 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4dd3b9ac4be..eba2619d335 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5696,6 +5696,7 @@ dependencies = [ "cfg_aliases", "convert_case", "criterion", + "dashmap 5.5.3", "derive_more 1.0.0", "futures", "generic-array", diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 66fb4f38420..c06434a034b 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -3938,6 +3938,7 @@ dependencies = [ "bcs", "cfg_aliases", "convert_case", + "dashmap 5.5.3", "derive_more 1.0.0", "futures", "generic-array", diff --git a/kubernetes/linera-validator/templates/scylla-config.yaml b/kubernetes/linera-validator/templates/scylla-config.yaml index 48bddc41501..4606b8ee2ef 100644 --- a/kubernetes/linera-validator/templates/scylla-config.yaml +++ b/kubernetes/linera-validator/templates/scylla-config.yaml @@ -6,3 +6,4 @@ metadata: data: scylla.yaml: | query_tombstone_page_limit: 200000 + commitlog_segment_size_in_mb: 256 diff --git a/linera-views/Cargo.toml b/linera-views/Cargo.toml index e5c7ced440e..16b83b72031 100644 --- a/linera-views/Cargo.toml +++ b/linera-views/Cargo.toml @@ -37,6 +37,7 @@ aws-sdk-dynamodb = { workspace = true, optional = true } aws-smithy-types = { workspace = true, optional = true } bcs.workspace = true convert_case.workspace = true +dashmap.workspace = true derive_more = { workspace = true, features = ["from"] } futures.workspace = true generic-array.workspace = true diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index cfa98e8f70f..8ff0451707c 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -8,25 +8,32 @@ //! `max_concurrent_queries`. use std::{ - collections::{hash_map::Entry, BTreeSet, HashMap}, + collections::{BTreeSet, HashMap}, ops::Deref, sync::Arc, }; use async_lock::{Semaphore, SemaphoreGuard}; +use dashmap::{mapref::entry::Entry, DashMap}; use futures::{future::join_all, FutureExt as _, StreamExt}; use linera_base::ensure; use scylla::{ - client::{session::Session, session_builder::SessionBuilder}, + client::{ + execution_profile::{ExecutionProfile, ExecutionProfileHandle}, + session::Session, + session_builder::SessionBuilder, + }, deserialize::{DeserializationError, TypeCheckError}, errors::{ DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError, PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError, }, - statement::{ - batch::{BatchStatement, BatchType}, - prepared::PreparedStatement, + policies::{ + load_balancing::{DefaultPolicy, LoadBalancingPolicy}, + retry::DefaultRetryPolicy, }, + response::PagingState, + statement::{batch::BatchType, prepared::PreparedStatement, Consistency}, }; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -49,17 +56,17 @@ use crate::{ /// Fundamental constant in ScyllaDB: The maximum size of a multi keys query /// The limit is in reality 100. But we need one entry for the root key. -const MAX_MULTI_KEYS: usize = 99; +const MAX_MULTI_KEYS: usize = 100 - 1; -/// The maximal size of an operation on ScyllaDB seems to be 16 MB +/// The maximal size of an operation on ScyllaDB seems to be 16 MiB /// https://www.scylladb.com/2019/03/27/best-practices-for-scylla-applications/ -/// "There is a hard limit at 16 MB, and nothing bigger than that can arrive at once +/// "There is a hard limit at 16 MiB, and nothing bigger than that can arrive at once /// at the database at any particular time" -/// So, we set up the maximal size of 16 MB - 10 KB for the values and 10 KB for the keys +/// So, we set up the maximal size of 16 MiB - 10 KiB for the values and 10 KiB for the keys /// We also arbitrarily decrease the size by 4000 bytes because an amount of size is /// taken internally by the database. -const RAW_MAX_VALUE_SIZE: usize = 16762976; -const MAX_KEY_SIZE: usize = 10240; +const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000; +const MAX_KEY_SIZE: usize = 10 * 1024; const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE; /// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage. @@ -78,9 +85,7 @@ const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE - MAX_KEY_SIZE - get_uleb128_size(RAW_MAX_VALUE_SIZE) - get_uleb128_size(MAX_KEY_SIZE) - - 1 - - 1 - - 1; + - 3; /// The constant 14000 is an empirical constant that was found to be necessary /// to make the ScyllaDB system work. We have not been able to find this or @@ -89,6 +94,9 @@ const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE /// correct. const MAX_BATCH_SIZE: usize = 5000; +/// The keyspace to use for the ScyllaDB database. +const KEYSPACE: &str = "kv"; + /// The client for ScyllaDB: /// * The session allows to pass queries /// * The namespace that is being assigned to the database @@ -98,91 +106,90 @@ struct ScyllaDbClient { namespace: String, read_value: PreparedStatement, contains_key: PreparedStatement, - write_batch_delete_prefix_unbounded: BatchStatement, - write_batch_delete_prefix_bounded: BatchStatement, - write_batch_deletion: BatchStatement, - write_batch_insertion: BatchStatement, + write_batch_delete_prefix_unbounded: PreparedStatement, + write_batch_delete_prefix_bounded: PreparedStatement, + write_batch_deletion: PreparedStatement, + write_batch_insertion: PreparedStatement, find_keys_by_prefix_unbounded: PreparedStatement, find_keys_by_prefix_bounded: PreparedStatement, find_key_values_by_prefix_unbounded: PreparedStatement, find_key_values_by_prefix_bounded: PreparedStatement, + multi_key_values: DashMap, + multi_keys: DashMap, } impl ScyllaDbClient { - fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> { - ensure!( - key.len() <= MAX_KEY_SIZE, - ScyllaDbStoreInternalError::KeyTooLong - ); - Ok(()) - } + async fn new(session: Session, namespace: &str) -> Result { + let namespace = namespace.to_string(); + let read_value = session + .prepare(format!( + "SELECT v FROM {}.{} WHERE root_key = ? AND k = ?", + KEYSPACE, namespace + )) + .await?; - fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> { - ensure!( - value.len() <= RAW_MAX_VALUE_SIZE, - ScyllaDbStoreInternalError::ValueTooLong - ); - Ok(()) - } + let contains_key = session + .prepare(format!( + "SELECT root_key FROM {}.{} WHERE root_key = ? AND k = ?", + KEYSPACE, namespace + )) + .await?; - fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> { - ensure!( - batch.len() <= MAX_BATCH_SIZE, - ScyllaDbStoreInternalError::BatchTooLong - ); - Ok(()) - } + let write_batch_delete_prefix_unbounded = session + .prepare(format!( + "DELETE FROM {}.{} WHERE root_key = ? AND k >= ?", + KEYSPACE, namespace + )) + .await?; - async fn new(session: Session, namespace: &str) -> Result { - let namespace = namespace.to_string(); - let query = format!( - "SELECT v FROM kv.{} WHERE root_key = ? AND k = ? ALLOW FILTERING", - namespace - ); - let read_value = session.prepare(query).await?; + let write_batch_delete_prefix_bounded = session + .prepare(format!( + "DELETE FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?", + KEYSPACE, namespace + )) + .await?; - let query = format!( - "SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ? ALLOW FILTERING", - namespace - ); - let contains_key = session.prepare(query).await?; + let write_batch_deletion = session + .prepare(format!( + "DELETE FROM {}.{} WHERE root_key = ? AND k = ?", + KEYSPACE, namespace + )) + .await?; - let query = format!("DELETE FROM kv.{} WHERE root_key = ? AND k >= ?", namespace); - let write_batch_delete_prefix_unbounded = session.prepare(query).await?.into(); - let query = format!( - "DELETE FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", - namespace - ); - let write_batch_delete_prefix_bounded = session.prepare(query).await?.into(); - let query = format!("DELETE FROM kv.{} WHERE root_key = ? AND k = ?", namespace); - let write_batch_deletion = session.prepare(query).await?.into(); - let query = format!( - "INSERT INTO kv.{} (root_key, k, v) VALUES (?, ?, ?)", - namespace - ); - let write_batch_insertion = session.prepare(query).await?.into(); + let write_batch_insertion = session + .prepare(format!( + "INSERT INTO {}.{} (root_key, k, v) VALUES (?, ?, ?)", + KEYSPACE, namespace + )) + .await?; - let query = format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? ALLOW FILTERING", - namespace - ); - let find_keys_by_prefix_unbounded = session.prepare(query).await?; - let query = format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ? ALLOW FILTERING", - namespace - ); - let find_keys_by_prefix_bounded = session.prepare(query).await?; + let find_keys_by_prefix_unbounded = session + .prepare(format!( + "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ?", + KEYSPACE, namespace + )) + .await?; - let query = format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? ALLOW FILTERING", - namespace - ); - let find_key_values_by_prefix_unbounded = session.prepare(query).await?; - let query = format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ? ALLOW FILTERING", - namespace - ); - let find_key_values_by_prefix_bounded = session.prepare(query).await?; + let find_keys_by_prefix_bounded = session + .prepare(format!( + "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?", + KEYSPACE, namespace + )) + .await?; + + let find_key_values_by_prefix_unbounded = session + .prepare(format!( + "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ?", + KEYSPACE, namespace + )) + .await?; + + let find_key_values_by_prefix_bounded = session + .prepare(format!( + "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?", + KEYSPACE, namespace + )) + .await?; Ok(Self { session, @@ -197,9 +204,112 @@ impl ScyllaDbClient { find_keys_by_prefix_bounded, find_key_values_by_prefix_unbounded, find_key_values_by_prefix_bounded, + multi_key_values: DashMap::new(), + multi_keys: DashMap::new(), }) } + fn build_default_policy() -> Arc { + DefaultPolicy::builder().token_aware(true).build() + } + + fn build_default_execution_profile_handle( + policy: Arc, + ) -> ExecutionProfileHandle { + let default_profile = ExecutionProfile::builder() + .load_balancing_policy(policy) + .retry_policy(Arc::new(DefaultRetryPolicy::new())) + .consistency(Consistency::LocalQuorum) + .build(); + default_profile.into_handle() + } + + async fn build_default_session(uri: &str) -> Result { + // This explicitly sets a lot of default parameters for clarity and for making future changes + // easier. + SessionBuilder::new() + .known_node(uri) + .default_execution_profile_handle(Self::build_default_execution_profile_handle( + Self::build_default_policy(), + )) + .build() + .boxed() + .await + .map_err(Into::into) + } + + async fn get_multi_key_values_statement( + &self, + num_markers: usize, + ) -> Result { + let entry = self.multi_key_values.entry(num_markers); + match entry { + Entry::Occupied(entry) => Ok(entry.get().clone()), + Entry::Vacant(entry) => { + let markers = std::iter::repeat_n("?", num_markers) + .collect::>() + .join(","); + let prepared_statement = self + .session + .prepare(format!( + "SELECT k,v FROM {}.{} WHERE root_key = ? AND k IN ({})", + KEYSPACE, self.namespace, markers + )) + .await?; + entry.insert(prepared_statement.clone()); + Ok(prepared_statement) + } + } + } + + async fn get_multi_keys_statement( + &self, + num_markers: usize, + ) -> Result { + let entry = self.multi_keys.entry(num_markers); + match entry { + Entry::Occupied(entry) => Ok(entry.get().clone()), + Entry::Vacant(entry) => { + let markers = std::iter::repeat_n("?", num_markers) + .collect::>() + .join(","); + let prepared_statement = self + .session + .prepare(format!( + "SELECT k FROM {}.{} WHERE root_key = ? AND k IN ({})", + KEYSPACE, self.namespace, markers + )) + .await?; + entry.insert(prepared_statement.clone()); + Ok(prepared_statement) + } + } + } + + fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> { + ensure!( + key.len() <= MAX_KEY_SIZE, + ScyllaDbStoreInternalError::KeyTooLong + ); + Ok(()) + } + + fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> { + ensure!( + value.len() <= RAW_MAX_VALUE_SIZE, + ScyllaDbStoreInternalError::ValueTooLong + ); + Ok(()) + } + + fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> { + ensure!( + batch.len() <= MAX_BATCH_SIZE, + ScyllaDbStoreInternalError::BatchTooLong + ); + Ok(()) + } + async fn read_value_internal( &self, root_key: &[u8], @@ -210,59 +320,48 @@ impl ScyllaDbClient { // Read the value of a key let values = (root_key.to_vec(), key); - let results = session - .execute_unpaged(&self.read_value, &values) - .await? - .into_rows_result()?; - let mut rows = results.rows::<(Vec,)>()?; + let (result, _) = session + .execute_single_page(&self.read_value, &values, PagingState::start()) + .await?; + let rows = result.into_rows_result()?; + let mut rows = rows.rows::<(Vec,)>()?; Ok(match rows.next() { Some(row) => Some(row?.0), None => None, }) } - async fn read_multi_values_internal( - &self, - root_key: &[u8], + fn get_occurrences_map( keys: Vec>, - ) -> Result>>, ScyllaDbStoreInternalError> { - let num_keys = keys.len(); - let session = &self.session; + ) -> Result, Vec>, ScyllaDbStoreInternalError> { let mut map = HashMap::, Vec>::new(); - let mut inputs = Vec::new(); - inputs.push(root_key.to_vec()); for (i_key, key) in keys.into_iter().enumerate() { Self::check_key_size(&key)?; - match map.entry(key.clone()) { - Entry::Occupied(entry) => { - let entry = entry.into_mut(); - entry.push(i_key); - } - Entry::Vacant(entry) => { - entry.insert(vec![i_key]); - inputs.push(key); - } - } + map.entry(key).or_default().push(i_key); } - let num_unique_keys = map.len(); - let mut group_query = "?".to_string(); - group_query.push_str(&",?".repeat(num_unique_keys - 1)); - let query = format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({}) ALLOW FILTERING", - self.namespace, group_query - ); + Ok(map) + } - let mut rows = session - .query_iter(&*query, &inputs) + async fn read_multi_values_internal( + &self, + root_key: &[u8], + keys: Vec>, + ) -> Result>>, ScyllaDbStoreInternalError> { + let mut values = vec![None; keys.len()]; + let map = Self::get_occurrences_map(keys)?; + let statement = self.get_multi_key_values_statement(map.len()).await?; + let mut inputs = vec![root_key.to_vec()]; + inputs.extend(map.keys().cloned()); + let mut rows = self + .session + .execute_iter(statement, &inputs) .await? .rows_stream::<(Vec, Vec)>()?; - let mut values = vec![None; num_keys]; while let Some(row) = rows.next().await { let (key, value) = row?; - for i_key in map.get(&key).unwrap().clone() { - let value = Some(value.clone()); - *values.get_mut(i_key).expect("an entry in values") = value; + for i_key in &map[&key] { + values[*i_key] = Some(value.clone()); } } Ok(values) @@ -273,44 +372,24 @@ impl ScyllaDbClient { root_key: &[u8], keys: Vec>, ) -> Result, ScyllaDbStoreInternalError> { - let num_keys = keys.len(); - let session = &self.session; - let mut map = HashMap::, Vec>::new(); - let mut inputs = Vec::new(); - inputs.push(root_key.to_vec()); - for (i_key, key) in keys.into_iter().enumerate() { - Self::check_key_size(&key)?; - match map.entry(key.clone()) { - Entry::Occupied(entry) => { - let entry = entry.into_mut(); - entry.push(i_key); - } - Entry::Vacant(entry) => { - entry.insert(vec![i_key]); - inputs.push(key); - } - } - } - let num_unique_keys = map.len(); - let mut group_query = "?".to_string(); - group_query.push_str(&",?".repeat(num_unique_keys - 1)); - let query = format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({}) ALLOW FILTERING", - self.namespace, group_query - ); - - let mut rows = session - .query_iter(&*query, &inputs) + let mut values = vec![false; keys.len()]; + let map = Self::get_occurrences_map(keys)?; + let statement = self.get_multi_keys_statement(map.len()).await?; + let mut inputs = vec![root_key.to_vec()]; + inputs.extend(map.keys().cloned()); + let mut rows = self + .session + .execute_iter(statement, &inputs) .await? .rows_stream::<(Vec,)>()?; - let mut values = vec![false; num_keys]; while let Some(row) = rows.next().await { let (key,) = row?; - for i_key in map.get(&key).unwrap().clone() { - *values.get_mut(i_key).expect("an entry in values") = true; + for i_key in &map[&key] { + values[*i_key] = true; } } + Ok(values) } @@ -324,11 +403,11 @@ impl ScyllaDbClient { // Read the value of a key let values = (root_key.to_vec(), key); - let results = session - .execute_unpaged(&self.contains_key, &values) - .await? - .into_rows_result()?; - let mut rows = results.rows::<(Vec,)>()?; + let (result, _) = session + .execute_single_page(&self.contains_key, &values, PagingState::start()) + .await?; + let rows = result.into_rows_result()?; + let mut rows = rows.rows::<(Vec,)>()?; Ok(rows.next().is_some()) } @@ -661,11 +740,7 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { namespace: &str, ) -> Result { Self::check_namespace(namespace)?; - let session = SessionBuilder::new() - .known_node(config.uri.as_str()) - .build() - .boxed() - .await?; + let session = ScyllaDbClient::build_default_session(&config.uri).await?; let store = ScyllaDbClient::new(session, namespace).await?; let store = Arc::new(store); let semaphore = config @@ -696,13 +771,12 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { } async fn list_all(config: &Self::Config) -> Result, ScyllaDbStoreInternalError> { - let session = SessionBuilder::new() - .known_node(config.uri.as_str()) - .build() - .boxed() + let session = ScyllaDbClient::build_default_session(&config.uri).await?; + let statement = session + .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE)) .await?; - let result = session.query_iter("DESCRIBE KEYSPACE kv", &[]).await; - let miss_msg = "'kv' not found in keyspaces"; + let result = session.execute_iter(statement, &[]).await; + let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE); let result = match result { Ok(result) => result, Err(error) => { @@ -735,15 +809,16 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { namespace: &str, ) -> Result>, ScyllaDbStoreInternalError> { Self::check_namespace(namespace)?; - let session = SessionBuilder::new() - .known_node(config.uri.as_str()) - .build() - .boxed() + let session = ScyllaDbClient::build_default_session(&config.uri).await?; + let statement = session + .prepare(format!( + "SELECT root_key FROM {}.{} ALLOW FILTERING", + KEYSPACE, namespace + )) .await?; - let query = format!("SELECT root_key FROM kv.{} ALLOW FILTERING", namespace); // Execute the query - let rows = session.query_iter(query, &[]).await?; + let rows = session.execute_iter(statement, &[]).await?; let mut rows = rows.rows_stream::<(Vec,)>()?; let mut root_keys = BTreeSet::new(); while let Some(row) = rows.next().await { @@ -755,16 +830,14 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { } async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> { - let session = SessionBuilder::new() - .known_node(store_config.uri.as_str()) - .build() - .boxed() + let session = ScyllaDbClient::build_default_session(&store_config.uri).await?; + let statement = session + .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE)) .await?; - let query = "DROP KEYSPACE IF EXISTS kv;"; - let prepared = session.prepare(query).await?; - - session.execute_unpaged(&prepared, &[]).await?; + session + .execute_single_page(&statement, &[], PagingState::start()) + .await?; Ok(()) } @@ -773,25 +846,21 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { namespace: &str, ) -> Result { Self::check_namespace(namespace)?; - let session = SessionBuilder::new() - .known_node(config.uri.as_str()) - .build() - .boxed() - .await?; - // We check the way the test can fail. It can fail in different ways. - let query = format!( - "SELECT root_key FROM kv.{} LIMIT 1 ALLOW FILTERING", - namespace - ); + let session = ScyllaDbClient::build_default_session(&config.uri).await?; - // Execute the query - let result = session.prepare(&*query).await; + // We check the way the test can fail. It can fail in different ways. + let result = session + .prepare(format!( + "SELECT root_key FROM {}.{} LIMIT 1 ALLOW FILTERING", + KEYSPACE, namespace + )) + .await; // The missing table translates into a very specific error that we matched let miss_msg1 = format!("unconfigured table {}", namespace); let miss_msg1 = miss_msg1.as_str(); let miss_msg2 = "Undefined name root_key in selection clause"; - let miss_msg3 = "Keyspace kv does not exist"; + let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE); let Err(error) = result else { // If OK, then the table exists return Ok(true); @@ -822,35 +891,53 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { namespace: &str, ) -> Result<(), ScyllaDbStoreInternalError> { Self::check_namespace(namespace)?; - let session = SessionBuilder::new() - .known_node(config.uri.as_str()) - .build() - .boxed() - .await?; - // Create a keyspace if it doesn't exist - let query = format!( - "CREATE KEYSPACE IF NOT EXISTS kv WITH REPLICATION = {{ \ - 'class' : 'NetworkTopologyStrategy', \ - 'replication_factor' : {} \ - }}", - config.common_config.replication_factor - ); + let session = ScyllaDbClient::build_default_session(&config.uri).await?; - // Execute the query - let prepared = session.prepare(query).await?; - session.execute_unpaged(&prepared, &[]).await?; - - // Create a table if it doesn't exist - // The schema appears too complicated for non-trivial reasons. - // See TODO(#1069). - let query = format!( - "CREATE TABLE kv.{} (root_key blob, k blob, v blob, primary key (root_key, k))", - namespace - ); + // Create a keyspace if it doesn't exist + let statement = session + .prepare(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \ + 'class' : 'NetworkTopologyStrategy', \ + 'replication_factor' : {} \ + }}", + KEYSPACE, config.common_config.replication_factor + )) + .await?; + session + .execute_single_page(&statement, &[], PagingState::start()) + .await?; - // Execute the query - let prepared = session.prepare(&*query).await?; - session.execute_unpaged(&prepared, &[]).await?; + // This explicitly sets a lot of default parameters for clarity and for making future + // changes easier. + let statement = session + .prepare(format!( + "CREATE TABLE {}.{} (\ + root_key blob, \ + k blob, \ + v blob, \ + PRIMARY KEY (root_key, k) \ + ) \ + WITH compaction = {{ \ + 'class' : 'SizeTieredCompactionStrategy', \ + 'min_sstable_size' : 52428800, \ + 'bucket_low' : 0.5, \ + 'bucket_high' : 1.5, \ + 'min_threshold' : 4, \ + 'max_threshold' : 32 \ + }} \ + AND compression = {{ \ + 'sstable_compression': 'LZ4Compressor', \ + 'chunk_length_in_kb':'4' \ + }} \ + AND caching = {{ \ + 'enabled': 'true' \ + }}", + KEYSPACE, namespace + )) + .await?; + session + .execute_single_page(&statement, &[], PagingState::start()) + .await?; Ok(()) } @@ -859,15 +946,13 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { namespace: &str, ) -> Result<(), ScyllaDbStoreInternalError> { Self::check_namespace(namespace)?; - let session = SessionBuilder::new() - .known_node(config.uri.as_str()) - .build() - .boxed() + let session = ScyllaDbClient::build_default_session(&config.uri).await?; + let statement = session + .prepare(format!("DROP TABLE IF EXISTS {}.{};", KEYSPACE, namespace)) + .await?; + session + .execute_single_page(&statement, &[], PagingState::start()) .await?; - - let query = format!("DROP TABLE IF EXISTS kv.{};", namespace); - let prepared = session.prepare(&*query).await?; - let _result = session.execute_unpaged(&prepared, &[]).await?; Ok(()) } }