54 changes: 54 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions data/Cargo.toml
@@ -18,6 +18,10 @@ path = "src/bin/xtool.rs"
name = "chunk"
path = "examples/chunk/main.rs"

[[example]]
name = "parallel-chunk"
path = "examples/parallel-chunk/main.rs"

[[example]]
name = "hash"
path = "examples/hash/main.rs"
@@ -76,3 +80,4 @@ ctor = { workspace = true }
[features]
strict = []
expensive_tests = []
parallel-chunking = ["deduplication/parallel-chunking"]
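The new `parallel-chunking` feature here just forwards to the deduplication crate's feature of the same name, so the example below only compiles its parallel path when the whole chain is enabled. Assuming the crate is named `data` after its directory, the example would be invoked roughly as:

cargo run -p data --example parallel-chunk --features parallel-chunking -- --input <file>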
61 changes: 61 additions & 0 deletions data/examples/parallel-chunk/main.rs
@@ -0,0 +1,61 @@
#[cfg(feature = "parallel-chunking")]
use std::fs::File;
#[cfg(feature = "parallel-chunking")]
use std::io::{BufWriter, Write};
use std::path::PathBuf;

use clap::Parser;
#[cfg(feature = "parallel-chunking")]
use deduplication::chunk_file_parallel;

#[derive(Debug, Parser)]
#[command(
    version,
    about,
    long_about = "Parallel chunker with memory mapping and multi-threading. Requires --features parallel-chunking."
)]
struct ParallelChunkArgs {
    /// Input file (required; stdin is not supported in parallel mode)
    #[arg(short, long)]
    input: PathBuf,

    /// Output file (stdout if not specified)
    #[arg(short, long)]
    output: Option<PathBuf>,

    /// Number of threads for parallel processing (0 = auto-detect)
    #[arg(short, long, default_value = "0")]
    threads: usize,
}

#[cfg(feature = "parallel-chunking")]
fn main() -> std::io::Result<()> {
    let args = ParallelChunkArgs::parse();

    // Map 0 to None so the library auto-detects the thread count,
    // matching the flag's "0 = auto-detect" semantics.
    let threads = (args.threads > 0).then(|| args.threads as u32);

    // Process the file with the parallel implementation
    let chunks = chunk_file_parallel(&args.input, threads)?;

    // Set up the output writer: a buffered file if --output was given, stdout otherwise
    let mut output: Box<dyn Write> = if let Some(save) = &args.output {
        Box::new(BufWriter::new(File::create(save)?))
    } else {
        Box::new(std::io::stdout())
    };

    // Write one line per chunk: "<hash> <length in bytes>"
    for chunk in chunks {
        output.write_all(format!("{} {}\n", chunk.hash, chunk.data.len()).as_bytes())?;
    }

    output.flush()?;

    Ok(())
}

#[cfg(not(feature = "parallel-chunking"))]
fn main() -> std::io::Result<()> {
    eprintln!("Error: parallel-chunk requires --features parallel-chunking");
    eprintln!("Build with: cargo build --features parallel-chunking");
    eprintln!("Or use the regular 'chunk' example for sequential processing.");
    std::process::exit(1);
}
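Each output line pairs a chunk hash with that chunk's length in bytes, and the binary is built in one of two shapes: with the feature enabled you get the real chunker, without it a stub `main` that prints build instructions and exits with status 1, so a missing feature surfaces as a clear runtime message rather than a confusing compile error.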
10 changes: 10 additions & 0 deletions deduplication/Cargo.toml
@@ -14,5 +14,15 @@ bytes = { workspace = true }
gearhash = { workspace = true }
more-asserts = { workspace = true }

# Parallel chunking dependencies (feature-gated)
crossbeam = { version = "0.8.4", optional = true }
memmap2 = { version = "0.9.8", optional = true }
num_cpus = { version = "1.16", optional = true }

[dev-dependencies]
rand = { workspace = true }
tempfile = { workspace = true }

[features]
default = []
parallel-chunking = ["dep:crossbeam", "dep:memmap2", "dep:num_cpus"]
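All three new dependencies are optional and wired up with the `dep:` prefix, so they are only compiled when `parallel-chunking` is enabled and do not leak same-named implicit features into the crate's public feature list.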
34 changes: 34 additions & 0 deletions deduplication/src/chunking.rs
@@ -215,6 +215,40 @@ impl Chunker {
        }
    }

    /// Parallel chunking for files.
    ///
    /// Uses memory mapping and multi-threading for content-defined chunking.
    /// Requires the `parallel-chunking` feature to be enabled.
    ///
    /// # Arguments
    /// * `file_path` - Path to the file to chunk
    /// * `thread_count` - Number of threads (None = auto-detect)
    ///
    /// # Returns
    /// * `Ok(Vec<Chunk>)` - Successfully chunked file
    /// * `Err` - I/O failure, or the file is too small for parallel processing
    #[cfg(feature = "parallel-chunking")]
    pub fn chunk_file_parallel<P: AsRef<std::path::Path>>(
        &self,
        file_path: P,
        thread_count: Option<usize>,
    ) -> std::io::Result<Vec<Chunk>> {
        crate::parallel_chunking::chunk_file_parallel(file_path, thread_count.map(|t| t as u32))
    }

    /// Fallback used when the `parallel-chunking` feature is disabled; always
    /// returns an `Unsupported` error.
    #[cfg(not(feature = "parallel-chunking"))]
    pub fn chunk_file_parallel<P: AsRef<std::path::Path>>(
        &self,
        _file_path: P,
        _thread_count: Option<usize>,
    ) -> std::io::Result<Vec<Chunk>> {
        Err(std::io::Error::new(
            std::io::ErrorKind::Unsupported,
            "Parallel chunking not available. Enable with --features parallel-chunking",
        ))
    }

    /// Keeps chunking until no more chunks can be reliably produced, returning a
    /// vector of the resulting chunks.
    ///
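Because `chunk_file_parallel` exists in both builds (the stub simply returns `ErrorKind::Unsupported`), downstream code can probe for parallel support at run time instead of adding its own `cfg` gates. A minimal caller sketch, assuming a `Chunker` value is already constructed (its constructor is not part of this diff) and using a hypothetical helper name:

fn chunk_file_or_explain(
    chunker: &deduplication::Chunker,
    path: &std::path::Path,
) -> std::io::Result<Vec<deduplication::Chunk>> {
    match chunker.chunk_file_parallel(path, None) {
        Ok(chunks) => Ok(chunks),
        Err(e) if e.kind() == std::io::ErrorKind::Unsupported => {
            // Built without the feature: the stub variant always lands here.
            eprintln!("rebuild with --features parallel-chunking for parallel chunking");
            Err(e)
        }
        Err(e) => Err(e), // genuine I/O error from the parallel path
    }
}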
5 changes: 5 additions & 0 deletions deduplication/src/lib.rs
@@ -8,10 +8,15 @@ mod file_deduplication;
mod interface;
mod raw_xorb_data;

#[cfg(feature = "parallel-chunking")]
mod parallel_chunking;

pub use chunk::Chunk;
pub use chunking::{Chunker, find_partitions};
pub use data_aggregator::DataAggregator;
pub use dedup_metrics::DeduplicationMetrics;
pub use file_deduplication::FileDeduper;
pub use interface::DeduplicationDataInterface;
#[cfg(feature = "parallel-chunking")]
pub use parallel_chunking::chunk_file_parallel;
pub use raw_xorb_data::{RawXorbData, test_utils};
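Note the asymmetry between the two entry points: the free function `chunk_file_parallel` is only re-exported when the feature is enabled, so calling it from code built without the feature is a compile error, whereas the `Chunker::chunk_file_parallel` method is always present and degrades to a runtime `Unsupported` error; the parallel-chunk example handles the former case with `cfg` attributes.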