Skip to content

Commit aef9fbb

Browse files
committed
Parallel download for snapshots
1 parent 31bc1e1 commit aef9fbb

File tree

4 files changed

+85
-43
lines changed

4 files changed

+85
-43
lines changed

crates/sui-core/src/authority/authority_store.rs

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -636,17 +636,51 @@ impl AuthorityStore {
636636
Ok(())
637637
}
638638

639-
pub fn bulk_insert_live_objects(
640-
perpetual_db: &AuthorityPerpetualTables,
641-
live_objects: impl Iterator<Item = LiveObject>,
639+
pub async fn bulk_insert_live_objects(
640+
perpetual_db: Arc<AuthorityPerpetualTables>,
641+
objects: Vec<LiveObject>,
642642
expected_sha3_digest: &[u8; 32],
643643
) -> SuiResult<()> {
644+
// Verify SHA3 over the full object set before inserting.
644645
let mut hasher = Sha3_256::default();
646+
for object in &objects {
647+
hasher.update(object.object_reference().2.inner());
648+
}
649+
let sha3_digest = hasher.finalize().digest;
650+
if *expected_sha3_digest != sha3_digest {
651+
error!(
652+
"Sha does not match! expected: {:?}, actual: {:?}",
653+
expected_sha3_digest, sha3_digest
654+
);
655+
return Err(SuiError::from("Sha does not match"));
656+
}
657+
658+
const NUM_PARALLEL_CHUNKS: usize = 8;
659+
let chunk_size = ((objects.len() + NUM_PARALLEL_CHUNKS - 1) / NUM_PARALLEL_CHUNKS).max(1);
660+
let mut remaining = objects;
661+
let mut handles = Vec::new();
662+
while !remaining.is_empty() {
663+
let take = chunk_size.min(remaining.len());
664+
let chunk: Vec<LiveObject> = remaining.drain(..take).collect();
665+
let db = perpetual_db.clone();
666+
handles.push(tokio::task::spawn_blocking(move || {
667+
Self::insert_objects_chunk(db, chunk)
668+
}));
669+
}
670+
for handle in handles {
671+
handle.await.expect("insert task panicked")?;
672+
}
673+
Ok(())
674+
}
675+
676+
fn insert_objects_chunk(
677+
perpetual_db: Arc<AuthorityPerpetualTables>,
678+
objects: Vec<LiveObject>,
679+
) -> SuiResult<()> {
645680
let mut batch = perpetual_db.objects.batch();
646681
let mut written = 0usize;
647682
const MAX_BATCH_SIZE: usize = 100_000;
648-
for object in live_objects {
649-
hasher.update(object.object_reference().2.inner());
683+
for object in objects {
650684
match object {
651685
LiveObject::Normal(object) => {
652686
let store_object_wrapper = get_store_object(object.clone());
@@ -683,14 +717,6 @@ impl AuthorityStore {
683717
written = 0;
684718
}
685719
}
686-
let sha3_digest = hasher.finalize().digest;
687-
if *expected_sha3_digest != sha3_digest {
688-
error!(
689-
"Sha does not match! expected: {:?}, actual: {:?}",
690-
expected_sha3_digest, sha3_digest
691-
);
692-
return Err(SuiError::from("Sha does not match"));
693-
}
694720
batch.write()?;
695721
Ok(())
696722
}

crates/sui-snapshot/src/reader.rs

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ impl StateSnapshotReaderV1 {
294294

295295
pub async fn read(
296296
&mut self,
297-
perpetual_db: &AuthorityPerpetualTables,
297+
perpetual_db: Arc<AuthorityPerpetualTables>,
298298
abort_registration: AbortRegistration,
299299
sender: Option<tokio::sync::mpsc::Sender<(GlobalStateHash, u64)>>,
300300
) -> Result<()> {
@@ -306,7 +306,7 @@ impl StateSnapshotReaderV1 {
306306
let (sha3_digests, num_part_files) = self.compute_checksum().await?;
307307
let accum_handle =
308308
sender.map(|sender| self.spawn_accumulation_tasks(sender, num_part_files));
309-
self.sync_live_objects(perpetual_db, abort_registration, sha3_digests)
309+
self.sync_live_objects(perpetual_db.clone(), abort_registration, sha3_digests)
310310
.await?;
311311
if let Some(handle) = accum_handle {
312312
handle.await?;
@@ -479,7 +479,7 @@ impl StateSnapshotReaderV1 {
479479

480480
async fn sync_live_objects(
481481
&self,
482-
perpetual_db: &AuthorityPerpetualTables,
482+
perpetual_db: Arc<AuthorityPerpetualTables>,
483483
abort_registration: AbortRegistration,
484484
sha3_digests: Arc<Mutex<DigestByBucketAndPartition>>,
485485
) -> Result<(), anyhow::Error> {
@@ -507,7 +507,7 @@ impl StateSnapshotReaderV1 {
507507
);
508508
let obj_progress_bar_clone = obj_progress_bar.clone();
509509
let instant = Instant::now();
510-
let downloaded_bytes = AtomicUsize::new(0);
510+
let downloaded_bytes = Arc::new(AtomicUsize::new(0));
511511

512512
let ret = Abortable::new(
513513
async move {
@@ -538,25 +538,29 @@ impl StateSnapshotReaderV1 {
538538
.boxed()
539539
.buffer_unordered(concurrency)
540540
.try_for_each(|(bytes, file_metadata, sha3_digest)| {
541-
let bytes_len = bytes.len();
542-
let result: Result<(), anyhow::Error> =
543-
LiveObjectIter::new(&file_metadata, bytes).map(|obj_iter| {
544-
AuthorityStore::bulk_insert_live_objects(
545-
perpetual_db,
546-
obj_iter,
547-
&sha3_digest,
548-
)
549-
.expect("Failed to insert live objects");
550-
});
551-
downloaded_bytes.fetch_add(bytes_len, Ordering::Relaxed);
552-
obj_progress_bar_clone.inc(1);
553-
obj_progress_bar_clone.set_message(format!(
554-
"Download speed: {} MiB/s",
555-
downloaded_bytes.load(Ordering::Relaxed) as f64
556-
/ (1024 * 1024) as f64
557-
/ instant.elapsed().as_secs_f64(),
558-
));
559-
futures::future::ready(result)
541+
let perpetual_db = perpetual_db.clone();
542+
let obj_progress_bar_clone = obj_progress_bar_clone.clone();
543+
let downloaded_bytes = downloaded_bytes.clone();
544+
async move {
545+
let bytes_len = bytes.len();
546+
let objects: Vec<LiveObject> =
547+
LiveObjectIter::new(&file_metadata, bytes)?.collect();
548+
AuthorityStore::bulk_insert_live_objects(
549+
perpetual_db,
550+
objects,
551+
&sha3_digest,
552+
)
553+
.await?;
554+
downloaded_bytes.fetch_add(bytes_len, Ordering::Relaxed);
555+
obj_progress_bar_clone.inc(1);
556+
obj_progress_bar_clone.set_message(format!(
557+
"Download speed: {} MiB/s",
558+
downloaded_bytes.load(Ordering::Relaxed) as f64
559+
/ (1024 * 1024) as f64
560+
/ instant.elapsed().as_secs_f64(),
561+
));
562+
Ok(())
563+
}
560564
})
561565
.await
562566
},

crates/sui-snapshot/src/tests.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,14 @@ async fn test_snapshot_basic() -> Result<(), anyhow::Error> {
120120
3, // max_retries
121121
)
122122
.await?;
123-
let restored_perpetual_db = AuthorityPerpetualTables::open(&restored_db_path, None, None);
123+
let restored_perpetual_db = Arc::new(AuthorityPerpetualTables::open(
124+
&restored_db_path,
125+
None,
126+
None,
127+
));
124128
let (_abort_handle, abort_registration) = AbortHandle::new_pair();
125129
snapshot_reader
126-
.read(&restored_perpetual_db, abort_registration, None)
130+
.read(restored_perpetual_db.clone(), abort_registration, None)
127131
.await?;
128132
compare_live_objects(&perpetual_db, &restored_perpetual_db, true)?;
129133
Ok(())
@@ -176,10 +180,14 @@ async fn test_snapshot_empty_db() -> Result<(), anyhow::Error> {
176180
3, // max_retries
177181
)
178182
.await?;
179-
let restored_perpetual_db = AuthorityPerpetualTables::open(&restored_db_path, None, None);
183+
let restored_perpetual_db = Arc::new(AuthorityPerpetualTables::open(
184+
&restored_db_path,
185+
None,
186+
None,
187+
));
180188
let (_abort_handle, abort_registration) = AbortHandle::new_pair();
181189
snapshot_reader
182-
.read(&restored_perpetual_db, abort_registration, None)
190+
.read(restored_perpetual_db.clone(), abort_registration, None)
183191
.await?;
184192
compare_live_objects(
185193
&perpetual_db,
@@ -303,10 +311,14 @@ async fn test_snapshot_restore_from_archive() -> Result<(), anyhow::Error> {
303311
3, // max_retries
304312
)
305313
.await?;
306-
let restored_perpetual_db = AuthorityPerpetualTables::open(&restored_db_path, None, None);
314+
let restored_perpetual_db = Arc::new(AuthorityPerpetualTables::open(
315+
&restored_db_path,
316+
None,
317+
None,
318+
));
307319
let (_abort_handle, abort_registration) = AbortHandle::new_pair();
308320
snapshot_reader
309-
.read(&restored_perpetual_db, abort_registration, None)
321+
.read(restored_perpetual_db.clone(), abort_registration, None)
310322
.await?;
311323
compare_live_objects(&perpetual_db, &restored_perpetual_db, true)?;
312324
Ok(())

crates/sui-tool/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ pub async fn download_formal_snapshot(
925925
.await
926926
.unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
927927
reader
928-
.read(&perpetual_db_clone, abort_registration, Some(sender))
928+
.read(perpetual_db_clone.clone(), abort_registration, Some(sender))
929929
.await
930930
.unwrap_or_else(|err| panic!("Failed during read: {}", err));
931931
info!("Snapshot download complete");

0 commit comments

Comments
 (0)