Skip to content

Commit 239bcee

Browse files
mbp-stripesourcefrog
authored andcommitted
Replace BlockDir existence cache with a complete list of blocks
Individually looking up blocks when we want to know about them causes a lot of round trips. Let's just eagerly load the whole list up front instead.
1 parent f3f95f9 commit 239bcee

File tree

8 files changed

+328
-246
lines changed

8 files changed

+328
-246
lines changed

src/archive.rs

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
1616
use std::collections::{HashMap, HashSet};
1717
use std::path::Path;
18-
use std::sync::Arc;
18+
use std::sync::{Arc, RwLockReadGuard};
1919

2020
use std::time::Instant;
2121

@@ -101,7 +101,7 @@ impl Archive {
101101
version: header.conserve_archive_version,
102102
});
103103
}
104-
let block_dir = Arc::new(BlockDir::open(transport.chdir(BLOCK_DIR)));
104+
let block_dir = Arc::new(BlockDir::open(transport.chdir(BLOCK_DIR)).await?);
105105
debug!(?header, "Opened archive");
106106
Ok(Archive {
107107
block_dir,
@@ -110,8 +110,8 @@ impl Archive {
110110
}
111111

112112
/// Return an unsorted list of all blocks in the archive.
113-
pub async fn all_blocks(&self, monitor: Arc<dyn Monitor>) -> Result<Vec<BlockHash>> {
114-
self.block_dir.blocks(monitor).await
113+
pub async fn all_blocks(&self) -> Result<RwLockReadGuard<'_, HashSet<BlockHash>>> {
114+
Ok(self.block_dir.blocks())
115115
}
116116

117117
pub async fn band_exists(&self, band_id: BandId) -> Result<bool> {
@@ -227,10 +227,10 @@ impl Archive {
227227
.await?;
228228
Ok(self
229229
.block_dir
230-
.blocks(monitor)
231-
.await?
232-
.into_iter()
230+
.blocks()
231+
.iter()
233232
.filter(move |h| !referenced.contains(h))
233+
.cloned()
234234
.collect())
235235
}
236236

@@ -266,12 +266,7 @@ impl Archive {
266266
debug!(referenced.len = referenced.len());
267267

268268
debug!("Find present blocks...");
269-
let present: HashSet<BlockHash> = self
270-
.block_dir
271-
.blocks(monitor.clone())
272-
.await?
273-
.into_iter()
274-
.collect();
269+
let present: HashSet<BlockHash> = self.block_dir.blocks().iter().cloned().collect();
275270
debug!(present.len = present.len());
276271

277272
debug!("Find unreferenced blocks...");
@@ -343,12 +338,8 @@ impl Archive {
343338
// content.
344339
debug!("List blocks...");
345340
// TODO: Check for unexpected files or directories in the blockdir.
346-
let present_blocks: HashSet<BlockHash> = self
347-
.block_dir
348-
.blocks(monitor.clone())
349-
.await?
350-
.into_iter()
351-
.collect();
341+
let present_blocks: HashSet<BlockHash> =
342+
self.block_dir.blocks().iter().cloned().collect();
352343
for hash in referenced_lens.keys() {
353344
if !present_blocks.contains(hash) {
354345
monitor.error(Error::BlockMissing { hash: hash.clone() })
@@ -490,7 +481,7 @@ mod test {
490481
.len(),
491482
0
492483
);
493-
assert_eq!(af.all_blocks(TestMonitor::arc()).await.unwrap().len(), 0);
484+
assert_eq!(af.all_blocks().await.unwrap().len(), 0);
494485
}
495486

496487
#[tokio::test]

src/backup.rs

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,11 @@ impl BackupWriter {
287287
trace!(?apath, "Copying file");
288288
let result = if let Some(basis_entry) = basis_entry {
289289
if content_heuristically_unchanged(source_entry, basis_entry) {
290-
if all_blocks_present(&basis_entry.addrs, &self.block_dir, &monitor).await {
290+
if basis_entry
291+
.addrs
292+
.iter()
293+
.all(|addr| self.block_dir.contains(&addr.hash))
294+
{
291295
self.stats.unmodified_files += 1;
292296
let new_entry = IndexEntry {
293297
addrs: basis_entry.addrs.clone(),
@@ -302,7 +306,7 @@ impl BackupWriter {
302306
self.index_writer.push_entry(new_entry);
303307
return Ok(Some(change));
304308
} else {
305-
warn!(%apath, "Some referenced blocks are missing or truncated; file will be stored again");
309+
warn!(%apath, ?basis_entry.addrs, "Some referenced blocks are missing or truncated; file will be stored again");
306310
self.stats.modified_files += 1;
307311
self.stats.replaced_damaged_blocks += 1;
308312
Some(EntryChange::changed(basis_entry, source_entry))
@@ -365,23 +369,6 @@ impl BackupWriter {
365369
}
366370
}
367371

368-
async fn all_blocks_present(
369-
addresses: &[Address],
370-
block_dir: &BlockDir,
371-
monitor: &Arc<dyn Monitor>,
372-
) -> bool {
373-
for addr in addresses {
374-
if !block_dir
375-
.contains(&addr.hash, monitor.clone())
376-
.await
377-
.unwrap_or(false)
378-
{
379-
return false;
380-
}
381-
}
382-
true
383-
}
384-
385372
async fn store_file_content(
386373
apath: &Apath,
387374
from_file: &mut dyn Read,
@@ -911,10 +898,10 @@ mod test {
911898
);
912899
assert_eq!(
913900
archive
914-
.all_blocks(TestMonitor::arc())
901+
.all_blocks()
915902
.await
916903
.unwrap()
917-
.into_iter()
904+
.iter()
918905
.map(|h| h.to_string())
919906
.collect::<Vec<String>>(),
920907
vec![HELLO_HASH]
@@ -1248,7 +1235,7 @@ mod test {
12481235
assert_eq!(stats2.written_blocks, 1);
12491236
assert_eq!(stats2.combined_blocks, 1);
12501237

1251-
assert_eq!(af.all_blocks(TestMonitor::arc()).await.unwrap().len(), 2);
1238+
assert_eq!(af.all_blocks().await.unwrap().len(), 2);
12521239
}
12531240

12541241
#[tokio::test]
@@ -1472,15 +1459,14 @@ mod test {
14721459
);
14731460
let metadata_calls = recording.verb_paths(Verb::Metadata);
14741461
println!("metadata calls for second backup without modification: {metadata_calls:#?}");
1475-
// TODO: Really we should not get metadata for data blocks, we should just read the directory up front.
14761462
assert_eq!(
1477-
metadata_calls.len(),
1478-
3,
1479-
"with no modification, backup is expected to get metadata for data blocks"
1463+
metadata_calls,
1464+
["GC_LOCK", "b0000/BANDTAIL"],
1465+
"don't get metadata for data blocks"
14801466
);
14811467
assert!(metadata_calls
14821468
.iter()
1483-
.all(|c| c.ends_with("BANDTAIL") || c.ends_with("GC_LOCK") || c.starts_with("d/")));
1469+
.all(|c| c.ends_with("BANDTAIL") || c.ends_with("GC_LOCK")));
14841470

14851471
// Change one of the files, and in a new backup it should be recognized
14861472
// as unmodified.
@@ -1507,12 +1493,8 @@ mod test {
15071493
let metadata_calls = recording.verb_paths(Verb::Metadata);
15081494
println!("metadata calls for third backup after modification: {metadata_calls:#?}");
15091495
assert_eq!(
1510-
metadata_calls.len(),
1511-
4,
1512-
"with modification to one file, backup is expected to get metadata for head, tail, one new data block, and one unchanged data block: {metadata_calls:#?}"
1496+
metadata_calls, ["GC_LOCK", "b0001/BANDTAIL"],
1497+
"with modification to one file, backup is expected to get metadata for lock and previous band tail: {metadata_calls:#?}"
15131498
);
1514-
assert!(metadata_calls
1515-
.iter()
1516-
.all(|c| c.ends_with("BANDTAIL") || c.ends_with("GC_LOCK") || c.starts_with("d/")));
15171499
}
15181500
}

src/band.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,10 +435,7 @@ mod tests {
435435
.len(),
436436
0
437437
);
438-
assert_eq!(
439-
archive.all_blocks(TestMonitor::arc()).await.unwrap().len(),
440-
0
441-
);
438+
assert_eq!(archive.all_blocks().await.unwrap().len(), 0);
442439
}
443440

444441
#[tokio::test]

src/bin/conserve.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,8 +371,8 @@ impl Command {
371371
Command::Debug(Debug::Blocks { archive }) => {
372372
let mut bw = BufWriter::new(stdout);
373373
let archive = Archive::open(Transport::new(archive).await?).await?;
374-
let blocks = archive.all_blocks(monitor).await?;
375-
for hash in blocks {
374+
let blocks = archive.all_blocks().await?;
375+
for hash in blocks.iter() {
376376
writeln!(bw, "{hash}")?;
377377
}
378378
}
@@ -384,7 +384,7 @@ impl Command {
384384
let mut bw = BufWriter::new(stdout);
385385
let archive = Archive::open(Transport::new(archive).await?).await?;
386386
for hash in archive
387-
.referenced_blocks(&archive.list_band_ids().await?, monitor)
387+
.referenced_blocks(&archive.list_band_ids().await?, monitor.clone())
388388
.await?
389389
{
390390
writeln!(bw, "{hash}")?;

0 commit comments

Comments
 (0)