diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs index 15d6248a0d8..4120cc8a0f9 100644 --- a/linera-storage/src/db_storage.rs +++ b/linera-storage/src/db_storage.rs @@ -21,6 +21,7 @@ use linera_execution::{ }; use linera_views::{ backends::dual::{DualStoreRootKeyAssignment, StoreInUse}, + batch::Batch, context::ViewContext, store::{AdminKeyValueStore, KeyIterable as _, KeyValueStore}, views::View, @@ -222,26 +223,23 @@ pub mod metrics { }); } -#[derive(Default)] -struct Batch { - key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>, -} +trait BatchExt { + fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError>; -impl Batch { - fn new() -> Self { - Self::default() - } + fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError>; - fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) { - self.key_value_bytes.push((key, value)); - } + fn add_certificate(&mut self, certificate: &ConfirmedBlockCertificate) + -> Result<(), ViewError>; - fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> { - let bytes = bcs::to_bytes(value)?; - self.key_value_bytes.push((key, bytes)); - Ok(()) - } + fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError>; + fn add_network_description( + &mut self, + information: &NetworkDescription, + ) -> Result<(), ViewError>; +} + +impl BatchExt for Batch { fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> { #[cfg(with_metrics)] metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc(); @@ -937,23 +935,8 @@ where Ok(Some(certificate)) } - async fn write_entry(store: &Store, key: Vec<u8>, bytes: Vec<u8>) -> Result<(), ViewError> { - let mut batch = linera_views::batch::Batch::new(); - batch.put_key_value_bytes(key, bytes); - store.write_batch(batch).await?; - Ok(()) - } - async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> { - if batch.key_value_bytes.is_empty() { - return Ok(()); - } - let mut futures = Vec::new(); - for (key, bytes) 
in batch.key_value_bytes.into_iter() { - let store = self.store.clone(); - futures.push(async move { Self::write_entry(&store, key, bytes).await }); - } - futures::future::try_join_all(futures).await?; + self.store.write_batch(batch).await?; Ok(()) } diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 42eb83ce6fb..d2631a0d7dc 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -128,6 +128,7 @@ impl Default for ScyllaDbClientConfig { /// Map from partition_key to a map from keys to a list of their occurrences in the original vector. type OccurrencesMap = HashMap<Vec<u8>, HashMap<Vec<u8>, Vec<usize>>>; +type BatchAndValues = (Batch, Vec<Vec<Vec<u8>>>); /// The client for ScyllaDB: /// * The session allows to pass queries @@ -376,38 +377,20 @@ impl ScyllaDbClient { Ok(()) } - fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> { + fn check_batch_len(batch: &Batch) -> Result<(), ScyllaDbStoreInternalError> { ensure!( - batch.len() <= MAX_BATCH_SIZE, + batch.statements.len() <= MAX_BATCH_SIZE, ScyllaDbStoreInternalError::BatchTooLong ); Ok(()) } - fn check_batch_partition_key( - &self, - partition_key_prefix: &[u8], - key: &[u8], - batch_partition_key: &mut Option<Vec<u8>>, - ) -> Result<(), ScyllaDbStoreInternalError> { - let partition_key = self.get_partition_key(partition_key_prefix, key)?; - if let Some(batch_partition_key) = batch_partition_key { - ensure!( - *batch_partition_key == partition_key, - ScyllaDbStoreInternalError::MultiplePartitionKeysInBatch - ); - } else { - *batch_partition_key = Some(partition_key.to_vec()); - } - Ok(()) - } - - fn check_batch_and_partition_keys( + fn get_per_partition_batches( &self, partition_key_prefix: &[u8], exclusive_mode: bool, batch: &UnorderedBatch, - ) -> Result<Vec<u8>, ScyllaDbStoreInternalError> { + ) -> Result<HashMap<Vec<u8>, BatchAndValues>, ScyllaDbStoreInternalError> { if !exclusive_mode { ensure!( batch.key_prefix_deletions.is_empty(), @@ -415,22 
+398,54 @@ impl ScyllaDbClient { ); } - let mut batch_partition_key = None; + let mut batches = HashMap::new(); for key_prefix in &batch.key_prefix_deletions { - self.check_batch_partition_key( - partition_key_prefix, - key_prefix, - &mut batch_partition_key, - )?; + Self::check_key_size(key_prefix)?; + let partition_key = self.get_partition_key(partition_key_prefix, key_prefix)?; + + let (batch_query, batch_values) = batches + .entry(partition_key.clone()) + .or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new())); + + match get_upper_bound_option(key_prefix) { + None => { + batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone()); + batch_values.push(vec![partition_key.clone(), key_prefix.clone()]); + } + Some(upper_bound) => { + batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone()); + batch_values.push(vec![partition_key.clone(), key_prefix.clone(), upper_bound]); + } + } } + for key in &batch.simple_unordered_batch.deletions { - self.check_batch_partition_key(partition_key_prefix, key, &mut batch_partition_key)?; + Self::check_key_size(key)?; + let partition_key = self.get_partition_key(partition_key_prefix, key)?; + let (batch_query, batch_values) = batches + .entry(partition_key.clone()) + .or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new())); + + batch_query.append_statement(self.write_batch_deletion.clone()); + batch_values.push(vec![partition_key.clone(), key.clone()]); + } + for (key, value) in &batch.simple_unordered_batch.insertions { + Self::check_key_size(key)?; + Self::check_value_size(value)?; + let partition_key = self.get_partition_key(partition_key_prefix, key)?; + let (batch_query, batch_values) = batches + .entry(partition_key.clone()) + .or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new())); + + batch_query.append_statement(self.write_batch_insertion.clone()); + batch_values.push(vec![partition_key.clone(), key.clone(), value.clone()]); } - for (key, _) 
in &batch.simple_unordered_batch.insertions { - self.check_batch_partition_key(partition_key_prefix, key, &mut batch_partition_key)?; + + for (batch, _) in batches.values() { + Self::check_batch_len(batch)?; } - batch_partition_key.ok_or(ScyllaDbStoreInternalError::NoPartitionKeyInBatch) + Ok(batches) } async fn read_value_internal( @@ -675,42 +690,15 @@ impl ScyllaDbClient { return Ok(()); } - Self::check_batch_len(&batch)?; - let partition_key = - self.check_batch_and_partition_keys(partition_key_prefix, exclusive_mode, &batch)?; - let session = &self.session; - let mut batch_query = self.get_sticky_batch_query(&partition_key)?; - let mut batch_values: Vec<Vec<Vec<u8>>> = Vec::new(); - - for key_prefix in batch.key_prefix_deletions { - // We'll be always on exclusive mode here, which check_batch_and_partition_keys - // guarantees. - Self::check_key_size(&key_prefix)?; - match get_upper_bound_option(&key_prefix) { - None => { - batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone()); - batch_values.push(vec![partition_key.clone(), key_prefix]); - } - Some(upper_bound) => { - batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone()); - batch_values.push(vec![partition_key.clone(), key_prefix, upper_bound]); - } - } - } + let batches = + self.get_per_partition_batches(partition_key_prefix, exclusive_mode, &batch)?; - for key in batch.simple_unordered_batch.deletions { - Self::check_key_size(&key)?; - batch_query.append_statement(self.write_batch_deletion.clone()); - batch_values.push(vec![partition_key.clone(), key]); - } - for (key, value) in batch.simple_unordered_batch.insertions { - Self::check_key_size(&key)?; - Self::check_value_size(&value)?; - batch_query.append_statement(self.write_batch_insertion.clone()); - batch_values.push(vec![partition_key.clone(), key, value]); + let mut futures = Vec::new(); + for (_, (batch, batch_values)) in batches { + futures.push(async move { self.session.batch(&batch, batch_values).await }); } - 
session.batch(&batch_query, batch_values).await?; + try_join_all(futures).await?; Ok(()) }