linera-storage/src/db_storage.rs (47 changes: 15 additions & 32 deletions)
@@ -21,6 +21,7 @@ use linera_execution::{
};
use linera_views::{
backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
batch::Batch,
context::ViewContext,
store::{AdminKeyValueStore, KeyIterable as _, KeyValueStore},
views::View,
@@ -222,26 +223,23 @@ pub mod metrics {
});
}

#[derive(Default)]
struct Batch {
key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
}
trait BatchExt {
fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError>;

impl Batch {
fn new() -> Self {
Self::default()
}
fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError>;

fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
self.key_value_bytes.push((key, value));
}
fn add_certificate(&mut self, certificate: &ConfirmedBlockCertificate)
-> Result<(), ViewError>;

fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> {
let bytes = bcs::to_bytes(value)?;
self.key_value_bytes.push((key, bytes));
Ok(())
}
fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError>;

fn add_network_description(
&mut self,
information: &NetworkDescription,
) -> Result<(), ViewError>;
}

impl BatchExt for Batch {
fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
#[cfg(with_metrics)]
metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
@@ -937,23 +935,8 @@ where
Ok(Some(certificate))
}

async fn write_entry(store: &Store, key: Vec<u8>, bytes: Vec<u8>) -> Result<(), ViewError> {
let mut batch = linera_views::batch::Batch::new();
batch.put_key_value_bytes(key, bytes);
store.write_batch(batch).await?;
Ok(())
}

async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
if batch.key_value_bytes.is_empty() {
return Ok(());
}
let mut futures = Vec::new();
for (key, bytes) in batch.key_value_bytes.into_iter() {
let store = self.store.clone();
futures.push(async move { Self::write_entry(&store, key, bytes).await });
}
futures::future::try_join_all(futures).await?;
self.store.write_batch(batch).await?;
Ok(())
}

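The db_storage.rs change above replaces the file's private `Batch` wrapper with an extension trait: the helper methods (`add_blob`, `add_blob_state`, `add_certificate`, `add_event`, `add_network_description`) are declared in `BatchExt` and implemented directly on `linera_views::batch::Batch`, and `write_batch` now hands the whole batch to the store in a single `store.write_batch(batch)` call instead of spawning one write per key-value pair. Below is a minimal, self-contained sketch of the extension-trait pattern; `KvBatch` and `add_entry` are illustrative stand-ins, not the crate's actual API.

```rust
// Illustrative only: `KvBatch` stands in for an externally defined batch type
// such as `linera_views::batch::Batch`; the real API may differ.
#[derive(Default)]
pub struct KvBatch {
    key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
}

impl KvBatch {
    pub fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.key_value_bytes.push((key, value));
    }
}

/// Extension trait: adds domain-specific helpers to the external batch type
/// without wrapping it in a new struct.
trait KvBatchExt {
    fn add_entry(&mut self, name: &str, payload: &[u8]);
}

impl KvBatchExt for KvBatch {
    fn add_entry(&mut self, name: &str, payload: &[u8]) {
        // Key derivation, metrics, and serialization would live here.
        let key = format!("entry:{name}").into_bytes();
        self.put_key_value_bytes(key, payload.to_vec());
    }
}

fn main() {
    let mut batch = KvBatch::default();
    batch.add_entry("blob", b"contents");
    batch.add_entry("certificate", b"bytes");
    // The filled batch is then written in a single store call,
    // e.g. `store.write_batch(batch).await`, rather than one write per entry.
    assert_eq!(batch.key_value_bytes.len(), 2);
}
```

Because the store applies the whole batch at once, the old per-key `write_entry` helper and its `try_join_all` fan-out in `write_batch` are no longer needed.
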
linera-views/src/backends/scylla_db.rs (118 changes: 53 additions & 65 deletions)
@@ -128,6 +128,7 @@ impl Default for ScyllaDbClientConfig {

/// Map from partition_key to a map from keys to a list of their occurrences in the original vector.
type OccurrencesMap = HashMap<Vec<u8>, HashMap<Vec<u8>, Vec<usize>>>;
type BatchAndValues = (Batch, Vec<Vec<Vec<u8>>>);

/// The client for ScyllaDB:
/// * The session allows to pass queries
@@ -376,61 +377,75 @@ impl ScyllaDbClient {
Ok(())
}

fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
fn check_batch_len(batch: &Batch) -> Result<(), ScyllaDbStoreInternalError> {
ensure!(
batch.len() <= MAX_BATCH_SIZE,
batch.statements.len() <= MAX_BATCH_SIZE,
ScyllaDbStoreInternalError::BatchTooLong
);
Ok(())
}

fn check_batch_partition_key(
&self,
partition_key_prefix: &[u8],
key: &[u8],
batch_partition_key: &mut Option<Vec<u8>>,
) -> Result<(), ScyllaDbStoreInternalError> {
let partition_key = self.get_partition_key(partition_key_prefix, key)?;
if let Some(batch_partition_key) = batch_partition_key {
ensure!(
*batch_partition_key == partition_key,
ScyllaDbStoreInternalError::MultiplePartitionKeysInBatch
);
} else {
*batch_partition_key = Some(partition_key.to_vec());
}
Ok(())
}

fn check_batch_and_partition_keys(
fn get_per_partition_batches(
&self,
partition_key_prefix: &[u8],
exclusive_mode: bool,
batch: &UnorderedBatch,
) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
) -> Result<HashMap<Vec<u8>, BatchAndValues>, ScyllaDbStoreInternalError> {
if !exclusive_mode {
ensure!(
batch.key_prefix_deletions.is_empty(),
ScyllaDbStoreInternalError::PrefixDeletionsNotAllowedInNonExclusiveMode
);
}

let mut batch_partition_key = None;
let mut batches = HashMap::new();
for key_prefix in &batch.key_prefix_deletions {
self.check_batch_partition_key(
partition_key_prefix,
key_prefix,
&mut batch_partition_key,
)?;
Self::check_key_size(key_prefix)?;
let partition_key = self.get_partition_key(partition_key_prefix, key_prefix)?;

let (batch_query, batch_values) = batches
.entry(partition_key.clone())
.or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new()));

match get_upper_bound_option(key_prefix) {
None => {
batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone());
batch_values.push(vec![partition_key.clone(), key_prefix.clone()]);
}
Some(upper_bound) => {
batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone());
batch_values.push(vec![partition_key.clone(), key_prefix.clone(), upper_bound]);
}
}
}

for key in &batch.simple_unordered_batch.deletions {
self.check_batch_partition_key(partition_key_prefix, key, &mut batch_partition_key)?;
Self::check_key_size(key)?;
let partition_key = self.get_partition_key(partition_key_prefix, key)?;
let (batch_query, batch_values) = batches
.entry(partition_key.clone())
.or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new()));

batch_query.append_statement(self.write_batch_deletion.clone());
batch_values.push(vec![partition_key.clone(), key.clone()]);
}
for (key, value) in &batch.simple_unordered_batch.insertions {
Self::check_key_size(key)?;
Self::check_value_size(value)?;
let partition_key = self.get_partition_key(partition_key_prefix, key)?;
let (batch_query, batch_values) = batches
.entry(partition_key.clone())
.or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new()));

batch_query.append_statement(self.write_batch_insertion.clone());
batch_values.push(vec![partition_key.clone(), key.clone(), value.clone()]);
}
for (key, _) in &batch.simple_unordered_batch.insertions {
self.check_batch_partition_key(partition_key_prefix, key, &mut batch_partition_key)?;

for (batch, _) in batches.values() {
Self::check_batch_len(batch)?;
}

batch_partition_key.ok_or(ScyllaDbStoreInternalError::NoPartitionKeyInBatch)
Ok(batches)
}

async fn read_value_internal(
@@ -675,42 +690,15 @@ impl ScyllaDbClient {
return Ok(());
}

Self::check_batch_len(&batch)?;
let partition_key =
self.check_batch_and_partition_keys(partition_key_prefix, exclusive_mode, &batch)?;
let session = &self.session;
let mut batch_query = self.get_sticky_batch_query(&partition_key)?;
let mut batch_values: Vec<Vec<Vec<u8>>> = Vec::new();

for key_prefix in batch.key_prefix_deletions {
// We'll be always on exclusive mode here, which check_batch_and_partition_keys
// guarantees.
Self::check_key_size(&key_prefix)?;
match get_upper_bound_option(&key_prefix) {
None => {
batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone());
batch_values.push(vec![partition_key.clone(), key_prefix]);
}
Some(upper_bound) => {
batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone());
batch_values.push(vec![partition_key.clone(), key_prefix, upper_bound]);
}
}
}
let batches =
self.get_per_partition_batches(partition_key_prefix, exclusive_mode, &batch)?;

for key in batch.simple_unordered_batch.deletions {
Self::check_key_size(&key)?;
batch_query.append_statement(self.write_batch_deletion.clone());
batch_values.push(vec![partition_key.clone(), key]);
}
for (key, value) in batch.simple_unordered_batch.insertions {
Self::check_key_size(&key)?;
Self::check_value_size(&value)?;
batch_query.append_statement(self.write_batch_insertion.clone());
batch_values.push(vec![partition_key.clone(), key, value]);
let mut futures = Vec::new();
for (_, (batch, batch_values)) in batches {
futures.push(async move { self.session.batch(&batch, batch_values).await });
}

session.batch(&batch_query, batch_values).await?;
try_join_all(futures).await?;
Ok(())
}

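In scylla_db.rs, the old `check_batch_and_partition_keys` rejected any batch whose keys mapped to more than one partition key (`MultiplePartitionKeysInBatch`). The new `get_per_partition_batches` removes that restriction by grouping prefix deletions, deletions, and insertions by partition key, accumulating one ScyllaDB batch statement and its bound values per partition. Below is a simplified sketch of just the grouping step, assuming a hypothetical `partition_key` derivation and plain vectors in place of the driver's prepared statements.

```rust
use std::collections::HashMap;

// Key and optional value (None marks a deletion); the real code instead
// accumulates prepared statements plus bound values per partition.
type Op = (Vec<u8>, Option<Vec<u8>>);

/// Hypothetical partition-key derivation; the real client computes it from a
/// root-key prefix and the user key.
fn partition_key(prefix: &[u8], key: &[u8]) -> Vec<u8> {
    let mut pk = prefix.to_vec();
    pk.extend_from_slice(&key[..key.len().min(2)]);
    pk
}

fn group_by_partition(prefix: &[u8], ops: Vec<Op>) -> HashMap<Vec<u8>, Vec<Op>> {
    let mut groups: HashMap<Vec<u8>, Vec<Op>> = HashMap::new();
    for (key, value) in ops {
        let pk = partition_key(prefix, &key);
        // Mirrors `batches.entry(partition_key).or_insert(..)` in the diff:
        // each partition gets its own bucket that statements accumulate into.
        groups.entry(pk).or_default().push((key, value));
    }
    groups
}

fn main() {
    let ops = vec![
        (b"aa-1".to_vec(), Some(b"v1".to_vec())),
        (b"aa-2".to_vec(), None),
        (b"zz-9".to_vec(), Some(b"v2".to_vec())),
    ];
    let groups = group_by_partition(b"root/", ops);
    // Keys sharing a partition key end up in the same per-partition batch.
    assert_eq!(groups.len(), 2);
}
```
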
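`write_batch_internal` then submits the per-partition batches concurrently and fails fast on the first error via `try_join_all`. A minimal sketch of that fan-out follows, assuming the `futures` and `tokio` crates and a hypothetical `submit` standing in for `session.batch(&batch, values)`.

```rust
use futures::future::try_join_all;

// Hypothetical stand-in for the driver call `session.batch(&batch, values)`.
async fn submit(partition: Vec<u8>, values: Vec<Vec<u8>>) -> Result<(), String> {
    println!("writing {} rows to partition {:?}", values.len(), partition);
    Ok(())
}

async fn write_all(batches: Vec<(Vec<u8>, Vec<Vec<u8>>)>) -> Result<(), String> {
    let mut futures = Vec::new();
    for (partition, values) in batches {
        futures.push(async move { submit(partition, values).await });
    }
    // All per-partition batches are written concurrently; the first error
    // returned by any of them aborts the whole write.
    try_join_all(futures).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let batches = vec![
        (b"p0".to_vec(), vec![b"row-a".to_vec()]),
        (b"p1".to_vec(), vec![b"row-b".to_vec(), b"row-c".to_vec()]),
    ];
    write_all(batches).await
}
```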