Skip to content

Commit 1be9170

Browse files
authored
Parallelize batches for blobs in linera-storage (#4050)
## Motivation We're going to introduce partitioning in ScyllaDb and cross-partition batches won't be supported. ## Proposal Create parallel batches of one element. Note: This PR was meant to be minimal but we can definitely clean up things later. ## Test Plan CI ## Release Plan - Nothing to do / These changes follow the usual release cycle.
1 parent dbeb3e3 commit 1be9170

File tree

1 file changed

+29
-15
lines changed

1 file changed

+29
-15
lines changed

linera-storage/src/db_storage.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use linera_execution::{
2121
};
2222
use linera_views::{
2323
backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
24-
batch::Batch,
2524
context::ViewContext,
2625
store::{AdminKeyValueStore, KeyIterable as _, KeyValueStore},
2726
views::View,
@@ -223,23 +222,26 @@ pub mod metrics {
223222
});
224223
}
225224

226-
trait BatchExt {
227-
fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError>;
228-
229-
fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError>;
225+
#[derive(Default)]
226+
struct Batch {
227+
key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
228+
}
230229

231-
fn add_certificate(&mut self, certificate: &ConfirmedBlockCertificate)
232-
-> Result<(), ViewError>;
230+
impl Batch {
231+
fn new() -> Self {
232+
Self::default()
233+
}
233234

234-
fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError>;
235+
fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
236+
self.key_value_bytes.push((key, value));
237+
}
235238

236-
fn add_network_description(
237-
&mut self,
238-
information: &NetworkDescription,
239-
) -> Result<(), ViewError>;
240-
}
239+
fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> {
240+
let bytes = bcs::to_bytes(value)?;
241+
self.key_value_bytes.push((key, bytes));
242+
Ok(())
243+
}
241244

242-
impl BatchExt for Batch {
243245
fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
244246
#[cfg(with_metrics)]
245247
metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
@@ -987,8 +989,20 @@ where
987989
Ok(certificate)
988990
}
989991

992+
async fn write_entry(store: &Store, key: Vec<u8>, bytes: Vec<u8>) -> Result<(), ViewError> {
993+
let mut batch = linera_views::batch::Batch::new();
994+
batch.put_key_value_bytes(key, bytes);
995+
store.write_batch(batch).await?;
996+
Ok(())
997+
}
998+
990999
async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
991-
self.store.write_batch(batch).await?;
1000+
let mut futures = Vec::new();
1001+
for (key, bytes) in batch.key_value_bytes.into_iter() {
1002+
let store = self.store.clone();
1003+
futures.push(async move { Self::write_entry(&store, key, bytes).await });
1004+
}
1005+
futures::future::try_join_all(futures).await?;
9921006
Ok(())
9931007
}
9941008

0 commit comments

Comments
 (0)