Commit 04e3af5

Scylla test changes
1 parent f1df5d1 commit 04e3af5

File tree

2 files changed (+67, -97 lines)

linera-storage/src/db_storage.rs

Lines changed: 15 additions & 32 deletions
@@ -21,6 +21,7 @@ use linera_execution::{
 };
 use linera_views::{
     backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
+    batch::Batch,
     context::ViewContext,
     store::{AdminKeyValueStore, KeyIterable as _, KeyValueStore},
     views::View,
@@ -222,26 +223,23 @@ pub mod metrics {
     });
 }
 
-#[derive(Default)]
-struct Batch {
-    key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
-}
+trait BatchExt {
+    fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError>;
 
-impl Batch {
-    fn new() -> Self {
-        Self::default()
-    }
+    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError>;
 
-    fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
-        self.key_value_bytes.push((key, value));
-    }
+    fn add_certificate(&mut self, certificate: &ConfirmedBlockCertificate)
+        -> Result<(), ViewError>;
 
-    fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> {
-        let bytes = bcs::to_bytes(value)?;
-        self.key_value_bytes.push((key, bytes));
-        Ok(())
-    }
+    fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError>;
 
+    fn add_network_description(
+        &mut self,
+        information: &NetworkDescription,
+    ) -> Result<(), ViewError>;
+}
+
+impl BatchExt for Batch {
     fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
         #[cfg(with_metrics)]
         metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
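
The hunk above replaces a file-local `Batch` wrapper with an extension trait implemented for `linera_views::batch::Batch`, so helpers such as `add_blob` attach directly to the views batch type. A minimal, self-contained sketch of that extension-trait pattern; the `KvBatch` and `KvBatchExt` names are illustrative stand-ins, not the actual linera types:

```rust
use std::collections::BTreeMap;

// Stand-in for an existing batch type coming from another crate.
#[derive(Default)]
struct KvBatch {
    operations: BTreeMap<Vec<u8>, Vec<u8>>,
}

// The extension trait adds domain-specific helpers on top of the raw batch
// instead of wrapping it in a new struct.
trait KvBatchExt {
    fn add_named_entry(&mut self, name: &str, value: &[u8]);
}

impl KvBatchExt for KvBatch {
    fn add_named_entry(&mut self, name: &str, value: &[u8]) {
        // Encode the name as the key and record the insertion on the batch.
        self.operations.insert(name.as_bytes().to_vec(), value.to_vec());
    }
}

fn main() {
    let mut batch = KvBatch::default();
    // The helper reads like a method of the batch type itself.
    batch.add_named_entry("blob:42", b"payload");
    assert_eq!(batch.operations.len(), 1);
}
```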
@@ -937,23 +935,8 @@ where
         Ok(Some(certificate))
     }
 
-    async fn write_entry(store: &Store, key: Vec<u8>, bytes: Vec<u8>) -> Result<(), ViewError> {
-        let mut batch = linera_views::batch::Batch::new();
-        batch.put_key_value_bytes(key, bytes);
-        store.write_batch(batch).await?;
-        Ok(())
-    }
-
     async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
-        if batch.key_value_bytes.is_empty() {
-            return Ok(());
-        }
-        let mut futures = Vec::new();
-        for (key, bytes) in batch.key_value_bytes.into_iter() {
-            let store = self.store.clone();
-            futures.push(async move { Self::write_entry(&store, key, bytes).await });
-        }
-        futures::future::try_join_all(futures).await?;
+        self.store.write_batch(batch).await?;
         Ok(())
     }
 
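With the ScyllaDB backend now accepting multi-partition batches (see the second file below), `write_batch` no longer splits a batch into one write per key and instead forwards the whole batch to the underlying store in a single call. A rough sketch of that delegation, against a hypothetical `Store` trait rather than the real `KeyValueStore` API:

```rust
// Hypothetical Store trait and batch type, only to show the shape of the
// simplified write path: one write_batch call instead of one write per key.
struct Batch {
    key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
}

trait Store {
    fn write_batch(&self, batch: Batch) -> Result<(), String>;
}

struct InMemoryStore;

impl Store for InMemoryStore {
    fn write_batch(&self, batch: Batch) -> Result<(), String> {
        // A real backend would apply all operations in one round trip.
        println!("writing {} entries in one batch", batch.key_value_bytes.len());
        Ok(())
    }
}

// After the change, the storage layer simply delegates to the store.
fn write_all(store: &impl Store, batch: Batch) -> Result<(), String> {
    store.write_batch(batch)
}

fn main() -> Result<(), String> {
    let batch = Batch {
        key_value_bytes: vec![
            (b"k1".to_vec(), b"v1".to_vec()),
            (b"k2".to_vec(), b"v2".to_vec()),
        ],
    };
    write_all(&InMemoryStore, batch)
}
```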
linera-views/src/backends/scylla_db.rs

Lines changed: 52 additions & 65 deletions
@@ -376,61 +376,75 @@ impl ScyllaDbClient {
         Ok(())
     }
 
-    fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
+    fn check_batch_len(batch: &Batch) -> Result<(), ScyllaDbStoreInternalError> {
         ensure!(
-            batch.len() <= MAX_BATCH_SIZE,
+            batch.statements.len() <= MAX_BATCH_SIZE,
             ScyllaDbStoreInternalError::BatchTooLong
         );
         Ok(())
     }
 
-    fn check_batch_partition_key(
-        &self,
-        partition_key_prefix: &[u8],
-        key: &[u8],
-        batch_partition_key: &mut Option<Vec<u8>>,
-    ) -> Result<(), ScyllaDbStoreInternalError> {
-        let partition_key = self.get_partition_key(partition_key_prefix, key)?;
-        if let Some(batch_partition_key) = batch_partition_key {
-            ensure!(
-                *batch_partition_key == partition_key,
-                ScyllaDbStoreInternalError::MultiplePartitionKeysInBatch
-            );
-        } else {
-            *batch_partition_key = Some(partition_key.to_vec());
-        }
-        Ok(())
-    }
-
-    fn check_batch_and_partition_keys(
+    fn get_per_partition_batches(
         &self,
         partition_key_prefix: &[u8],
         exclusive_mode: bool,
         batch: &UnorderedBatch,
-    ) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
+    ) -> Result<HashMap<Vec<u8>, (Batch, Vec<Vec<Vec<u8>>>)>, ScyllaDbStoreInternalError> {
         if !exclusive_mode {
             ensure!(
                 batch.key_prefix_deletions.is_empty(),
                 ScyllaDbStoreInternalError::PrefixDeletionsNotAllowedInNonExclusiveMode
             );
         }
 
-        let mut batch_partition_key = None;
+        let mut batches = HashMap::new();
         for key_prefix in &batch.key_prefix_deletions {
-            self.check_batch_partition_key(
-                partition_key_prefix,
-                key_prefix,
-                &mut batch_partition_key,
-            )?;
+            Self::check_key_size(&key_prefix)?;
+            let partition_key = self.get_partition_key(partition_key_prefix, &key_prefix)?;
+
+            let (batch_query, batch_values) = batches
+                .entry(partition_key.clone())
+                .or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new()));
+
+            match get_upper_bound_option(&key_prefix) {
+                None => {
+                    batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone());
+                    batch_values.push(vec![partition_key.clone(), key_prefix.clone()]);
+                }
+                Some(upper_bound) => {
+                    batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone());
+                    batch_values.push(vec![partition_key.clone(), key_prefix.clone(), upper_bound]);
+                }
+            }
         }
+
         for key in &batch.simple_unordered_batch.deletions {
-            self.check_batch_partition_key(partition_key_prefix, key, &mut batch_partition_key)?;
+            Self::check_key_size(&key)?;
+            let partition_key = self.get_partition_key(partition_key_prefix, &key)?;
+            let (batch_query, batch_values) = batches
+                .entry(partition_key.clone())
+                .or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new()));
+
+            batch_query.append_statement(self.write_batch_deletion.clone());
+            batch_values.push(vec![partition_key.clone(), key.clone()]);
+        }
+        for (key, value) in &batch.simple_unordered_batch.insertions {
+            Self::check_key_size(&key)?;
+            Self::check_value_size(&value)?;
+            let partition_key = self.get_partition_key(partition_key_prefix, &key)?;
+            let (batch_query, batch_values) = batches
+                .entry(partition_key.clone())
+                .or_insert((self.get_sticky_batch_query(&partition_key)?, Vec::new()));
+
+            batch_query.append_statement(self.write_batch_insertion.clone());
+            batch_values.push(vec![partition_key.clone(), key.clone(), value.clone()]);
         }
-        for (key, _) in &batch.simple_unordered_batch.insertions {
-            self.check_batch_partition_key(partition_key_prefix, key, &mut batch_partition_key)?;
+
+        for (batch, _) in batches.values() {
+            Self::check_batch_len(batch)?;
         }
 
-        batch_partition_key.ok_or(ScyllaDbStoreInternalError::NoPartitionKeyInBatch)
+        Ok(batches)
     }
 
     async fn read_value_internal(
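
Previously a batch was rejected unless every key mapped to the same partition key (`MultiplePartitionKeysInBatch`); the new `get_per_partition_batches` instead groups the operations by partition key and builds one ScyllaDB batch per partition. A self-contained sketch of that grouping step, with a made-up `partition_of` function and an `Op` enum standing in for the prepared statements and bound values:

```rust
use std::collections::HashMap;

// Hypothetical operation type; the real code appends prepared statements and
// bound values to a per-partition ScyllaDB batch instead.
#[derive(Debug)]
enum Op {
    Put(Vec<u8>, Vec<u8>),
    Delete(Vec<u8>),
}

// Illustrative partition function: here, just the first byte of the key.
fn partition_of(key: &[u8]) -> Vec<u8> {
    key.first().copied().map(|b| vec![b]).unwrap_or_default()
}

// Group operations into one per-partition batch using the HashMap entry API,
// mirroring how the diff above accumulates a (Batch, values) pair per key.
fn per_partition_batches(ops: Vec<Op>) -> HashMap<Vec<u8>, Vec<Op>> {
    let mut batches: HashMap<Vec<u8>, Vec<Op>> = HashMap::new();
    for op in ops {
        let partition = match &op {
            Op::Put(key, _) | Op::Delete(key) => partition_of(key),
        };
        batches.entry(partition).or_default().push(op);
    }
    batches
}

fn main() {
    let ops = vec![
        Op::Put(b"a1".to_vec(), b"v".to_vec()),
        Op::Delete(b"a2".to_vec()),
        Op::Put(b"b1".to_vec(), b"v".to_vec()),
    ];
    let batches = per_partition_batches(ops);
    // Keys starting with 'a' share one batch; 'b' gets its own.
    assert_eq!(batches.len(), 2);
}
```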
@@ -675,42 +689,15 @@ impl ScyllaDbClient {
             return Ok(());
         }
 
-        Self::check_batch_len(&batch)?;
-        let partition_key =
-            self.check_batch_and_partition_keys(partition_key_prefix, exclusive_mode, &batch)?;
-        let session = &self.session;
-        let mut batch_query = self.get_sticky_batch_query(&partition_key)?;
-        let mut batch_values: Vec<Vec<Vec<u8>>> = Vec::new();
+        let batches =
+            self.get_per_partition_batches(partition_key_prefix, exclusive_mode, &batch)?;
 
-        for key_prefix in batch.key_prefix_deletions {
-            // We'll be always on exclusive mode here, which check_batch_and_partition_keys
-            // guarantees.
-            Self::check_key_size(&key_prefix)?;
-            match get_upper_bound_option(&key_prefix) {
-                None => {
-                    batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone());
-                    batch_values.push(vec![partition_key.clone(), key_prefix]);
-                }
-                Some(upper_bound) => {
-                    batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone());
-                    batch_values.push(vec![partition_key.clone(), key_prefix, upper_bound]);
-                }
-            }
-        }
-
-        for key in batch.simple_unordered_batch.deletions {
-            Self::check_key_size(&key)?;
-            batch_query.append_statement(self.write_batch_deletion.clone());
-            batch_values.push(vec![partition_key.clone(), key]);
-        }
-        for (key, value) in batch.simple_unordered_batch.insertions {
-            Self::check_key_size(&key)?;
-            Self::check_value_size(&value)?;
-            batch_query.append_statement(self.write_batch_insertion.clone());
-            batch_values.push(vec![partition_key.clone(), key, value]);
+        let mut futures = Vec::new();
+        for (_, (batch, batch_values)) in batches {
+            futures.push(async move { self.session.batch(&batch, batch_values).await });
        }
 
-        session.batch(&batch_query, batch_values).await?;
+        try_join_all(futures).await?;
         Ok(())
     }
 
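The rewritten write path then submits the per-partition batches concurrently and propagates the first error via `try_join_all`. A minimal sketch of that fan-out; the `write_one` helper is a stand-in for the driver's batch submission, and the `futures` and `tokio` dependencies are assumptions made for the example:

```rust
use futures::future::try_join_all;

// Dummy stand-in for submitting one per-partition batch to the database.
async fn write_one(partition: Vec<u8>, statements: usize) -> Result<(), String> {
    if statements == 0 {
        return Err(format!("empty batch for partition {:?}", partition));
    }
    // A real implementation would await the session's batch call here.
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let batches = vec![(vec![0u8], 3), (vec![1u8], 5)];

    // One future per partition batch; try_join_all awaits them concurrently
    // and returns the first error, mirroring the diff above.
    let futures = batches
        .into_iter()
        .map(|(partition, statements)| write_one(partition, statements));
    try_join_all(futures).await?;
    Ok(())
}
```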