39 changes: 38 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -33,6 +33,7 @@ globset = "0.4.5"
hex = "0.4.2"
itertools = "0.10"
lazy_static = "1.4.0"
lru = "0.11"
mutants = "0.0.3"
rayon = "1.3.0"
readahead-iterator = "0.1.1"
2 changes: 2 additions & 0 deletions NEWS.md
@@ -4,6 +4,8 @@

- S3 support! Enable it with `cargo install conserve --features s3`, then e.g. `conserve backup s3://mybucket.example/`.

- Performance: A simple cache of retrieved decompressed blocks now speeds up restores, especially on relatively slow storage like S3.

- `--debug` now shows on stderr only debug messages from Conserve itself and not
from dependencies. All the messages are still recorded to the `--log-json` file
if that is given.
59 changes: 55 additions & 4 deletions src/blockdir.rs
@@ -25,14 +25,16 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use bytes::Bytes;
use lru::LruCache;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
#[allow(unused_imports)]
use tracing::{debug, error, info, warn};
use tracing::{instrument, trace};

use crate::backup::BackupStats;
use crate::blockhash::BlockHash;
@@ -46,6 +48,9 @@ const BLOCKDIR_FILE_NAME_LEN: usize = crate::BLAKE_HASH_SIZE_BYTES * 2;
/// Take this many characters from the block hash to form the subdirectory name.
const SUBDIR_NAME_CHARS: usize = 3;

/// Cache this many blocks in memory, of up to 1MB each.
const CACHE_SIZE: usize = 1000;

/// Points to some compressed data inside the block dir.
///
/// Identifiers are: which file contains it, at what (pre-compression) offset,
@@ -69,6 +74,8 @@ pub struct Address {
pub struct BlockDir {
transport: Arc<dyn Transport>,
pub stats: BlockDirStats,
// TODO: There are fancier caches and they might help, but this one works, and Stretto did not work for me.
cache: RwLock<LruCache<BlockHash, Bytes>>,
}

/// Returns the transport-relative subdirectory name.
@@ -87,6 +94,7 @@ impl BlockDir {
BlockDir {
transport,
stats: BlockDirStats::default(),
cache: RwLock::new(LruCache::new(CACHE_SIZE.try_into().unwrap())),
}
}

@@ -111,6 +119,10 @@ impl BlockDir {
return Ok(hash);
}
let compressed = Compressor::new().compress(&block_data)?;
self.cache
.write()
.expect("Lock cache")
.put(hash.clone(), block_data);
let comp_len: u64 = compressed.len().try_into().unwrap();
let hex_hash = hash.to_string();
let relpath = block_relpath(&hash);
@@ -131,6 +143,10 @@ impl BlockDir {
/// So, these are specifically treated as missing, so there's a chance to heal
/// them later.
pub fn contains(&self, hash: &BlockHash) -> Result<bool> {
if self.cache.read().expect("Lock cache").contains(hash) {
self.stats.cache_hit.fetch_add(1, Relaxed);
return Ok(true);
}
match self.transport.metadata(&block_relpath(hash)) {
Err(err) if err.is_not_found() => Ok(false),
Err(err) => {
@@ -165,10 +181,13 @@
/// Return the entire contents of the block.
///
/// Checks that the hash is correct with the contents.
#[instrument(skip(self))]
pub fn get_block_content(&self, hash: &BlockHash) -> Result<Bytes> {
// TODO: Reuse decompressor buffer.
// TODO: Most importantly, cache decompressed blocks!
// TODO: Stats for block reads, maybe in the blockdir?
if let Some(hit) = self.cache.write().expect("Lock cache").get(hash) {
self.stats.cache_hit.fetch_add(1, Relaxed);
trace!("Block cache hit");
return Ok(hit.clone());
}
let mut decompressor = Decompressor::new();
let block_relpath = block_relpath(hash);
let compressed_bytes = self.transport.read_file(&block_relpath)?;
@@ -178,6 +197,10 @@
error!(%hash, %actual_hash, %block_relpath, "Block file has wrong hash");
return Err(Error::BlockCorrupt { hash: hash.clone() });
}
self.cache
.write()
.expect("Lock cache")
.put(hash.clone(), decompressed_bytes.clone());
self.stats.read_blocks.fetch_add(1, Relaxed);
self.stats
.read_block_compressed_bytes
@@ -189,6 +212,7 @@
}

pub fn delete_block(&self, hash: &BlockHash) -> Result<()> {
self.cache.write().expect("Lock cache").pop(hash);
self.transport
.remove_file(&block_relpath(hash))
.map_err(Error::from)
@@ -290,6 +314,7 @@ pub struct BlockDirStats {
pub read_blocks: AtomicUsize,
pub read_block_compressed_bytes: AtomicUsize,
pub read_block_uncompressed_bytes: AtomicUsize,
pub cache_hit: AtomicUsize,
}

#[cfg(test)]
@@ -309,6 +334,9 @@ mod test {
.store_or_deduplicate(Bytes::from("stuff"), &mut stats)
.unwrap();
assert!(blockdir.contains(&hash).unwrap());

// Open again to get a fresh cache
let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
OpenOptions::new()
.write(true)
.truncate(true)
@@ -317,4 +345,27 @@
.expect("Truncate block");
assert!(!blockdir.contains(&hash).unwrap());
}

#[test]
fn cache_hit() {
let tempdir = TempDir::new().unwrap();
let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
let mut stats = BackupStats::default();
let content = Bytes::from("stuff");
let hash = blockdir
.store_or_deduplicate(content.clone(), &mut stats)
.unwrap();
assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 0);

assert!(blockdir.contains(&hash).unwrap());
assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 1);

let retrieved = blockdir.get_block_content(&hash).unwrap();
assert_eq!(content, retrieved);
assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 2); // hit against the value written

let retrieved = blockdir.get_block_content(&hash).unwrap();
assert_eq!(content, retrieved);
assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 3); // hit again
}
}
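
A note on the locking pattern above: `lru`'s `LruCache::get` promotes the entry to most-recently-used and so takes `&mut self`, which is why `get_block_content` acquires the write lock even on its read path, while `contains` (which takes `&self`) gets by with the cheaper read lock. Here is a minimal self-contained sketch of the same pattern, assuming the `lru` 0.11 API; the `Cache` wrapper type and `String` keys are illustrative, not part of this PR:

```rust
use std::num::NonZeroUsize;
use std::sync::RwLock;

use lru::LruCache;

/// Illustrative wrapper, mirroring BlockDir's `RwLock<LruCache<..>>` field.
struct Cache {
    inner: RwLock<LruCache<String, Vec<u8>>>,
}

impl Cache {
    fn new(capacity: usize) -> Cache {
        Cache {
            inner: RwLock::new(LruCache::new(
                NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
            )),
        }
    }

    /// `LruCache::contains` takes `&self`, so a read lock is enough.
    fn contains(&self, key: &str) -> bool {
        self.inner.read().expect("Lock cache").contains(key)
    }

    /// `LruCache::get` updates recency order (`&mut self`), so even a
    /// lookup needs the write lock.
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.inner.write().expect("Lock cache").get(key).cloned()
    }

    fn put(&self, key: String, value: Vec<u8>) {
        self.inner.write().expect("Lock cache").put(key, value);
    }
}
```

In the PR itself, `contains` on the hot backup path never blocks other readers on recency bookkeeping, and because the cached values are `Bytes`, the clone on a cache hit is a reference-count bump rather than a copy.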
5 changes: 4 additions & 1 deletion src/restore.rs
@@ -16,13 +16,14 @@ use std::fs::File;
use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering::Relaxed;
use std::{fs, time::Instant};

use filetime::set_file_handle_times;
#[cfg(unix)]
use filetime::set_symlink_file_times;
use time::OffsetDateTime;
use tracing::{error, instrument, warn};
use tracing::{error, instrument, trace, warn};

use crate::band::BandSelectionPolicy;
use crate::io::{directory_is_empty, ensure_dir_exists};
@@ -148,6 +149,7 @@ pub fn restore(
}
stats += apply_deferrals(&deferrals)?;
stats.elapsed = start.elapsed();
stats.block_cache_hits = block_dir.stats.cache_hit.load(Relaxed);
// TODO: Merge in stats from the tree iter and maybe the source tree?
Ok(stats)
}
@@ -244,6 +246,7 @@ fn restore_file(
stats.errors += 1;
}
// TODO: Accumulate more stats.
trace!("Restored file");
Ok(stats)
}

5 changes: 5 additions & 0 deletions src/stats.rs
@@ -110,6 +110,8 @@ pub struct RestoreStats {
pub uncompressed_file_bytes: u64,

pub elapsed: Duration,

pub block_cache_hits: usize,
}

impl fmt::Display for RestoreStats {
@@ -122,6 +124,9 @@ impl fmt::Display for RestoreStats {
write_count(w, "unsupported file kind", self.unknown_kind);
writeln!(w).unwrap();

write_count(w, "block cache hits", self.block_cache_hits);
writeln!(w).unwrap();

write_count(w, "errors", self.errors);
write_duration(w, "elapsed", self.elapsed)?;

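For completeness, this is how the new counter surfaces to a caller. A hypothetical sketch only: the `restore(archive, destination, options) -> Result<RestoreStats>` signature, the `RestoreOptions` type, and the crate-level re-exports are assumptions based on the identifiers visible in this diff:

```rust
use std::path::Path;

use conserve::{restore, Archive, RestoreOptions};

fn restore_and_report(archive: &Archive, dest: &Path) -> conserve::Result<()> {
    // `restore` copies the BlockDir's atomic `cache_hit` counter into
    // `RestoreStats::block_cache_hits` just before returning (see the
    // restore.rs hunk above), and Display now prints it too.
    let stats = restore(archive, dest, &RestoreOptions::default())?;
    println!("{stats}");
    println!("block cache hits: {}", stats.block_cache_hits);
    Ok(())
}
```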
6 changes: 6 additions & 0 deletions tests/damage/main.rs
@@ -81,8 +81,14 @@ fn backup_after_damage(
let backup_options = BackupOptions::default();
backup(&archive, source_dir.path(), &backup_options).expect("initial backup");

drop(archive);
action.damage(&location.to_path(&archive_dir));

// Open the archive again to avoid cache effects.
let archive =
Archive::open(conserve::transport::open_local_transport(archive_dir.path()).unwrap())
.expect("open archive");

// A second backup should succeed.
changes.apply(&source_dir);
let backup_stats = backup(&archive, source_dir.path(), &backup_options)