Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 124 additions & 52 deletions linera-views/src/backends/scylla_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
/// correct.
const MAX_BATCH_SIZE: usize = 5000;

/// Maps each distinct key to the positions at which it occurs in the list of requested keys.
type OccurencesMap = HashMap<Vec<u8>, Vec<usize>>;
/// Groups the per-key occurrence maps by the first byte of the key, which is the value
/// bound to the `type_id` column in the queries.
type OccurencesMapByType = HashMap<u8, OccurencesMap>;

/// The client for ScyllaDB:
/// * The session is used to send queries
/// * The namespace that is being assigned to the database
Expand Down Expand Up @@ -126,7 +129,7 @@ impl ScyllaDbClient {
.collect::<Vec<_>>()
.join(",");
let query = format!(
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({})",
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})",
self.namespace, markers
);
let prepared_statement = self.session.prepare(query).await?;
Expand All @@ -146,7 +149,7 @@ impl ScyllaDbClient {
.collect::<Vec<_>>()
.join(",");
let query = format!(
"SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({})",
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})",
self.namespace, markers
);
let prepared_statement = self.session.prepare(query).await?;
Expand Down Expand Up @@ -183,70 +186,70 @@ impl ScyllaDbClient {
let namespace = namespace.to_string();
let read_value = session
.prepare(format!(
"SELECT v FROM kv.{} WHERE root_key = ? AND k = ?",
"SELECT v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
namespace
))
.await?;

let contains_key = session
.prepare(format!(
"SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ?",
"SELECT root_key FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
namespace
))
.await?;

let write_batch_delete_prefix_unbounded = session
.prepare(format!(
"DELETE FROM kv.{} WHERE root_key = ? AND k >= ?",
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
namespace
))
.await?;

let write_batch_delete_prefix_bounded = session
.prepare(format!(
"DELETE FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
namespace
))
.await?;

let write_batch_deletion = session
.prepare(format!(
"DELETE FROM kv.{} WHERE root_key = ? AND k = ?",
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
namespace
))
.await?;

let write_batch_insertion = session
.prepare(format!(
"INSERT INTO kv.{} (root_key, k, v) VALUES (?, ?, ?)",
"INSERT INTO kv.{} (root_key, type_id, k, v) VALUES (?, ?, ?, ?)",
namespace
))
.await?;

let find_keys_by_prefix_unbounded = session
.prepare(format!(
"SELECT k FROM kv.{} WHERE root_key = ? AND k >= ?",
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
namespace
))
.await?;

let find_keys_by_prefix_bounded = session
.prepare(format!(
"SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
namespace
))
.await?;

let find_key_values_by_prefix_unbounded = session
.prepare(format!(
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ?",
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
namespace
))
.await?;

let find_key_values_by_prefix_bounded = session
.prepare(format!(
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
namespace
))
.await?;
Expand Down Expand Up @@ -277,7 +280,7 @@ impl ScyllaDbClient {
Self::check_key_size(&key)?;
let session = &self.session;
// Read the value of a key
let values = (root_key.to_vec(), key);
let values = (root_key.to_vec(), vec![key[0]], key);

let (result, _) = session
.execute_single_page(&self.read_value, &values, PagingState::start())
Expand All @@ -292,11 +295,16 @@ impl ScyllaDbClient {

fn get_occurences_map(
keys: Vec<Vec<u8>>,
) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
) -> Result<OccurencesMapByType, ScyllaDbStoreInternalError> {
let mut map = OccurencesMapByType::new();
for (i_key, key) in keys.into_iter().enumerate() {
Self::check_key_size(&key)?;
map.entry(key).or_default().push(i_key);
let type_id = key[0];
map.entry(type_id)
.or_default()
.entry(key)
.or_default()
.push(i_key);
}
Ok(map)
}
Expand All @@ -308,21 +316,48 @@ impl ScyllaDbClient {
) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
let mut values = vec![None; keys.len()];
let map = Self::get_occurences_map(keys)?;
let statement = self.get_multi_key_values_statement(map.len()).await?;
let mut inputs = vec![root_key.to_vec()];
inputs.extend(map.keys().cloned());
let mut rows = self
.session
.execute_iter(statement, &inputs)
.await?
.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
let statements = map
.iter()
.map(|(type_id, map)| async {
let statement = self.get_multi_key_values_statement(map.len()).await?;
let mut inputs = vec![root_key.to_vec(), vec![*type_id]];
inputs.extend(map.keys().cloned());
Ok::<_, ScyllaDbStoreInternalError>((*type_id, statement, inputs))
})
.collect::<Vec<_>>();
let statements = try_join_all(statements).await?;

let mut futures = Vec::new();
let map_ref = &map;
for (type_id, statement, inputs) in statements {
futures.push(async move {
let mut rows = self
.session
.execute_iter(statement, &inputs)
.await?
.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
let mut value_pairs = Vec::new();
while let Some(row) = rows.next().await {
let (key, value) = row?;
for i_key in map_ref
.get(&type_id)
.expect("type_id is supposed to be in map")
.get(&key)
.expect("key is supposed to be in map")
{
value_pairs.push((*i_key, value.clone()));
}
}

while let Some(row) = rows.next().await {
let (key, value) = row?;
for i_key in map.get(&key).expect("key is supposed to be in map") {
values[*i_key] = Some(value.clone());
}
Ok::<_, ScyllaDbStoreInternalError>(value_pairs)
});
}

let values_pairs = try_join_all(futures).await?;
for (i_key, value) in values_pairs.iter().flatten() {
values[*i_key] = Some(value.clone());
}

Ok(values)
}

Expand All @@ -333,20 +368,46 @@ impl ScyllaDbClient {
) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
let mut values = vec![false; keys.len()];
let map = Self::get_occurences_map(keys)?;
let statement = self.get_multi_keys_statement(map.len()).await?;
let mut inputs = vec![root_key.to_vec()];
inputs.extend(map.keys().cloned());
let mut rows = self
.session
.execute_iter(statement, &inputs)
.await?
.rows_stream::<(Vec<u8>,)>()?;
let statements = map
.iter()
.map(|(type_id, map)| async {
let statement = self.get_multi_keys_statement(map.len()).await?;
let mut inputs = vec![root_key.to_vec(), vec![*type_id]];
inputs.extend(map.keys().cloned());
Ok::<_, ScyllaDbStoreInternalError>((*type_id, statement, inputs))
})
.collect::<Vec<_>>();
let statements = try_join_all(statements).await?;

let mut futures = Vec::new();
let map_ref = &map;
for (type_id, statement, inputs) in statements {
futures.push(async move {
let mut rows = self
.session
.execute_iter(statement, &inputs)
.await?
.rows_stream::<(Vec<u8>,)>()?;
let mut keys = Vec::new();
while let Some(row) = rows.next().await {
let (key,) = row?;
for i_key in map_ref
.get(&type_id)
.expect("type_id is supposed to be in map")
.get(&key)
.expect("key is supposed to be in map")
{
keys.push(*i_key);
}
}

while let Some(row) = rows.next().await {
let (key,) = row?;
for i_key in map.get(&key).expect("key is supposed to be in map") {
values[*i_key] = true;
}
Ok::<_, ScyllaDbStoreInternalError>(keys)
});
}
let keys = try_join_all(futures).await?;

for i_key in keys.iter().flatten() {
values[*i_key] = true;
}

Ok(values)
Expand All @@ -360,7 +421,7 @@ impl ScyllaDbClient {
Self::check_key_size(&key)?;
let session = &self.session;
// Check whether the key is present
let values = (root_key.to_vec(), key);
let values = (root_key.to_vec(), vec![key[0]], key);

let (result, _) = session
.execute_single_page(&self.contains_key, &values, PagingState::start())
Expand All @@ -385,7 +446,7 @@ impl ScyllaDbClient {
match get_upper_bound_option(&key_prefix) {
None => {
let prepared_statement = &self.write_batch_delete_prefix_unbounded;
let values = (root_key.clone(), key_prefix);
let values = (root_key.clone(), vec![key_prefix[0]], key_prefix);
futures.push(Box::pin(async move {
session
.execute_single_page(prepared_statement, values, PagingState::start())
Expand All @@ -396,7 +457,7 @@ impl ScyllaDbClient {
}
Some(upper) => {
let prepared_statement = &self.write_batch_delete_prefix_bounded;
let values = (root_key.clone(), key_prefix, upper);
let values = (root_key.clone(), vec![key_prefix[0]], key_prefix, upper);
futures.push(Box::pin(async move {
session
.execute_single_page(prepared_statement, values, PagingState::start())
Expand All @@ -413,7 +474,7 @@ impl ScyllaDbClient {
for key in batch.simple_unordered_batch.deletions {
Self::check_key_size(&key)?;
let prepared_statement = &self.write_batch_deletion;
let values = (root_key.clone(), key);
let values = (root_key.clone(), vec![key[0]], key);
futures.push(Box::pin(async move {
session
.execute_single_page(prepared_statement, values, PagingState::start())
Expand All @@ -427,7 +488,7 @@ impl ScyllaDbClient {
Self::check_key_size(&key)?;
Self::check_value_size(&value)?;
let prepared_statement = &self.write_batch_insertion;
let values = (root_key.clone(), key, value);
let values = (root_key.clone(), vec![key[0]], key, value);
futures.push(Box::pin(async move {
session
.execute_single_page(prepared_statement, values, PagingState::start())
Expand All @@ -454,13 +515,18 @@ impl ScyllaDbClient {
let query_bounded = &self.find_keys_by_prefix_bounded;
let rows = match get_upper_bound_option(&key_prefix) {
None => {
let values = (root_key.to_vec(), key_prefix.clone());
let values = (root_key.to_vec(), vec![key_prefix[0]], key_prefix.clone());
session
.execute_iter(query_unbounded.clone(), values)
.await?
}
Some(upper_bound) => {
let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
let values = (
root_key.to_vec(),
vec![key_prefix[0]],
key_prefix.clone(),
upper_bound,
);
session.execute_iter(query_bounded.clone(), values).await?
}
};
Expand All @@ -487,13 +553,18 @@ impl ScyllaDbClient {
let query_bounded = &self.find_key_values_by_prefix_bounded;
let rows = match get_upper_bound_option(&key_prefix) {
None => {
let values = (root_key.to_vec(), key_prefix.clone());
let values = (root_key.to_vec(), vec![key_prefix[0]], key_prefix.clone());
session
.execute_iter(query_unbounded.clone(), values)
.await?
}
Some(upper_bound) => {
let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
let values = (
root_key.to_vec(),
vec![key_prefix[0]],
key_prefix.clone(),
upper_bound,
);
session.execute_iter(query_bounded.clone(), values).await?
}
};
Expand Down Expand Up @@ -910,9 +981,10 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
.prepare(format!(
"CREATE TABLE kv.{} (\
root_key blob, \
type_id blob, \
k blob, \
v blob, \
PRIMARY KEY (root_key, k) \
PRIMARY KEY ((root_key, type_id), k) \
) \
WITH compaction = {{ \
'class' : 'SizeTieredCompactionStrategy', \
Expand Down
Loading