diff --git a/linera-indexer/lib/src/scylla_db.rs b/linera-indexer/lib/src/scylla_db.rs
index c3a57463847..b894b63adb1 100644
--- a/linera-indexer/lib/src/scylla_db.rs
+++ b/linera-indexer/lib/src/scylla_db.rs
@@ -3,7 +3,9 @@
 use linera_views::{
     lru_caching::StorageCacheConfig,
-    scylla_db::{ScyllaDbStore, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
+    scylla_db::{
+        ScyllaDbClientConfig, ScyllaDbStore, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig,
+    },
     store::AdminKeyValueStore,
 };
@@ -62,6 +64,7 @@ impl ScyllaDbRunner {
             max_stream_queries: config.client.max_stream_queries,
             max_concurrent_queries: config.client.max_concurrent_queries,
             replication_factor: config.client.replication_factor,
+            client_config: ScyllaDbClientConfig::default(),
         };
         let store_config = ScyllaDbStoreConfig {
             inner_config,
diff --git a/linera-service/src/storage.rs b/linera-service/src/storage.rs
index d84b1e49a62..c8e59bb943b 100644
--- a/linera-service/src/storage.rs
+++ b/linera-service/src/storage.rs
@@ -34,7 +34,9 @@ use {
 };
 #[cfg(feature = "scylladb")]
 use {
-    linera_views::scylla_db::{ScyllaDbStore, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
+    linera_views::scylla_db::{
+        ScyllaDbClientConfig, ScyllaDbStore, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig,
+    },
     std::num::NonZeroU16,
     tracing::debug,
 };
@@ -486,6 +488,7 @@ impl StorageConfig {
             max_stream_queries: options.storage_max_stream_queries,
             max_concurrent_queries: options.storage_max_concurrent_queries,
             replication_factor: options.storage_replication_factor,
+            client_config: ScyllaDbClientConfig::default(),
         };
         let config = ScyllaDbStoreConfig {
             inner_config,
@@ -514,6 +517,7 @@ impl StorageConfig {
             max_stream_queries: options.storage_max_stream_queries,
             max_concurrent_queries: options.storage_max_concurrent_queries,
             replication_factor: options.storage_replication_factor,
+            client_config: ScyllaDbClientConfig::default(),
         };
         let second_config = ScyllaDbStoreConfig {
             inner_config,
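Both call sites above wire in ScyllaDbClientConfig::default(), i.e. a 4-byte partition prefix. A minimal sketch of overriding it (assumptions: the uri field name is taken from the connect() code further down in this diff, and all values here are purely illustrative):

use linera_views::scylla_db::{ScyllaDbClientConfig, ScyllaDbStoreInternalConfig};

fn example_internal_config() -> ScyllaDbStoreInternalConfig {
    ScyllaDbStoreInternalConfig {
        uri: "localhost:9042".to_string(),
        max_concurrent_queries: Some(10),
        max_stream_queries: 10,
        replication_factor: 1,
        client_config: ScyllaDbClientConfig {
            // Partition on the first 8 bytes of each key instead of the default 4.
            cluster_key_prefix_length_bytes: 8,
        },
    }
}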
diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs
index 2edd4d2d593..42eb83ce6fb 100644
--- a/linera-views/src/backends/scylla_db.rs
+++ b/linera-views/src/backends/scylla_db.rs
@@ -19,11 +19,15 @@ use std::{
 
 use async_lock::{Semaphore, SemaphoreGuard};
 use dashmap::DashMap;
-use futures::{future::join_all, StreamExt as _};
+use futures::{
+    future::{join_all, try_join_all},
+    StreamExt as _,
+};
 use linera_base::ensure;
 use scylla::{
     client::{
         execution_profile::{ExecutionProfile, ExecutionProfileHandle},
+        pager::QueryPager,
         session::Session,
         session_builder::SessionBuilder,
     },
@@ -106,6 +110,25 @@ const MAX_BATCH_SIZE: usize = 5000;
 /// The keyspace to use for the ScyllaDB database.
 const KEYSPACE: &str = "kv";
 
+/// The configuration of the ScyllaDB client.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct ScyllaDbClientConfig {
+    /// The length of the prefix of the key that is used as the partition key in
+    /// non-exclusive mode. In exclusive mode, the partition key is the root key.
+    pub cluster_key_prefix_length_bytes: usize,
+}
+
+impl Default for ScyllaDbClientConfig {
+    fn default() -> Self {
+        Self {
+            cluster_key_prefix_length_bytes: 4,
+        }
+    }
+}
+
+/// Map from partition key to a map from keys to the list of their occurrences in the original vector.
+type OccurrencesMap = HashMap<Vec<u8>, HashMap<Vec<u8>, Vec<usize>>>;
+
 /// The client for ScyllaDB:
 /// * The session allows passing queries
 /// * The namespace that is being assigned to the database
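The OccurrencesMap alias introduced above nests a per-key map inside a per-partition map, so that duplicate keys in a multi-get are queried once and then mapped back to every position they occupied. A self-contained sketch of that shape (the single-byte partition key here is a stand-in for the real mode-byte-plus-padded-prefix scheme):

use std::collections::HashMap;

type OccurrencesMap = HashMap<Vec<u8>, HashMap<Vec<u8>, Vec<usize>>>;

fn main() {
    let keys: Vec<Vec<u8>> = vec![vec![1, 7], vec![1, 7], vec![2, 9]];
    let mut occurrences: OccurrencesMap = HashMap::new();
    for (i_key, key) in keys.into_iter().enumerate() {
        // Stand-in partition key: just the first byte of the key.
        let partition_key = vec![key[0]];
        occurrences
            .entry(partition_key)
            .or_default()
            .entry(key)
            .or_default()
            .push(i_key);
    }
    // The duplicate key [1, 7] is queried once but maps back to positions 0 and 1.
    let partition = occurrences.get([1u8].as_slice()).unwrap();
    assert_eq!(partition.get([1u8, 7].as_slice()).unwrap(), &vec![0, 1]);
}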
@@ -113,6 +136,7 @@ const KEYSPACE: &str = "kv";
 struct ScyllaDbClient {
     session: Session,
     namespace: String,
+    config: ScyllaDbClientConfig,
     read_value: PreparedStatement,
     contains_key: PreparedStatement,
     write_batch_delete_prefix_unbounded: PreparedStatement,
@@ -120,82 +144,118 @@ struct ScyllaDbClient {
     write_batch_deletion: PreparedStatement,
     write_batch_insertion: PreparedStatement,
     find_keys_by_prefix_unbounded: PreparedStatement,
+    find_keys_by_prefix_unbounded_full_scan: PreparedStatement,
     find_keys_by_prefix_bounded: PreparedStatement,
+    find_keys_by_prefix_bounded_full_scan: PreparedStatement,
     find_key_values_by_prefix_unbounded: PreparedStatement,
+    find_key_values_by_prefix_unbounded_full_scan: PreparedStatement,
     find_key_values_by_prefix_bounded: PreparedStatement,
+    find_key_values_by_prefix_bounded_full_scan: PreparedStatement,
     multi_key_values: DashMap<usize, PreparedStatement>,
     multi_keys: DashMap<usize, PreparedStatement>,
 }
 
 impl ScyllaDbClient {
-    async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
+    async fn new(
+        session: Session,
+        namespace: &str,
+        config: ScyllaDbClientConfig,
+    ) -> Result<Self, ScyllaDbStoreInternalError> {
         let namespace = namespace.to_string();
         let read_value = session
             .prepare(format!(
-                "SELECT v FROM {}.{} WHERE root_key = ? AND k = ?",
+                "SELECT value FROM {}.{} WHERE partition_key = ? AND cluster_key = ?",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let contains_key = session
             .prepare(format!(
-                "SELECT root_key FROM {}.{} WHERE root_key = ? AND k = ?",
+                "SELECT partition_key FROM {}.{} WHERE partition_key = ? AND cluster_key = ?",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let write_batch_delete_prefix_unbounded = session
             .prepare(format!(
-                "DELETE FROM {}.{} WHERE root_key = ? AND k >= ?",
+                "DELETE FROM {}.{} WHERE partition_key = ? AND cluster_key >= ?",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let write_batch_delete_prefix_bounded = session
             .prepare(format!(
-                "DELETE FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
+                "DELETE FROM {}.{} WHERE partition_key = ? AND cluster_key >= ? AND cluster_key < ?",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let write_batch_deletion = session
             .prepare(format!(
-                "DELETE FROM {}.{} WHERE root_key = ? AND k = ?",
+                "DELETE FROM {}.{} WHERE partition_key = ? AND cluster_key = ?",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let write_batch_insertion = session
             .prepare(format!(
-                "INSERT INTO {}.{} (root_key, k, v) VALUES (?, ?, ?)",
+                "INSERT INTO {}.{} (partition_key, cluster_key, value) VALUES (?, ?, ?)",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let find_keys_by_prefix_unbounded = session
             .prepare(format!(
-                "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ?",
+                "SELECT cluster_key FROM {}.{} WHERE partition_key = ? AND cluster_key >= ?",
+                KEYSPACE, namespace
+            ))
+            .await?;
+
+        let find_keys_by_prefix_unbounded_full_scan = session
+            .prepare(format!(
+                "SELECT cluster_key FROM {}.{} WHERE cluster_key >= ? ALLOW FILTERING",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let find_keys_by_prefix_bounded = session
             .prepare(format!(
-                "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
+                "SELECT cluster_key FROM {}.{} WHERE partition_key = ? AND cluster_key >= ? AND cluster_key < ?",
+                KEYSPACE, namespace
+            ))
+            .await?;
+
+        let find_keys_by_prefix_bounded_full_scan = session
+            .prepare(format!(
+                "SELECT cluster_key FROM {}.{} WHERE cluster_key >= ? AND cluster_key < ? ALLOW FILTERING",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let find_key_values_by_prefix_unbounded = session
             .prepare(format!(
-                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ?",
+                "SELECT cluster_key, value FROM {}.{} WHERE partition_key = ? AND cluster_key >= ?",
+                KEYSPACE, namespace
+            ))
+            .await?;
+
+        let find_key_values_by_prefix_unbounded_full_scan = session
+            .prepare(format!(
+                "SELECT cluster_key, value FROM {}.{} WHERE cluster_key >= ? ALLOW FILTERING",
                 KEYSPACE, namespace
             ))
             .await?;
 
         let find_key_values_by_prefix_bounded = session
             .prepare(format!(
-                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
+                "SELECT cluster_key, value FROM {}.{} WHERE partition_key = ? AND cluster_key >= ? AND cluster_key < ?",
+                KEYSPACE, namespace
+            ))
+            .await?;
+
+        let find_key_values_by_prefix_bounded_full_scan = session
+            .prepare(format!(
+                "SELECT cluster_key, value FROM {}.{} WHERE cluster_key >= ? AND cluster_key < ? ALLOW FILTERING",
                 KEYSPACE, namespace
             ))
             .await?;
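The four new *_full_scan statements drop the partition_key restriction and rely on ALLOW FILTERING. They are only needed when a prefix search cannot be pinned to a single partition; the routing rule, mirrored here as a standalone, runnable sketch (the function name is hypothetical, the real code uses key_prefix_len_ge_partition_key_len):

/// True when a prefix search can be served by the single-partition statements.
fn can_use_single_partition(
    exclusive_mode: bool,
    key_prefix_len: usize,
    cluster_key_prefix_length_bytes: usize,
) -> bool {
    // Exclusive mode: the partition key is the root key, independent of the prefix.
    // Non-exclusive mode: the partition key embeds the first
    // cluster_key_prefix_length_bytes bytes of the key, so the searched prefix must
    // cover all of them to determine a single partition.
    exclusive_mode || key_prefix_len >= cluster_key_prefix_length_bytes
}

fn main() {
    assert!(can_use_single_partition(true, 0, 4));
    assert!(can_use_single_partition(false, 4, 4));
    // A 2-byte prefix spans many partitions: the ALLOW FILTERING variant is used.
    assert!(!can_use_single_partition(false, 2, 4));
}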
@@ -203,6 +263,7 @@ impl ScyllaDbClient {
         Ok(Self {
             session,
             namespace,
+            config,
             read_value,
             contains_key,
             write_batch_delete_prefix_unbounded,
@@ -210,9 +271,13 @@ impl ScyllaDbClient {
             write_batch_deletion,
             write_batch_insertion,
             find_keys_by_prefix_unbounded,
+            find_keys_by_prefix_unbounded_full_scan,
             find_keys_by_prefix_bounded,
+            find_keys_by_prefix_bounded_full_scan,
             find_key_values_by_prefix_unbounded,
+            find_key_values_by_prefix_unbounded_full_scan,
             find_key_values_by_prefix_bounded,
+            find_key_values_by_prefix_bounded_full_scan,
             multi_key_values: DashMap::new(),
             multi_keys: DashMap::new(),
         })
@@ -263,7 +328,7 @@ impl ScyllaDbClient {
         let prepared_statement = self
             .session
             .prepare(format!(
-                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k IN ({})",
+                "SELECT cluster_key, value FROM {}.{} WHERE partition_key = ? AND cluster_key IN ({})",
                 KEYSPACE, self.namespace, markers
             ))
             .await?;
@@ -286,7 +351,7 @@ impl ScyllaDbClient {
         let prepared_statement = self
             .session
             .prepare(format!(
-                "SELECT k FROM {}.{} WHERE root_key = ? AND k IN ({})",
+                "SELECT cluster_key FROM {}.{} WHERE partition_key = ? AND cluster_key IN ({})",
                 KEYSPACE, self.namespace, markers
             ))
             .await?;
@@ -319,15 +384,64 @@ impl ScyllaDbClient {
         Ok(())
     }
 
+    fn check_batch_partition_key(
+        &self,
+        partition_key_prefix: &[u8],
+        key: &[u8],
+        batch_partition_key: &mut Option<Vec<u8>>,
+    ) -> Result<(), ScyllaDbStoreInternalError> {
+        let partition_key = self.get_partition_key(partition_key_prefix, key)?;
+        if let Some(batch_partition_key) = batch_partition_key {
+            ensure!(
+                *batch_partition_key == partition_key,
+                ScyllaDbStoreInternalError::MultiplePartitionKeysInBatch
+            );
+        } else {
+            *batch_partition_key = Some(partition_key.to_vec());
+        }
+        Ok(())
+    }
+
+    fn check_batch_and_partition_keys(
+        &self,
+        partition_key_prefix: &[u8],
+        exclusive_mode: bool,
+        batch: &UnorderedBatch,
+    ) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
+        if !exclusive_mode {
+            ensure!(
+                batch.key_prefix_deletions.is_empty(),
+                ScyllaDbStoreInternalError::PrefixDeletionsNotAllowedInNonExclusiveMode
+            );
+        }
+
+        let mut batch_partition_key = None;
+        for key_prefix in &batch.key_prefix_deletions {
+            self.check_batch_partition_key(
+                partition_key_prefix,
+                key_prefix,
+                &mut batch_partition_key,
+            )?;
+        }
+        for key in &batch.simple_unordered_batch.deletions {
+            self.check_batch_partition_key(partition_key_prefix, key, &mut batch_partition_key)?;
+        }
+        for (key, _) in &batch.simple_unordered_batch.insertions {
+            self.check_batch_partition_key(partition_key_prefix, key, &mut batch_partition_key)?;
+        }
+
+        batch_partition_key.ok_or(ScyllaDbStoreInternalError::NoPartitionKeyInBatch)
+    }
+
     async fn read_value_internal(
         &self,
-        root_key: &[u8],
+        partition_key_prefix: &[u8],
         key: Vec<u8>,
     ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
         Self::check_key_size(&key)?;
         let session = &self.session;
         // Read the value of a key
-        let values = (root_key.to_vec(), key);
+        let values = (self.get_partition_key(partition_key_prefix, &key)?, key);
 
         let (result, _) = session
             .execute_single_page(&self.read_value, &values, PagingState::start())
@@ -340,63 +454,160 @@ impl ScyllaDbClient {
         })
     }
 
+    fn get_partition_key(
+        &self,
+        partition_key_prefix: &[u8],
+        key: &[u8],
+    ) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
+        match partition_key_prefix[0] {
+            0 => Ok(partition_key_prefix.to_vec()),
+            1 => {
+                let mut partition_key = partition_key_prefix.to_vec();
+                let range_end = key.len().min(self.config.cluster_key_prefix_length_bytes);
+                let mut buf = vec![0; self.config.cluster_key_prefix_length_bytes];
+                // Make sure we always return cluster_key_prefix_length_bytes bytes.
+                buf[..range_end].copy_from_slice(&key[..range_end]);
+                partition_key.extend(buf);
+                Ok(partition_key)
+            }
+            _ => Err(ScyllaDbStoreInternalError::InvalidPartitionKeyPrefix),
+        }
+    }
+
     fn get_occurrences_map(
+        &self,
+        partition_key_prefix: &[u8],
         keys: Vec<Vec<u8>>,
-    ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
-        let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
+    ) -> Result<OccurrencesMap, ScyllaDbStoreInternalError> {
+        let mut occurrences_map: OccurrencesMap = HashMap::new();
        for (i_key, key) in keys.into_iter().enumerate() {
             Self::check_key_size(&key)?;
-            map.entry(key).or_default().push(i_key);
+            let partition_key = self.get_partition_key(partition_key_prefix, &key)?;
+
+            occurrences_map
+                .entry(partition_key)
+                .or_default()
+                .entry(key)
+                .or_default()
+                .push(i_key);
         }
-        Ok(map)
+        Ok(occurrences_map)
     }
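A standalone, runnable mirror of get_partition_key under the default 4-byte prefix length: mode byte 0 (exclusive) keeps the root key as the partition key, mode byte 1 (non-exclusive) appends the first bytes of the key, zero-padded so that the partition key length stays fixed:

fn get_partition_key(partition_key_prefix: &[u8], key: &[u8]) -> Vec<u8> {
    const CLUSTER_KEY_PREFIX_LENGTH_BYTES: usize = 4;
    match partition_key_prefix[0] {
        0 => partition_key_prefix.to_vec(),
        1 => {
            let mut partition_key = partition_key_prefix.to_vec();
            let range_end = key.len().min(CLUSTER_KEY_PREFIX_LENGTH_BYTES);
            let mut buf = vec![0; CLUSTER_KEY_PREFIX_LENGTH_BYTES];
            buf[..range_end].copy_from_slice(&key[..range_end]);
            partition_key.extend(buf);
            partition_key
        }
        _ => panic!("invalid partition key prefix"),
    }
}

fn main() {
    // Exclusive mode: the partition key is [0] + root key, independent of the key.
    assert_eq!(get_partition_key(&[0, 42], &[9, 9, 9]), vec![0, 42]);
    // Non-exclusive mode: [1] + the first 4 bytes of the key.
    assert_eq!(get_partition_key(&[1], &[7, 6, 5, 4, 3]), vec![1, 7, 6, 5, 4]);
    // Shorter keys are zero-padded so the partition key length stays stable.
    assert_eq!(get_partition_key(&[1], &[7, 6]), vec![1, 7, 6, 0, 0]);
}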
 
     async fn read_multi_values_internal(
         &self,
-        root_key: &[u8],
+        partition_key_prefix: &[u8],
+        exclusive_mode: bool,
         keys: Vec<Vec<u8>>,
     ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
         let mut values = vec![None; keys.len()];
-        let map = Self::get_occurrences_map(keys)?;
-        let statement = self.get_multi_key_values_statement(map.len()).await?;
-        let mut inputs = vec![root_key.to_vec()];
-        inputs.extend(map.keys().cloned());
-        let mut rows = self
-            .session
-            .execute_iter(statement, &inputs)
-            .await?
-            .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
+        let occurrences_map = self.get_occurrences_map(partition_key_prefix, keys)?;
+        if exclusive_mode {
+            ensure!(
+                occurrences_map.len() == 1,
+                ScyllaDbStoreInternalError::MultiplePartitionKeysInExclusiveMode
+            );
+        }
+        let statements = occurrences_map
+            .iter()
+            .map(|(partition_key, keys_map)| async {
+                let statement = self.get_multi_key_values_statement(keys_map.len()).await?;
+                let mut inputs = vec![partition_key.clone()];
+                inputs.extend(keys_map.keys().cloned());
+                Ok::<_, ScyllaDbStoreInternalError>((partition_key.clone(), statement, inputs))
+            })
+            .collect::<Vec<_>>();
+        let statements = try_join_all(statements).await?;
+
+        let mut futures = Vec::new();
+        let map_ref = &occurrences_map;
+        for (partition_key, statement, inputs) in statements {
+            futures.push(async move {
+                let mut rows = self
+                    .session
+                    .execute_iter(statement, &inputs)
+                    .await?
+                    .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
+                let mut value_pairs = Vec::new();
+                while let Some(row) = rows.next().await {
+                    let (key, value) = row?;
+                    for i_key in map_ref
+                        .get(&partition_key)
+                        .expect("partition_key is supposed to be in map")
+                        .get(&key)
+                        .expect("key is supposed to be in map")
+                    {
+                        value_pairs.push((*i_key, value.clone()));
+                    }
+                }
 
-        while let Some(row) = rows.next().await {
-            let (key, value) = row?;
-            for i_key in &map[&key] {
-                values[*i_key] = Some(value.clone());
-            }
+                Ok::<_, ScyllaDbStoreInternalError>(value_pairs)
+            });
+        }
+
+        let values_pairs = try_join_all(futures).await?;
+        for (i_key, value) in values_pairs.iter().flatten() {
+            values[*i_key] = Some(value.clone());
         }
+
         Ok(values)
     }
 
     async fn contains_keys_internal(
         &self,
-        root_key: &[u8],
+        partition_key_prefix: &[u8],
+        exclusive_mode: bool,
         keys: Vec<Vec<u8>>,
     ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
         let mut values = vec![false; keys.len()];
-        let map = Self::get_occurrences_map(keys)?;
-        let statement = self.get_multi_keys_statement(map.len()).await?;
-        let mut inputs = vec![root_key.to_vec()];
-        inputs.extend(map.keys().cloned());
-        let mut rows = self
-            .session
-            .execute_iter(statement, &inputs)
-            .await?
-            .rows_stream::<(Vec<u8>,)>()?;
+        let occurrences_map = self.get_occurrences_map(partition_key_prefix, keys)?;
+        if exclusive_mode {
+            ensure!(
+                occurrences_map.len() == 1,
+                ScyllaDbStoreInternalError::MultiplePartitionKeysInExclusiveMode
+            );
+        }
 
-        while let Some(row) = rows.next().await {
-            let (key,) = row?;
-            for i_key in &map[&key] {
-                values[*i_key] = true;
-            }
+        let statements = occurrences_map
+            .iter()
+            .map(|(partition_key, keys_map)| async {
+                let statement = self.get_multi_keys_statement(keys_map.len()).await?;
+                let mut inputs = vec![partition_key.clone()];
+                inputs.extend(keys_map.keys().cloned());
+                Ok::<_, ScyllaDbStoreInternalError>((partition_key.clone(), statement, inputs))
+            })
+            .collect::<Vec<_>>();
+        let statements = try_join_all(statements).await?;
+
+        let mut futures = Vec::new();
+        let map_ref = &occurrences_map;
+        for (partition_key, statement, inputs) in statements {
+            futures.push(async move {
+                let mut rows = self
+                    .session
+                    .execute_iter(statement, &inputs)
+                    .await?
+                    .rows_stream::<(Vec<u8>,)>()?;
+                let mut keys = Vec::new();
+                while let Some(row) = rows.next().await {
+                    let (key,) = row?;
+                    for i_key in map_ref
+                        .get(&partition_key)
+                        .expect("partition_key is supposed to be in map")
+                        .get(&key)
+                        .expect("key is supposed to be in map")
+                    {
+                        keys.push(*i_key);
+                    }
+                }
+
+                Ok::<_, ScyllaDbStoreInternalError>(keys)
+            });
+        }
+        let keys = try_join_all(futures).await?;
+
+        for i_key in keys.iter().flatten() {
+            values[*i_key] = true;
         }
 
         Ok(values)
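Both multi-key reads above now fan out one query per partition and join them with try_join_all, so the first failure aborts the whole read. The pattern, reduced to a runnable sketch (fetch_partition is a hypothetical stand-in for execute_iter plus row streaming):

use futures::future::try_join_all;

async fn fetch_partition(partition_key: Vec<u8>) -> Result<Vec<(usize, Vec<u8>)>, String> {
    // Pretend each partition returns one (original position, value) pair.
    Ok(vec![(partition_key[0] as usize, partition_key)])
}

fn main() {
    let partitions: Vec<Vec<u8>> = vec![vec![0], vec![1], vec![2]];
    let futures: Vec<_> = partitions.into_iter().map(fetch_partition).collect();
    let results = futures::executor::block_on(try_join_all(futures)).unwrap();
    // Results from all partitions are flattened back into one positional vector.
    let mut values = vec![None; 3];
    for (i_key, value) in results.into_iter().flatten() {
        values[i_key] = Some(value);
    }
    assert!(values.iter().all(|value| value.is_some()));
}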
@@ -404,13 +615,13 @@ impl ScyllaDbClient {
     }
 
     async fn contains_key_internal(
         &self,
-        root_key: &[u8],
+        partition_key_prefix: &[u8],
         key: Vec<u8>,
     ) -> Result<bool, ScyllaDbStoreInternalError> {
         Self::check_key_size(&key)?;
         let session = &self.session;
         // Read the value of a key
-        let values = (root_key.to_vec(), key);
+        let values = (self.get_partition_key(partition_key_prefix, &key)?, key);
 
         let (result, _) = session
             .execute_single_page(&self.contains_key, &values, PagingState::start())
@@ -453,110 +664,208 @@ impl ScyllaDbClient {
     }
 
     // Batches should always be to the same partition key. Batches across different partitions
-    // will not be atomic. If the caller wants atomicity, it's the caller's responsibility to
-    // make sure that the batch only has statements to the same partition key.
+    // will return an error.
     async fn write_batch_internal(
         &self,
-        root_key: &[u8],
+        partition_key_prefix: &[u8],
+        exclusive_mode: bool,
         batch: UnorderedBatch,
     ) -> Result<(), ScyllaDbStoreInternalError> {
+        if batch.is_empty() {
+            return Ok(());
+        }
+
         Self::check_batch_len(&batch)?;
+        let partition_key =
+            self.check_batch_and_partition_keys(partition_key_prefix, exclusive_mode, &batch)?;
         let session = &self.session;
-        let mut batch_query = self.get_sticky_batch_query(root_key)?;
-        let mut batch_values = Vec::with_capacity(batch.len());
+        let mut batch_query = self.get_sticky_batch_query(&partition_key)?;
+        let mut batch_values: Vec<Vec<Vec<u8>>> = Vec::new();
         for key_prefix in batch.key_prefix_deletions {
+            // We are always in exclusive mode here, which check_batch_and_partition_keys
+            // guarantees.
             Self::check_key_size(&key_prefix)?;
             match get_upper_bound_option(&key_prefix) {
                 None => {
                     batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone());
-                    batch_values.push(vec![root_key.to_vec(), key_prefix]);
+                    batch_values.push(vec![partition_key.clone(), key_prefix]);
                 }
                 Some(upper_bound) => {
                     batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone());
-                    batch_values.push(vec![root_key.to_vec(), key_prefix, upper_bound]);
+                    batch_values.push(vec![partition_key.clone(), key_prefix, upper_bound]);
                 }
             }
         }
+
         for key in batch.simple_unordered_batch.deletions {
             Self::check_key_size(&key)?;
             batch_query.append_statement(self.write_batch_deletion.clone());
-            batch_values.push(vec![root_key.to_vec(), key]);
+            batch_values.push(vec![partition_key.clone(), key]);
         }
         for (key, value) in batch.simple_unordered_batch.insertions {
             Self::check_key_size(&key)?;
             Self::check_value_size(&value)?;
             batch_query.append_statement(self.write_batch_insertion.clone());
-            batch_values.push(vec![root_key.to_vec(), key, value]);
+            batch_values.push(vec![partition_key.clone(), key, value]);
         }
+
         session.batch(&batch_query, batch_values).await?;
         Ok(())
     }
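check_batch_and_partition_keys, called above, rejects any batch whose keys map to more than one partition. A self-contained sketch of the rule in non-exclusive mode, where two keys share a partition exactly when their zero-padded 4-byte prefixes agree (the error strings mirror the new variants declared further down):

fn check_single_partition(keys: &[Vec<u8>]) -> Result<(), String> {
    let prefix = |key: &[u8]| -> [u8; 4] {
        let mut buf = [0u8; 4];
        let range_end = key.len().min(4);
        buf[..range_end].copy_from_slice(&key[..range_end]);
        buf
    };
    let Some(first) = keys.first() else {
        // An empty batch is a no-op (write_batch_internal returns early).
        return Ok(());
    };
    let expected = prefix(first);
    for key in keys {
        if prefix(key) != expected {
            return Err("MultiplePartitionKeysInBatch".to_string());
        }
    }
    Ok(())
}

fn main() {
    // Same 4-byte prefix: the batch targets a single partition and is accepted.
    assert!(check_single_partition(&[b"abcd-1".to_vec(), b"abcd-2".to_vec()]).is_ok());
    // Different prefixes map to different partitions: rejected.
    assert!(check_single_partition(&[b"abcd-1".to_vec(), b"zzzz-1".to_vec()]).is_err());
}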
 
+    async fn get_one_column_result_from_query_pager(
+        &self,
+        query_pager: QueryPager,
+        len: usize,
+    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
+        let mut rows = query_pager.rows_stream::<(Vec<u8>,)>()?;
+        let mut keys = Vec::new();
+        while let Some(row) = rows.next().await {
+            let (key,) = row?;
+            let short_key = key[len..].to_vec();
+            keys.push(short_key);
+        }
+
+        Ok(keys)
+    }
+
+    async fn get_two_columns_result_from_query_pager(
+        &self,
+        rows: QueryPager,
+        len: usize,
+    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
+        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
+        let mut key_values = Vec::new();
+        while let Some(row) = rows.next().await {
+            let (key, value) = row?;
+            let short_key = key[len..].to_vec();
+            key_values.push((short_key, value));
+        }
+
+        Ok(key_values)
+    }
+
+    fn key_prefix_len_ge_partition_key_len(&self, key_prefix: &[u8]) -> bool {
+        key_prefix.len() >= self.config.cluster_key_prefix_length_bytes
+    }
+
     async fn find_keys_by_prefix_internal(
         &self,
-        root_key: &[u8],
+        partition_key_prefix: &[u8],
+        exclusive_mode: bool,
         key_prefix: Vec<u8>,
     ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
         Self::check_key_size(&key_prefix)?;
         let session = &self.session;
         // Read the value of a key
         let len = key_prefix.len();
-        let query_unbounded = &self.find_keys_by_prefix_unbounded;
-        let query_bounded = &self.find_keys_by_prefix_bounded;
-        let rows = match get_upper_bound_option(&key_prefix) {
+        match get_upper_bound_option(&key_prefix) {
             None => {
-                let values = (root_key.to_vec(), key_prefix.clone());
-                session
-                    .execute_iter(query_unbounded.clone(), values)
-                    .await?
+                let query_pager =
+                    if exclusive_mode || self.key_prefix_len_ge_partition_key_len(&key_prefix) {
+                        session
+                            .execute_iter(
+                                self.find_keys_by_prefix_unbounded.clone(),
+                                vec![
+                                    self.get_partition_key(partition_key_prefix, &key_prefix)?,
+                                    key_prefix,
+                                ],
+                            )
+                            .await?
+                    } else {
+                        session
+                            .execute_iter(
+                                self.find_keys_by_prefix_unbounded_full_scan.clone(),
+                                vec![key_prefix],
+                            )
+                            .await?
+                    };
+
+                self.get_one_column_result_from_query_pager(query_pager, len)
+                    .await
             }
             Some(upper_bound) => {
-                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
-                session.execute_iter(query_bounded.clone(), values).await?
+                let query_pager =
+                    if exclusive_mode || self.key_prefix_len_ge_partition_key_len(&key_prefix) {
+                        session
+                            .execute_iter(
+                                self.find_keys_by_prefix_bounded.clone(),
+                                vec![
+                                    self.get_partition_key(partition_key_prefix, &key_prefix)?,
+                                    key_prefix,
+                                    upper_bound,
+                                ],
+                            )
+                            .await?
+                    } else {
+                        session
+                            .execute_iter(
+                                self.find_keys_by_prefix_bounded_full_scan.clone(),
+                                vec![key_prefix, upper_bound],
+                            )
+                            .await?
+                    };
+
+                self.get_one_column_result_from_query_pager(query_pager, len)
+                    .await
             }
-        };
-        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
-        let mut keys = Vec::new();
-        while let Some(row) = rows.next().await {
-            let (key,) = row?;
-            let short_key = key[len..].to_vec();
-            keys.push(short_key);
         }
-        Ok(keys)
     }
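Both pager helpers above trim the returned cluster_key down to the suffix after the searched prefix, via key[len..]. The trimming step in isolation:

fn main() {
    let key_prefix = b"tmp/".to_vec();
    let len = key_prefix.len();
    // Rows come back with the full cluster_key; only the suffix after the
    // searched prefix is returned to the caller.
    let row_key = b"tmp/file1".to_vec();
    let short_key = row_key[len..].to_vec();
    assert_eq!(short_key, b"file1".to_vec());
}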
 
     async fn find_key_values_by_prefix_internal(
         &self,
-        root_key: &[u8],
+        partition_key_prefix: &[u8],
+        exclusive_mode: bool,
         key_prefix: Vec<u8>,
     ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
         Self::check_key_size(&key_prefix)?;
         let session = &self.session;
-        // Read the value of a key
         let len = key_prefix.len();
-        let query_unbounded = &self.find_key_values_by_prefix_unbounded;
-        let query_bounded = &self.find_key_values_by_prefix_bounded;
-        let rows = match get_upper_bound_option(&key_prefix) {
+        match get_upper_bound_option(&key_prefix) {
             None => {
-                let values = (root_key.to_vec(), key_prefix.clone());
-                session
-                    .execute_iter(query_unbounded.clone(), values)
-                    .await?
+                let partition_key = self.get_partition_key(partition_key_prefix, &key_prefix)?;
+                let query_pager =
+                    if exclusive_mode || self.key_prefix_len_ge_partition_key_len(&key_prefix) {
+                        session
+                            .execute_iter(
+                                self.find_key_values_by_prefix_unbounded.clone(),
+                                vec![partition_key, key_prefix],
+                            )
+                            .await?
+                    } else {
+                        session
+                            .execute_iter(
+                                self.find_key_values_by_prefix_unbounded_full_scan.clone(),
+                                vec![key_prefix],
+                            )
+                            .await?
+                    };
+                self.get_two_columns_result_from_query_pager(query_pager, len)
+                    .await
             }
             Some(upper_bound) => {
-                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
-                session.execute_iter(query_bounded.clone(), values).await?
+                let partition_key = self.get_partition_key(partition_key_prefix, &key_prefix)?;
+                let query_pager =
+                    if exclusive_mode || self.key_prefix_len_ge_partition_key_len(&key_prefix) {
+                        session
+                            .execute_iter(
+                                self.find_key_values_by_prefix_bounded.clone(),
+                                vec![partition_key, key_prefix, upper_bound],
+                            )
+                            .await?
+                    } else {
+                        session
+                            .execute_iter(
+                                self.find_key_values_by_prefix_bounded_full_scan.clone(),
+                                vec![key_prefix, upper_bound],
+                            )
+                            .await?
+                    };
+                self.get_two_columns_result_from_query_pager(query_pager, len)
+                    .await
             }
-        };
-        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
-        let mut key_values = Vec::new();
-        while let Some(row) = rows.next().await {
-            let (key, value) = row?;
-            let short_key = key[len..].to_vec();
-            key_values.push((short_key, value));
         }
-        Ok(key_values)
     }
 }
@@ -629,7 +938,8 @@ pub struct ScyllaDbStoreInternal {
     store: Arc<ScyllaDbClient>,
     semaphore: Option<Arc<Semaphore>>,
     max_stream_queries: usize,
-    root_key: Vec<u8>,
+    partition_key_prefix: Vec<u8>,
+    exclusive_mode: bool,
 }
 
 /// The error type for [`ScyllaDbStoreInternal`]
@@ -710,6 +1020,26 @@ pub enum ScyllaDbStoreInternalError {
     /// A metadata error in ScyllaDB
     #[error(transparent)]
     MetadataError(#[from] MetadataError),
+
+    /// The partition key prefix is invalid
+    #[error("The partition key prefix is invalid")]
+    InvalidPartitionKeyPrefix,
+
+    /// The batch contains multiple partition keys
+    #[error("Multiple partition keys in a batch are not allowed")]
+    MultiplePartitionKeysInBatch,
+
+    /// The batch contains no partition key
+    #[error("The batch contains no partition key. Every batch must contain a partition key")]
+    NoPartitionKeyInBatch,
+
+    /// Prefix deletions are not allowed in non-exclusive mode
+    #[error("Prefix deletions are not allowed in non-exclusive mode")]
+    PrefixDeletionsNotAllowedInNonExclusiveMode,
+
+    /// Multiple partition keys in a query are not allowed in exclusive mode
+    #[error("Multiple partition keys in a query are not allowed in exclusive mode")]
+    MultiplePartitionKeysInExclusiveMode,
 }
 
 impl KeyValueStoreError for ScyllaDbStoreInternalError {
@@ -736,7 +1066,7 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal {
         let store = self.store.deref();
         let _guard = self.acquire().await;
         store
-            .read_value_internal(&self.root_key, key.to_vec())
+            .read_value_internal(&self.partition_key_prefix, key.to_vec())
             .await
     }
 
@@ -744,7 +1074,7 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal {
         let store = self.store.deref();
         let _guard = self.acquire().await;
         store
-            .contains_key_internal(&self.root_key, key.to_vec())
+            .contains_key_internal(&self.partition_key_prefix, key.to_vec())
             .await
     }
 
@@ -757,9 +1087,13 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal {
         }
         let store = self.store.deref();
         let _guard = self.acquire().await;
-        let handles = keys
-            .chunks(MAX_MULTI_KEYS)
-            .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
+        let handles = keys.chunks(MAX_MULTI_KEYS).map(|keys| {
+            store.contains_keys_internal(
+                &self.partition_key_prefix,
+                self.exclusive_mode,
+                keys.to_vec(),
+            )
+        });
         let results: Vec<_> = join_all(handles)
             .await
             .into_iter()
@@ -776,9 +1110,13 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal {
         }
         let store = self.store.deref();
         let _guard = self.acquire().await;
-        let handles = keys
-            .chunks(MAX_MULTI_KEYS)
-            .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
+        let handles = keys.chunks(MAX_MULTI_KEYS).map(|keys| {
+            store.read_multi_values_internal(
+                &self.partition_key_prefix,
+                self.exclusive_mode,
+                keys.to_vec(),
+            )
+        });
         let results: Vec<_> = join_all(handles)
             .await
             .into_iter()
@@ -793,7 +1131,11 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal {
         let store = self.store.deref();
         let _guard = self.acquire().await;
         store
-            .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
+            .find_keys_by_prefix_internal(
+                &self.partition_key_prefix,
+                self.exclusive_mode,
+                key_prefix.to_vec(),
+            )
             .await
     }
 
@@ -804,7 +1146,11 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal {
         let store = self.store.deref();
         let _guard = self.acquire().await;
         store
-            .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
+            .find_key_values_by_prefix_internal(
+                &self.partition_key_prefix,
+                self.exclusive_mode,
+                key_prefix.to_vec(),
+            )
             .await
     }
 }
@@ -827,15 +1173,23 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
     async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
         let store = self.store.deref();
         let _guard = self.acquire().await;
-        store.write_batch_internal(&self.root_key, batch).await
+        store
+            .write_batch_internal(&self.partition_key_prefix, self.exclusive_mode, batch)
+            .await
     }
 }
 
-// ScyllaDB requires that the keys are non-empty.
-fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
-    let mut big_key = vec![0];
-    big_key.extend(root_key);
-    big_key
+fn get_exclusive_partition_key(root_key: &[u8]) -> Vec<u8> {
+    // This is for views (mutable data). This is the final partition key.
+    let mut partition_key = vec![0];
+    partition_key.extend(root_key);
+    partition_key
+}
+
+fn get_non_exclusive_partition_key_prefix() -> Vec<u8> {
+    // This is for immutable data. A prefix of the key will be added to this to make the
+    // partition key.
+    vec![1]
 }
 
 /// The type for building a new ScyllaDB Key Value Store
@@ -849,6 +1203,8 @@ pub struct ScyllaDbStoreInternalConfig {
     pub max_stream_queries: usize,
     /// The replication factor.
     pub replication_factor: u32,
+    /// The configuration of the ScyllaDB client
+    pub client_config: ScyllaDbClientConfig,
 }
 
 impl AdminKeyValueStore for ScyllaDbStoreInternal {
@@ -864,18 +1220,19 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
     ) -> Result<Self, ScyllaDbStoreInternalError> {
         Self::check_namespace(namespace)?;
         let session = ScyllaDbClient::build_default_session(&config.uri).await?;
-        let store = ScyllaDbClient::new(session, namespace).await?;
+        let store = ScyllaDbClient::new(session, namespace, config.client_config.clone()).await?;
         let store = Arc::new(store);
         let semaphore = config
             .max_concurrent_queries
             .map(|n| Arc::new(Semaphore::new(n)));
         let max_stream_queries = config.max_stream_queries;
-        let root_key = get_big_root_key(&[]);
+        let partition_key_prefix = get_non_exclusive_partition_key_prefix();
         Ok(Self {
             store,
             semaphore,
             max_stream_queries,
-            root_key,
+            partition_key_prefix,
+            exclusive_mode: false,
         })
     }
 
@@ -883,12 +1240,13 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
         let store = self.store.clone();
         let semaphore = self.semaphore.clone();
         let max_stream_queries = self.max_stream_queries;
-        let root_key = get_big_root_key(root_key);
+        let partition_key_prefix = get_exclusive_partition_key(root_key);
         Ok(Self {
             store,
             semaphore,
             max_stream_queries,
-            root_key,
+            partition_key_prefix,
+            exclusive_mode: true,
        })
     }
 
@@ -934,7 +1292,7 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
         let session = ScyllaDbClient::build_default_session(&config.uri).await?;
         let statement = session
             .prepare(format!(
-                "SELECT root_key FROM {}.{} ALLOW FILTERING",
+                "SELECT partition_key FROM {}.{} ALLOW FILTERING",
                 KEYSPACE, namespace
             ))
             .await?;
@@ -944,9 +1302,11 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
         let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
         let mut root_keys = BTreeSet::new();
         while let Some(row) = rows.next().await {
-            let (root_key,) = row?;
-            let root_key = root_key[1..].to_vec();
-            root_keys.insert(root_key);
+            let (partition_key,) = row?;
+            if partition_key[0] == 0 {
+                let root_key = partition_key[1..].to_vec();
+                root_keys.insert(root_key);
+            }
         }
         Ok(root_keys.into_iter().collect::<Vec<_>>())
     }
 
@@ -973,7 +1333,7 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
         // We check the different ways the test can fail.
         let result = session
             .prepare(format!(
-                "SELECT root_key FROM {}.{} LIMIT 1 ALLOW FILTERING",
+                "SELECT partition_key FROM {}.{} LIMIT 1 ALLOW FILTERING",
                 KEYSPACE, namespace
             ))
             .await;
@@ -981,7 +1341,7 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
         // The missing table translates into a very specific error that we match.
         let miss_msg1 = format!("unconfigured table {}", namespace);
         let miss_msg1 = miss_msg1.as_str();
-        let miss_msg2 = "Undefined name root_key in selection clause";
+        let miss_msg2 = "Undefined name partition_key in selection clause";
         let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
         let Err(error) = result else {
             // If OK, then the table exists
@@ -1034,10 +1394,10 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
         let statement = session
             .prepare(format!(
                 "CREATE TABLE {}.{} (\
-                    root_key blob, \
-                    k blob, \
-                    v blob, \
-                    PRIMARY KEY (root_key, k) \
+                    partition_key blob, \
+                    cluster_key blob, \
+                    value blob, \
+                    PRIMARY KEY (partition_key, cluster_key) \
                 ) \
                 WITH compaction = {{ \
                     'class' : 'SizeTieredCompactionStrategy', \
@@ -1111,6 +1471,7 @@ impl TestKeyValueStore for JournalingKeyValueStore {
             max_concurrent_queries: Some(10),
             max_stream_queries: 10,
             replication_factor: 1,
+            client_config: ScyllaDbClientConfig::default(),
         })
     }
 }
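list_root_keys above now filters on the mode byte: only partitions written in exclusive mode (first byte 0) correspond to root keys, while non-exclusive partitions (first byte 1) are skipped. The filter in isolation:

fn main() {
    // Partition keys as stored: mode byte 0 (exclusive) or 1 (non-exclusive).
    let partition_keys: Vec<Vec<u8>> = vec![vec![0, 42], vec![1, 9, 9, 9, 9], vec![0]];
    let root_keys: Vec<Vec<u8>> = partition_keys
        .into_iter()
        .filter(|partition_key| partition_key[0] == 0)
        .map(|partition_key| partition_key[1..].to_vec())
        .collect();
    // Only exclusive-mode partitions yield root keys; the [1, ...] row is dropped.
    assert_eq!(root_keys, vec![vec![42], vec![]]);
}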
diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs
index 6dc615307c3..03191b8dd80 100644
--- a/linera-views/src/test_utils/mod.rs
+++ b/linera-views/src/test_utils/mod.rs
@@ -252,7 +252,16 @@ pub fn get_random_key_values_with_sizes(
     len_key: usize,
     len_value: usize,
 ) -> Vec<(Vec<u8>, Vec<u8>)> {
-    let key_prefix = vec![0];
+    get_random_key_values_with_sizes_with_key_prefix(num_entries, len_key, len_value, vec![0])
+}
+
+/// Generates a list of random key-values with no duplicates, and a common key prefix.
+pub fn get_random_key_values_with_sizes_with_key_prefix(
+    num_entries: usize,
+    len_key: usize,
+    len_value: usize,
+    key_prefix: Vec<u8>,
+) -> Vec<(Vec<u8>, Vec<u8>)> {
     let mut rng = make_deterministic_rng();
     get_random_key_values_prefix(
         &mut rng,
@@ -269,7 +278,15 @@ fn get_random_key_values_with_small_keys(
     len_key: usize,
     len_value: usize,
 ) -> Vec<(Vec<u8>, Vec<u8>)> {
-    let key_prefix = vec![0];
+    get_random_key_values_with_small_keys_with_key_prefix(num_entries, len_key, len_value, vec![0])
+}
+
+fn get_random_key_values_with_small_keys_with_key_prefix(
+    num_entries: usize,
+    len_key: usize,
+    len_value: usize,
+    key_prefix: Vec<u8>,
+) -> Vec<(Vec<u8>, Vec<u8>)> {
     let mut rng = make_deterministic_rng();
     get_random_key_values_prefix(
         &mut rng,
@@ -304,6 +321,19 @@ pub fn get_random_test_scenarios() -> Vec<Vec<(Vec<u8>, Vec<u8>)>> {
     ]
 }
 
+/// We build a number of scenarios for testing the reads, with a specific key prefix.
+pub fn get_random_test_scenarios_with_key_prefix(
+    key_prefix: &[u8],
+) -> Vec<Vec<(Vec<u8>, Vec<u8>)>> {
+    vec![
+        get_random_key_values_with_sizes_with_key_prefix(7, 8, 3, key_prefix.to_vec()),
+        get_random_key_values_with_sizes_with_key_prefix(150, 8, 3, key_prefix.to_vec()),
+        get_random_key_values_with_sizes_with_key_prefix(30, 8, 10, key_prefix.to_vec()),
+        get_random_key_values_with_small_keys_with_key_prefix(30, 4, 10, key_prefix.to_vec()),
+        get_random_key_values_with_small_keys_with_key_prefix(30, 4, 100, key_prefix.to_vec()),
+    ]
+}
+
 fn generate_random_batch<R: Rng>(rng: &mut R, key_prefix: &[u8], batch_size: usize) -> Batch {
     let mut batch = Batch::new();
     // Fully random batch
@@ -779,10 +809,11 @@ pub async fn root_key_admin_test() {
     let mut keys = BTreeSet::new();
     S::create(&config, &namespace).await.expect("creation");
     let prefix = vec![0];
+
     {
-        let size = 3;
         let mut rng = make_deterministic_rng();
         let store = S::connect(&config, &namespace).await.expect("store");
+        let store = store.open_exclusive(&[]).expect("exclusive store");
         root_keys.push(vec![]);
         let mut batch = Batch::new();
         for _ in 0..2 {
@@ -792,11 +823,12 @@ pub async fn root_key_admin_test() {
         }
         store.write_batch(batch).await.expect("write batch");
 
+        let size = 3;
         for _ in 0..20 {
             let root_key = get_random_byte_vector(&mut rng, &[], 4);
-            let cloned_store = store.open_exclusive(&root_key).expect("cloned store");
+            let cloned_store = store.open_exclusive(&root_key).expect("exclusive store");
             root_keys.push(root_key.clone());
-            let size_select = rng.gen_range(0..size);
+            let size_select = rng.gen_range(1..size);
             let mut batch = Batch::new();
             for _ in 0..size_select {
                 let key = get_random_byte_vector(&mut rng, &prefix, 4);
diff --git a/linera-views/src/views/unit_tests/views.rs b/linera-views/src/views/unit_tests/views.rs
index f638e3dc696..f982ad2a989 100644
--- a/linera-views/src/views/unit_tests/views.rs
+++ b/linera-views/src/views/unit_tests/views.rs
@@ -243,6 +243,7 @@ impl TestContextFactory for ScyllaDbContextFactory {
         let config = ScyllaDbStore::new_test_config().await?;
         let namespace = generate_test_namespace();
         let store = ScyllaDbStore::recreate_and_connect(&config, &namespace).await?;
+        let store = store.open_exclusive(&[])?;
         let context = ViewContext::create_root_context(store, ()).await?;
         Ok(context)
     }
diff --git a/linera-views/tests/store_tests.rs b/linera-views/tests/store_tests.rs
index 9d954e5eac6..946d2c81d20 100644
--- a/linera-views/tests/store_tests.rs
+++ b/linera-views/tests/store_tests.rs
@@ -106,11 +106,19 @@ async fn test_reads_scylla_db() {
 #[cfg(with_scylladb)]
 #[tokio::test]
 async fn test_reads_scylla_db_no_root_key() {
-    for scenario in get_random_test_scenarios() {
-        let store = linera_views::scylla_db::ScyllaDbStore::new_test_store()
-            .await
-            .unwrap();
-        run_reads(store, scenario).await;
+    use linera_views::test_utils::{
+        get_random_byte_vector, get_random_test_scenarios_with_key_prefix,
+    };
+
+    let mut rng = make_deterministic_rng();
+    for _ in 0..3 {
+        let key_prefix = get_random_byte_vector(&mut rng, &[], 4);
+        for scenario in get_random_test_scenarios_with_key_prefix(&key_prefix) {
+            let store = linera_views::scylla_db::ScyllaDbStore::new_test_store()
+                .await
+                .unwrap();
+            run_reads(store, scenario).await;
+        }
     }
 }
 
@@ -184,9 +192,12 @@ async fn test_dynamo_db_writes_from_blank() {
 #[cfg(with_scylladb)]
 #[tokio::test]
 async fn test_scylla_db_writes_from_blank() {
+    use linera_views::store::AdminKeyValueStore as _;
+
     let store = linera_views::scylla_db::ScyllaDbStore::new_test_store()
         .await
         .unwrap();
+    let store = store.open_exclusive(&[]).unwrap();
     run_writes_from_blank(&store).await;
 }
 
@@ -333,9 +344,12 @@ async fn test_dynamo_db_writes_from_state() {
 #[cfg(with_scylladb)]
 #[tokio::test]
 async fn test_scylla_db_writes_from_state() {
+    use linera_views::store::AdminKeyValueStore as _;
+
     let store = linera_views::scylla_db::ScyllaDbStore::new_test_store()
         .await
         .unwrap();
+    let store = store.open_exclusive(&[]).unwrap();
    run_writes_from_state(&store).await;
 }