diff --git a/Cargo.lock b/Cargo.lock index 0504e8234b7..fb1b210352c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5757,6 +5757,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-test", "web-sys", + "xxhash-rust", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 146b04616fc..408c4f3bdfb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -272,6 +272,7 @@ wasmtimer = "0.2.0" web-sys = "0.3.69" web-time = "1.1.0" wit-bindgen = "0.24.0" +xxhash-rust = { version = "0.8.15", features = ["xxh3"] } zstd = "0.13.2" linera-base = { version = "0.15.0", path = "./linera-base" } diff --git a/linera-views/Cargo.toml b/linera-views/Cargo.toml index 631a2859c33..126593c776e 100644 --- a/linera-views/Cargo.toml +++ b/linera-views/Cargo.toml @@ -26,7 +26,7 @@ indexeddb = ["indexed_db_futures", "wasm-bindgen"] web-default = ["web", "indexeddb"] dynamodb = ["aws-config", "aws-sdk-dynamodb", "aws-smithy-types"] -scylladb = ["scylla"] +scylladb = ["scylla", "xxhash-rust"] [dependencies] anyhow.workspace = true @@ -59,6 +59,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["rt", "sync"] } tracing.workspace = true trait-variant.workspace = true +xxhash-rust = { workspace = true, optional = true } [target.wasm32-unknown-unknown.dependencies] indexed_db_futures = { workspace = true, optional = true } diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 02972ccffb4..7ea002855e4 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -22,7 +22,8 @@ use futures::{ use linera_base::ensure; use scylla::{ client::{ - execution_profile::ExecutionProfile, session::Session, session_builder::SessionBuilder, + execution_profile::ExecutionProfile, pager::QueryPager, session::Session, + session_builder::SessionBuilder, }, deserialize::{DeserializationError, TypeCheckError}, errors::{ @@ -36,6 +37,7 @@ use scylla::{ }; use serde::{Deserialize, Serialize}; use thiserror::Error; +use xxhash_rust::xxh3::xxh3_64; #[cfg(with_metrics)] use crate::metering::MeteredStore; @@ -93,6 +95,16 @@ const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE /// correct. const MAX_BATCH_SIZE: usize = 5000; +/// This is the length of the prefix of the key that we'll be hashing to determine the partition +/// bucket. +const K_PREFIX_LENGTH_BYTES: usize = 3; + +/// The number of bucket partitions in the table. +const NUM_BUCKETS: u16 = 1 << 10; + +/// Map from bucket_id to a map from keys to a list of their occurrences in the original vector. +type OccurrencesMap = HashMap, HashMap, Vec>>; + /// The client for ScyllaDB: /// * The session allows to pass queries /// * The namespace that is being assigned to the database @@ -126,7 +138,7 @@ impl ScyllaDbClient { .collect::>() .join(","); let query = format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({})", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k IN ({})", self.namespace, markers ); let prepared_statement = self.session.prepare(query).await?; @@ -146,7 +158,7 @@ impl ScyllaDbClient { .collect::>() .join(","); let query = format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({})", + "SELECT k FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k IN ({})", self.namespace, markers ); let prepared_statement = self.session.prepare(query).await?; @@ -183,70 +195,70 @@ impl ScyllaDbClient { let namespace = namespace.to_string(); let read_value = session .prepare(format!( - "SELECT v FROM kv.{} WHERE root_key = ? 
AND k = ?", + "SELECT v FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k = ?", namespace )) .await?; let contains_key = session .prepare(format!( - "SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ?", + "SELECT root_key FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k = ?", namespace )) .await?; let write_batch_delete_prefix_unbounded = session .prepare(format!( - "DELETE FROM kv.{} WHERE root_key = ? AND k >= ?", + "DELETE FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k >= ?", namespace )) .await?; let write_batch_delete_prefix_bounded = session .prepare(format!( - "DELETE FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", + "DELETE FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k >= ? AND k < ?", namespace )) .await?; let write_batch_deletion = session .prepare(format!( - "DELETE FROM kv.{} WHERE root_key = ? AND k = ?", + "DELETE FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k = ?", namespace )) .await?; let write_batch_insertion = session .prepare(format!( - "INSERT INTO kv.{} (root_key, k, v) VALUES (?, ?, ?)", + "INSERT INTO kv.{} (root_key, bucket_id, k, v) VALUES (?, ?, ?, ?)", namespace )) .await?; let find_keys_by_prefix_unbounded = session .prepare(format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ?", + "SELECT k FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k >= ?", namespace )) .await?; let find_keys_by_prefix_bounded = session .prepare(format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", + "SELECT k FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k >= ? AND k < ?", namespace )) .await?; let find_key_values_by_prefix_unbounded = session .prepare(format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ?", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k >= ?", namespace )) .await?; let find_key_values_by_prefix_bounded = session .prepare(format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND bucket_id = ? AND k >= ? AND k < ?", namespace )) .await?; @@ -277,7 +289,7 @@ impl ScyllaDbClient { Self::check_key_size(&key)?; let session = &self.session; // Read the value of a key - let values = (root_key.to_vec(), key); + let values = (root_key.to_vec(), Self::get_bucket_id(root_key, &key), key); let (result, _) = session .execute_single_page(&self.read_value, &values, PagingState::start()) @@ -290,13 +302,32 @@ impl ScyllaDbClient { }) } + fn get_bucket_id(root_key: &[u8], key: &[u8]) -> Vec { + if root_key.len() > 1 { + // A root key longer than 1 means that we used `clone_with_root_key` and we don't need + // to add extra partitioning. So return the same bucket id always. 
+ vec![0, 0] + } else { + let range_end = key.len().min(K_PREFIX_LENGTH_BYTES); + let bucket_id = (xxh3_64(&key[..range_end]) as u16) & (NUM_BUCKETS - 1); + bucket_id.to_be_bytes().to_vec() + } + } + fn get_occurences_map( + root_key: &[u8], keys: Vec>, - ) -> Result, Vec>, ScyllaDbStoreInternalError> { - let mut map = HashMap::, Vec>::new(); + ) -> Result { + let mut map: OccurrencesMap = HashMap::new(); for (i_key, key) in keys.into_iter().enumerate() { Self::check_key_size(&key)?; - map.entry(key).or_default().push(i_key); + let bucket_id = Self::get_bucket_id(root_key, &key); + + map.entry(bucket_id) + .or_default() + .entry(key) + .or_default() + .push(i_key); } Ok(map) } @@ -307,22 +338,49 @@ impl ScyllaDbClient { keys: Vec>, ) -> Result>>, ScyllaDbStoreInternalError> { let mut values = vec![None; keys.len()]; - let map = Self::get_occurences_map(keys)?; - let statement = self.get_multi_key_values_statement(map.len()).await?; - let mut inputs = vec![root_key.to_vec()]; - inputs.extend(map.keys().cloned()); - let mut rows = self - .session - .execute_iter(statement, &inputs) - .await? - .rows_stream::<(Vec, Vec)>()?; + let map = Self::get_occurences_map(root_key, keys)?; + let statements = map + .iter() + .map(|(bucket_id, keys_map)| async { + let statement = self.get_multi_key_values_statement(keys_map.len()).await?; + let mut inputs = vec![root_key.to_vec(), bucket_id.clone()]; + inputs.extend(keys_map.keys().cloned()); + Ok::<_, ScyllaDbStoreInternalError>((bucket_id.clone(), statement, inputs)) + }) + .collect::>(); + let statements = try_join_all(statements).await?; + + let mut futures = Vec::new(); + let map_ref = ↦ + for (bucket_id, statement, inputs) in statements { + futures.push(async move { + let mut rows = self + .session + .execute_iter(statement, &inputs) + .await? + .rows_stream::<(Vec, Vec)>()?; + let mut value_pairs = Vec::new(); + while let Some(row) = rows.next().await { + let (key, value) = row?; + for i_key in map_ref + .get(&bucket_id) + .expect("bucket_id is supposed to be in map") + .get(&key) + .expect("key is supposed to be in map") + { + value_pairs.push((*i_key, value.clone())); + } + } - while let Some(row) = rows.next().await { - let (key, value) = row?; - for i_key in map.get(&key).expect("key is supposed to be in map") { - values[*i_key] = Some(value.clone()); - } + Ok::<_, ScyllaDbStoreInternalError>(value_pairs) + }); + } + + let values_pairs = try_join_all(futures).await?; + for (i_key, value) in values_pairs.iter().flatten() { + values[*i_key] = Some(value.clone()); } + Ok(values) } @@ -332,21 +390,47 @@ impl ScyllaDbClient { keys: Vec>, ) -> Result, ScyllaDbStoreInternalError> { let mut values = vec![false; keys.len()]; - let map = Self::get_occurences_map(keys)?; - let statement = self.get_multi_keys_statement(map.len()).await?; - let mut inputs = vec![root_key.to_vec()]; - inputs.extend(map.keys().cloned()); - let mut rows = self - .session - .execute_iter(statement, &inputs) - .await? 
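The shape of `read_multi_values_internal` above, reduced to its essentials: group the requested keys by bucket (as `get_occurences_map` now does), issue one query per bucket concurrently via `try_join_all`, then scatter each returned row back to every position where the caller asked for that key. A simplified sketch under assumed types; `fetch_bucket` and the one-byte bucketing are stand-ins for the prepared-statement round-trip and `get_bucket_id`:

```rust
use std::collections::HashMap;

use futures::future::try_join_all;

type Key = Vec<u8>;

// Hypothetical stand-in for one `execute_iter` round-trip against a bucket;
// here it just echoes each key back as its own value.
async fn fetch_bucket(_bucket: u16, keys: Vec<Key>) -> Result<Vec<(Key, Vec<u8>)>, String> {
    Ok(keys.into_iter().map(|key| (key.clone(), key)).collect())
}

async fn read_multi(keys: Vec<Key>) -> Result<Vec<Option<Vec<u8>>>, String> {
    // Group the original indices by (bucket, key).
    let mut by_bucket: HashMap<u16, HashMap<Key, Vec<usize>>> = HashMap::new();
    for (index, key) in keys.iter().enumerate() {
        let bucket = key.first().copied().unwrap_or(0) as u16; // toy bucketing
        by_bucket
            .entry(bucket)
            .or_default()
            .entry(key.clone())
            .or_default()
            .push(index);
    }
    // One future per bucket; `try_join_all` polls them concurrently and
    // fails fast on the first error.
    let per_bucket = by_bucket.iter().map(|(bucket, keys_map)| async move {
        let rows = fetch_bucket(*bucket, keys_map.keys().cloned().collect()).await?;
        let mut found = Vec::new();
        for (key, value) in rows {
            // Scatter the row to every original position of that key.
            for index in &keys_map[&key] {
                found.push((*index, value.clone()));
            }
        }
        Ok::<_, String>(found)
    });
    let mut values = vec![None; keys.len()];
    for (index, value) in try_join_all(per_bucket).await?.into_iter().flatten() {
        values[index] = Some(value);
    }
    Ok(values)
}
```

`contains_keys_internal` below follows the same pattern, returning booleans instead of values.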
- .rows_stream::<(Vec,)>()?; + let map = Self::get_occurences_map(root_key, keys)?; + let statements = map + .iter() + .map(|(bucket_id, keys_map)| async { + let statement = self.get_multi_keys_statement(keys_map.len()).await?; + let mut inputs = vec![root_key.to_vec(), bucket_id.clone()]; + inputs.extend(keys_map.keys().cloned()); + Ok::<_, ScyllaDbStoreInternalError>((bucket_id.clone(), statement, inputs)) + }) + .collect::>(); + let statements = try_join_all(statements).await?; + + let mut futures = Vec::new(); + let map_ref = ↦ + for (bucket_id, statement, inputs) in statements { + futures.push(async move { + let mut rows = self + .session + .execute_iter(statement, &inputs) + .await? + .rows_stream::<(Vec,)>()?; + let mut keys = Vec::new(); + while let Some(row) = rows.next().await { + let (key,) = row?; + for i_key in map_ref + .get(&bucket_id) + .expect("bucket_id is supposed to be in map") + .get(&key) + .expect("key is supposed to be in map") + { + keys.push(*i_key); + } + } - while let Some(row) = rows.next().await { - let (key,) = row?; - for i_key in map.get(&key).expect("key is supposed to be in map") { - values[*i_key] = true; - } + Ok::<_, ScyllaDbStoreInternalError>(keys) + }); + } + let keys = try_join_all(futures).await?; + + for i_key in keys.iter().flatten() { + values[*i_key] = true; } Ok(values) @@ -360,7 +444,7 @@ impl ScyllaDbClient { Self::check_key_size(&key)?; let session = &self.session; // Read the value of a key - let values = (root_key.to_vec(), key); + let values = (root_key.to_vec(), Self::get_bucket_id(root_key, &key), key); let (result, _) = session .execute_single_page(&self.contains_key, &values, PagingState::start()) @@ -384,25 +468,77 @@ impl ScyllaDbClient { Self::check_key_size(&key_prefix)?; match get_upper_bound_option(&key_prefix) { None => { - let prepared_statement = &self.write_batch_delete_prefix_unbounded; - let values = (root_key.clone(), key_prefix); - futures.push(Box::pin(async move { - session - .execute_single_page(prepared_statement, values, PagingState::start()) - .await - .map(|_| ()) - .map_err(Into::into) + let values = if key_prefix.len() > K_PREFIX_LENGTH_BYTES { + vec![( + root_key.clone(), + Self::get_bucket_id(&root_key, &key_prefix), + key_prefix, + )] + } else { + // If the key prefix is shorter than K_PREFIX_LENGTH_BYTES, we need to + // go through all buckets. + let mut inner_values = Vec::new(); + for bucket_id in 0..NUM_BUCKETS { + inner_values.push(( + root_key.clone(), + bucket_id.to_be_bytes().to_vec(), + key_prefix.clone(), + )); + } + inner_values + }; + + futures.extend(values.into_iter().map(|values| { + async move { + session + .execute_single_page( + &self.write_batch_delete_prefix_unbounded, + values, + PagingState::start(), + ) + .await + .map(|_| ()) + .map_err(Into::into) + } + .boxed() })); } Some(upper) => { - let prepared_statement = &self.write_batch_delete_prefix_bounded; - let values = (root_key.clone(), key_prefix, upper); - futures.push(Box::pin(async move { - session - .execute_single_page(prepared_statement, values, PagingState::start()) - .await - .map(|_| ()) - .map_err(Into::into) + let values = if key_prefix.len() > K_PREFIX_LENGTH_BYTES { + vec![( + root_key.clone(), + Self::get_bucket_id(&root_key, &key_prefix), + key_prefix, + upper, + )] + } else { + // If the key prefix is shorter than K_PREFIX_LENGTH_BYTES, we need to + // go through all buckets. 
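This short-prefix fan-out is the routing rule of the whole scheme: a prefix strictly longer than `K_PREFIX_LENGTH_BYTES` fixes the hash input, so the operation targets exactly one bucket; a shorter prefix can match keys in any bucket, so it must be issued against all `NUM_BUCKETS` partitions. As a standalone sketch, reusing `bucket_of` and the constants from the earlier snippet:

```rust
/// Which bucket ids a prefix-scoped operation must touch.
fn target_buckets(key_prefix: &[u8]) -> Vec<[u8; 2]> {
    if key_prefix.len() > K_PREFIX_LENGTH_BYTES {
        // Every matching key starts with the same K_PREFIX_LENGTH_BYTES
        // bytes, so they all hash into a single bucket.
        vec![bucket_of(key_prefix)]
    } else {
        // Any bucket may hold matching keys: enumerate them all.
        (0..NUM_BUCKETS).map(u16::to_be_bytes).collect()
    }
}
```

(A prefix of exactly `K_PREFIX_LENGTH_BYTES` bytes also pins the hash input, so the strict `>` merely sends that boundary case down the fan-out path; correct either way, just conservative.)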
+ let mut inner_values = Vec::new(); + for bucket_id in 0..NUM_BUCKETS { + inner_values.push(( + root_key.clone(), + bucket_id.to_be_bytes().to_vec(), + key_prefix.clone(), + upper.clone(), + )); + } + inner_values + }; + + futures.extend(values.into_iter().map(|values| { + async move { + session + .execute_single_page( + &self.write_batch_delete_prefix_bounded, + values, + PagingState::start(), + ) + .await + .map(|_| ()) + .map_err(Into::into) + } + .boxed() })); } } @@ -413,7 +549,7 @@ impl ScyllaDbClient { for key in batch.simple_unordered_batch.deletions { Self::check_key_size(&key)?; let prepared_statement = &self.write_batch_deletion; - let values = (root_key.clone(), key); + let values = (root_key.clone(), Self::get_bucket_id(&root_key, &key), key); futures.push(Box::pin(async move { session .execute_single_page(prepared_statement, values, PagingState::start()) @@ -427,7 +563,12 @@ impl ScyllaDbClient { Self::check_key_size(&key)?; Self::check_value_size(&value)?; let prepared_statement = &self.write_batch_insertion; - let values = (root_key.clone(), key, value); + let values = ( + root_key.clone(), + Self::get_bucket_id(&root_key, &key), + key, + value, + ); futures.push(Box::pin(async move { session .execute_single_page(prepared_statement, values, PagingState::start()) @@ -441,6 +582,67 @@ impl ScyllaDbClient { Ok(()) } + async fn get_keys_from_query_pager( + &self, + query_pager: QueryPager, + len: usize, + ) -> Result>, ScyllaDbStoreInternalError> { + let mut rows = query_pager.rows_stream::<(Vec,)>()?; + let mut keys = Vec::new(); + while let Some(row) = rows.next().await { + let (key,) = row?; + let short_key = key[len..].to_vec(); + keys.push(short_key); + } + + Ok(keys) + } + + async fn get_key_values_from_query_pager( + &self, + rows: QueryPager, + len: usize, + ) -> Result, Vec)>, ScyllaDbStoreInternalError> { + let mut rows = rows.rows_stream::<(Vec, Vec)>()?; + let mut key_values = Vec::new(); + while let Some(row) = rows.next().await { + let (key, value) = row?; + let short_key = key[len..].to_vec(); + key_values.push((short_key, value)); + } + + Ok(key_values) + } + + async fn get_flattened_ordered_keys( + keys: Vec>, + returned_keys_len: usize, + ) -> Result, ScyllaDbStoreInternalError> { + let mut flattened_keys = Vec::with_capacity(returned_keys_len); + let mut idxs = vec![0; keys.len()]; + for _ in 0..returned_keys_len { + let mut smallest_idx = 0; + let mut smallest_key: Option = None; + for (j, inner_keys) in keys.iter().enumerate() { + if idxs[j] >= inner_keys.len() { + continue; + } + + if smallest_key.is_none() + || inner_keys[idxs[j]] + < smallest_key.clone().expect("Should have a smallest key") + { + smallest_idx = j; + smallest_key = Some(inner_keys[idxs[j]].clone()); + } + } + + flattened_keys.push(smallest_key.expect("Should have a smallest key")); + idxs[smallest_idx] += 1; + } + Ok(flattened_keys) + } + async fn find_keys_by_prefix_internal( &self, root_key: &[u8], @@ -450,28 +652,94 @@ impl ScyllaDbClient { let session = &self.session; // Read the value of a key let len = key_prefix.len(); - let query_unbounded = &self.find_keys_by_prefix_unbounded; - let query_bounded = &self.find_keys_by_prefix_bounded; - let rows = match get_upper_bound_option(&key_prefix) { + let returned_keys_len; + let keys = match get_upper_bound_option(&key_prefix) { None => { - let values = (root_key.to_vec(), key_prefix.clone()); - session - .execute_iter(query_unbounded.clone(), values) - .await? 
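The `get_flattened_ordered_keys` helper above is a k-way merge: each per-bucket result list already arrives sorted by `k` (the clustering order), and repeatedly taking the smallest head element restores one globally ordered list. A compact standalone version of the same idea, with a hypothetical `main` as a sanity check:

```rust
fn merge_sorted(lists: Vec<Vec<Vec<u8>>>) -> Vec<Vec<u8>> {
    let total: usize = lists.iter().map(Vec::len).sum();
    let mut idxs = vec![0usize; lists.len()];
    let mut out = Vec::with_capacity(total);
    for _ in 0..total {
        // Pick the list whose current head is smallest. Ties are impossible
        // here because any given key lives in exactly one bucket.
        let j = (0..lists.len())
            .filter(|&j| idxs[j] < lists[j].len())
            .min_by(|&a, &b| lists[a][idxs[a]].cmp(&lists[b][idxs[b]]))
            .expect("`total` counts the elements still unmerged");
        out.push(lists[j][idxs[j]].clone());
        idxs[j] += 1;
    }
    out
}

fn main() {
    let merged = merge_sorted(vec![
        vec![b"a".to_vec(), b"d".to_vec()],
        vec![b"b".to_vec()],
        vec![b"c".to_vec()],
    ]);
    assert_eq!(merged, [b"a", b"b", b"c", b"d"].map(|k| k.to_vec()));
}
```

Scanning all heads on every step is O(results × buckets); a `BinaryHeap` over the heads would bring that to O(results × log buckets) at the cost of extra bookkeeping. With about a thousand buckets, the linear scan stays simple and adequate.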
+ if key_prefix.len() > K_PREFIX_LENGTH_BYTES { + let values = ( + root_key.to_vec(), + Self::get_bucket_id(root_key, &key_prefix), + key_prefix.clone(), + ); + let query_pager = session + .execute_iter(self.find_keys_by_prefix_unbounded.clone(), values) + .await?; + let inner_keys = self.get_keys_from_query_pager(query_pager, len).await?; + returned_keys_len = inner_keys.len(); + vec![inner_keys] + } else { + let mut futures = Vec::new(); + for bucket_id in 0..NUM_BUCKETS { + let values = ( + root_key.to_vec(), + bucket_id.to_be_bytes().to_vec(), + key_prefix.clone(), + ); + futures.push(Box::pin(async move { + let query_pager = session + .execute_iter(self.find_keys_by_prefix_unbounded.clone(), values) + .await?; + let inner_keys = + self.get_keys_from_query_pager(query_pager, len).await?; + let inner_keys_len = inner_keys.len(); + Ok::<(Vec>, usize), ScyllaDbStoreInternalError>(( + inner_keys, + inner_keys_len, + )) + })); + } + let results = try_join_all(futures).await?; + returned_keys_len = results.iter().map(|(_, len)| len).sum(); + let keys: Vec>> = + results.into_iter().map(|(keys, _)| keys).collect(); + keys + } } Some(upper_bound) => { - let values = (root_key.to_vec(), key_prefix.clone(), upper_bound); - session.execute_iter(query_bounded.clone(), values).await? + if key_prefix.len() > K_PREFIX_LENGTH_BYTES { + let values = ( + root_key.to_vec(), + Self::get_bucket_id(root_key, &key_prefix), + key_prefix.clone(), + upper_bound, + ); + let query_pager = session + .execute_iter(self.find_keys_by_prefix_bounded.clone(), values) + .await?; + let inner_keys = self.get_keys_from_query_pager(query_pager, len).await?; + returned_keys_len = inner_keys.len(); + vec![inner_keys] + } else { + let mut futures = Vec::new(); + for bucket_id in 0..NUM_BUCKETS { + let values = ( + root_key.to_vec(), + bucket_id.to_be_bytes().to_vec(), + key_prefix.clone(), + upper_bound.clone(), + ); + futures.push(Box::pin(async move { + let query_pager = session + .execute_iter(self.find_keys_by_prefix_bounded.clone(), values) + .await?; + let inner_keys = + self.get_keys_from_query_pager(query_pager, len).await?; + let inner_keys_len = inner_keys.len(); + Ok::<(Vec>, usize), ScyllaDbStoreInternalError>(( + inner_keys, + inner_keys_len, + )) + })); + } + let results = try_join_all(futures).await?; + returned_keys_len = results.iter().map(|(_, len)| len).sum(); + let keys: Vec>> = + results.into_iter().map(|(keys, _)| keys).collect(); + keys + } } }; - let mut rows = rows.rows_stream::<(Vec,)>()?; - let mut keys = Vec::new(); - while let Some(row) = rows.next().await { - let (key,) = row?; - let short_key = key[len..].to_vec(); - keys.push(short_key); - } - Ok(keys) + Self::get_flattened_ordered_keys(keys, returned_keys_len).await } async fn find_key_values_by_prefix_internal( @@ -483,28 +751,105 @@ impl ScyllaDbClient { let session = &self.session; // Read the value of a key let len = key_prefix.len(); - let query_unbounded = &self.find_key_values_by_prefix_unbounded; - let query_bounded = &self.find_key_values_by_prefix_bounded; - let rows = match get_upper_bound_option(&key_prefix) { - None => { - let values = (root_key.to_vec(), key_prefix.clone()); - session - .execute_iter(query_unbounded.clone(), values) - .await? - } - Some(upper_bound) => { - let values = (root_key.to_vec(), key_prefix.clone(), upper_bound); - session.execute_iter(query_bounded.clone(), values).await? 
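Taken together, the rewritten prefix-scan read path boils down to: pick the target buckets, stream each bucket's rows concurrently, strip the prefix, and merge the per-bucket results back into one ordered list. A pseudocode-level sketch using the hypothetical `target_buckets` and `merge_sorted` helpers from the earlier snippets (`scan_bucket` stands in for `execute_iter` plus `get_keys_from_query_pager`):

```rust
use futures::future::try_join_all;

// Hypothetical stand-in: returns one bucket's matching keys, already in
// clustering (sorted) order, with the prefix stripped.
async fn scan_bucket(_bucket_id: [u8; 2], prefix: &[u8]) -> Result<Vec<Vec<u8>>, String> {
    Ok(vec![prefix.to_vec()])
}

async fn find_keys_by_prefix(prefix: &[u8]) -> Result<Vec<Vec<u8>>, String> {
    // One page-streamed SELECT per target bucket, all in flight at once.
    let per_bucket = try_join_all(
        target_buckets(prefix)
            .into_iter()
            .map(|bucket_id| scan_bucket(bucket_id, prefix)),
    )
    .await?;
    // Each bucket's list is sorted; merging restores the global order that
    // callers of the store rely on.
    Ok(merge_sorted(per_bucket))
}
```

`find_key_values_by_prefix_internal` below is the same flow over `(key, value)` pairs.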
- } - }; - let mut rows = rows.rows_stream::<(Vec, Vec)>()?; - let mut key_values = Vec::new(); - while let Some(row) = rows.next().await { - let (key, value) = row?; - let short_key = key[len..].to_vec(); - key_values.push((short_key, value)); - } - Ok(key_values) + let returned_keys_len; + let key_values = + match get_upper_bound_option(&key_prefix) { + None => { + if key_prefix.len() > K_PREFIX_LENGTH_BYTES { + let values = ( + root_key.to_vec(), + Self::get_bucket_id(root_key, &key_prefix), + key_prefix.clone(), + ); + let query_pager = session + .execute_iter(self.find_key_values_by_prefix_unbounded.clone(), values) + .await?; + let inner_keys = self + .get_key_values_from_query_pager(query_pager, len) + .await?; + returned_keys_len = inner_keys.len(); + vec![inner_keys] + } else { + let mut futures = Vec::new(); + for bucket_id in 0..NUM_BUCKETS { + let values = ( + root_key.to_vec(), + bucket_id.to_be_bytes().to_vec(), + key_prefix.clone(), + ); + futures.push(Box::pin(async move { + let query_pager = session + .execute_iter( + self.find_key_values_by_prefix_unbounded.clone(), + values, + ) + .await?; + let inner_keys = self + .get_key_values_from_query_pager(query_pager, len) + .await?; + let inner_keys_len = inner_keys.len(); + Ok::<(Vec<(Vec, Vec)>, usize), ScyllaDbStoreInternalError>( + (inner_keys, inner_keys_len), + ) + })); + } + let results = try_join_all(futures).await?; + returned_keys_len = results.iter().map(|(_, len)| len).sum(); + let keys: Vec, Vec)>> = + results.into_iter().map(|(keys, _)| keys).collect(); + keys + } + } + Some(upper_bound) => { + if key_prefix.len() > K_PREFIX_LENGTH_BYTES { + let values = ( + root_key.to_vec(), + Self::get_bucket_id(root_key, &key_prefix), + key_prefix.clone(), + upper_bound, + ); + let query_pager = session + .execute_iter(self.find_key_values_by_prefix_bounded.clone(), values) + .await?; + let inner_keys = self + .get_key_values_from_query_pager(query_pager, len) + .await?; + returned_keys_len = inner_keys.len(); + vec![inner_keys] + } else { + let mut futures = Vec::new(); + for bucket_id in 0..NUM_BUCKETS { + let values = ( + root_key.to_vec(), + bucket_id.to_be_bytes().to_vec(), + key_prefix.clone(), + upper_bound.clone(), + ); + futures.push(Box::pin(async move { + let query_pager = session + .execute_iter( + self.find_key_values_by_prefix_bounded.clone(), + values, + ) + .await?; + let inner_keys = self + .get_key_values_from_query_pager(query_pager, len) + .await?; + let inner_keys_len = inner_keys.len(); + Ok::<(Vec<(Vec, Vec)>, usize), ScyllaDbStoreInternalError>( + (inner_keys, inner_keys_len), + ) + })); + } + let results = try_join_all(futures).await?; + returned_keys_len = results.iter().map(|(_, len)| len).sum(); + let keys: Vec, Vec)>> = + results.into_iter().map(|(keys, _)| keys).collect(); + keys + } + } + }; + Self::get_flattened_ordered_keys(key_values, returned_keys_len).await } } @@ -910,9 +1255,10 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { .prepare(format!( "CREATE TABLE kv.{} (\ root_key blob, \ + bucket_id blob, \ k blob, \ v blob, \ - PRIMARY KEY (root_key, k) \ + PRIMARY KEY ((root_key, bucket_id), k) \ ) \ WITH compaction = {{ \ 'class' : 'SizeTieredCompactionStrategy', \ diff --git a/linera-views/src/common.rs b/linera-views/src/common.rs index ab059c44eb7..7621a5d526e 100644 --- a/linera-views/src/common.rs +++ b/linera-views/src/common.rs @@ -74,9 +74,8 @@ impl DeletionSet { pub(crate) fn get_upper_bound_option(key_prefix: &[u8]) -> Option> { let len = key_prefix.len(); for i in 
(0..len).rev() { - let val = key_prefix[i]; - if val < u8::MAX { - let mut upper_bound = key_prefix[0..i + 1].to_vec(); + if key_prefix[i] < u8::MAX { + let mut upper_bound = key_prefix[0..=i].to_vec(); upper_bound[i] += 1; return Some(upper_bound); }
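Finally, the `common.rs` change is a behavior-preserving cleanup: `key_prefix[0..=i]` is the inclusive-range spelling of `key_prefix[0..i + 1]`, and indexing directly removes the temporary `val` binding. The function computes the exclusive upper bound of a prefix range, i.e. the lexicographic successor of the prefix, which is what drives the new bounded `... AND k < ?` statements. A standalone copy of the patched function with a hypothetical `main` checking the edge cases:

```rust
fn get_upper_bound_option(key_prefix: &[u8]) -> Option<Vec<u8>> {
    // Find the last byte that can be incremented, truncate after it, and
    // bump it; trailing 0xFF bytes are dropped because incrementing them
    // would wrap around.
    for i in (0..key_prefix.len()).rev() {
        if key_prefix[i] < u8::MAX {
            let mut upper_bound = key_prefix[0..=i].to_vec();
            upper_bound[i] += 1;
            return Some(upper_bound);
        }
    }
    None // all bytes are 0xFF: no finite upper bound exists
}

fn main() {
    assert_eq!(get_upper_bound_option(&[1, 2, 3]), Some(vec![1, 2, 4]));
    assert_eq!(get_upper_bound_option(&[1, 0xFF]), Some(vec![2]));
    assert_eq!(get_upper_bound_option(&[0xFF, 0xFF]), None);
}
```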