Skip to content

Commit f3fee69

Browse files
pipelined extraction
- initial sketch of lexicographic trie for pipelining
- move path splitting into a submodule
- lex trie can now propagate entry data
- outline handle allocation
- mostly handle files
- mostly handle dirs
- clarify symlink FIXMEs
- do symlink validation
- extract writable dir setting to helper method
- modify args to handle allocation method
- handle allocation test passes
- simplify perms a lot
- outline evaluation
- handle symlinks
- BIGGER CHANGE! add EntryReader/etc
- make initial pipelined extract work
- fix file perms by writing them after finishing the file write
- support directory entries by unix mode as well
- impl split extraction
- remove dependency on reader refactoring
- add dead_code to methods we don't use yet
- bzip2 support needed for benchmark test
- correctly handle backslashes in entry names (i.e. don't)
- make PathSplitError avoid consing a String until necessary
- add repro_old423 test for pipelining
- silence dead code warnings for windows
- avoid erroring for top-level directory entries
- use num_cpus by default for parallelism - we spawn three threads per chunk
- add dynamically-generated test archive
- initialize the test archives exactly once in statics
- add benchmarks for dynamic and static test data
- use lazy_static
- add FIXME for follow-up work to support absolute paths
- impl From<DirEntry<...>> for FSEntry
- move handle_creation module to a separate file
- downgrade HandleCreationError to io::Error
- use ByAddress over ZipDataHandle
- replace unsafe transmutes with Pod methods
- add note about shared future dependency task DAG
- box each level of the b-tree together with its values

This may technically reduce heap fragmentation, but since this data structure only exists temporarily, that's probably not too important. Instead, this change just reduces the amount of coercion and unboxing we need to do.
1 parent 72cce40 commit f3fee69

File tree

12 files changed

+2558
-6
lines changed

12 files changed

+2558
-6
lines changed

Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,24 @@ time = { version = "0.3.37", default-features = false }
2828

2929
[dependencies]
3030
aes = { version = "0.8", optional = true }
31+
by_address = { version = "1.2.1", optional = true }
3132
bzip2 = { version = "0.6.0", optional = true }
3233
chrono = { version = "^0.4.27", optional = true }
3334
constant_time_eq = { version = "0.3.1", optional = true }
3435
crc32fast = "1.4"
36+
displaydoc = "0.2.5"
3537
flate2 = { version = "1.1.1", default-features = false, optional = true }
3638
getrandom = { version = "0.3.1", features = ["std"], optional = true }
3739
hmac = { version = "0.12", optional = true, features = ["reset"] }
3840
indexmap = "2"
3941
jiff = { version = "0.2.4", optional = true }
4042
memchr = "2.7"
4143
nt-time = { version = "0.10.6", default-features = false, optional = true }
44+
num_cpus = { version = "1.16", optional = true }
4245
ppmd-rust = { version = "1.2", optional = true }
4346
pbkdf2 = { version = "0.12", optional = true }
4447
sha1 = { version = "0.10", optional = true }
48+
thiserror = "2"
4549
time = { workspace = true, optional = true, features = [
4650
"std",
4751
] }
@@ -52,6 +56,10 @@ deflate64 = { version = "0.1.9", optional = true }
5256
lzma-rust2 = { version = "0.13", optional = true, default-features = false, features = ["std", "encoder", "optimization", "xz"] }
5357
bitstream-io = { version = "4.5.0", optional = true }
5458

59+
60+
[target.'cfg(unix)'.dependencies]
61+
libc = { version = "0.2.155", optional = true }
62+
5563
[target.'cfg(fuzzing)'.dependencies]
5664
arbitrary = { version = "1.4.1", features = ["derive"] }
5765

@@ -62,7 +70,9 @@ walkdir = "2.5"
6270
time = { workspace = true, features = ["formatting", "macros"] }
6371
anyhow = "1.0.95"
6472
clap = { version = "=4.4.18", features = ["derive"] }
73+
tempdir = "0.3.7"
6574
tempfile = "3.15"
75+
num_cpus = "1"
6676

6777
[features]
6878
aes-crypto = ["dep:aes", "dep:constant_time_eq", "hmac", "pbkdf2", "sha1", "getrandom", "zeroize"]
@@ -86,6 +96,7 @@ unreserved = []
8696
xz = ["dep:lzma-rust2"]
8797
xz-static = ["lzma"]
8898
legacy-zip = ["bitstream-io"]
99+
parallelism = ["libc", "num_cpus", "by_address"]
89100
default = [
90101
"aes-crypto",
91102
"bzip2",
@@ -109,3 +120,7 @@ harness = false
109120
[[bench]]
110121
name = "merge_archive"
111122
harness = false
123+
124+
[[bench]]
125+
name = "extract"
126+
harness = false

benches/extract.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use bencher::{benchmark_group, benchmark_main};
2+
3+
use bencher::Bencher;
4+
use tempdir::TempDir;
5+
use tempfile::tempfile;
6+
7+
use std::fs;
8+
use std::path::Path;
9+
use std::sync::{LazyLock, Mutex};
10+
11+
use zip::result::ZipResult;
12+
use zip::write::ZipWriter;
13+
use zip::ZipArchive;
14+
15+
#[cfg(all(feature = "parallelism", unix))]
16+
use zip::read::{split_extract, ExtractionParameters};
17+
18+
/* This archive has a set of entries repeated 20x:
19+
* - 200K random data, stored uncompressed (CompressionMethod::Stored)
20+
* - 246K text data (the project gutenberg html version of king lear)
21+
* (CompressionMethod::Bzip2, compression level 1) (project gutenberg ebooks are public domain)
22+
*
23+
* The full archive file is 5.3MB.
24+
*/
25+
fn static_test_archive() -> ZipResult<ZipArchive<fs::File>> {
26+
assert!(
27+
cfg!(feature = "bzip2"),
28+
"this test archive requires bzip2 support"
29+
);
30+
let path =
31+
Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/data/stored-and-compressed-text.zip");
32+
let file = fs::File::open(path)?;
33+
ZipArchive::new(file)
34+
}
35+
36+
static STATIC_TEST_ARCHIVE: LazyLock<Mutex<ZipArchive<fs::File>>> = LazyLock::new(|| {
37+
let archive = static_test_archive().unwrap();
38+
Mutex::new(archive)
39+
});
40+
41+
/* This archive is generated dynamically, in order to scale with the number of reported CPUs.
42+
* - We want at least 768 files (4 per VCPU on EC2 *.48xlarge instances) to run in CI.
43+
* - We want to retain the interspersed random/text entries from static_test_archive().
44+
*
45+
* We will copy over entries from the static archive repeatedly until we reach the desired file
46+
* count.
47+
*/
48+
fn dynamic_test_archive(src_archive: &mut ZipArchive<fs::File>) -> ZipResult<ZipArchive<fs::File>> {
49+
let desired_num_entries: usize = num_cpus::get() * 4;
50+
let mut output_archive = ZipWriter::new(tempfile()?);
51+
52+
for (src_index, output_index) in (0..src_archive.len()).cycle().zip(0..desired_num_entries) {
53+
let src_file = src_archive.by_index_raw(src_index)?;
54+
let output_name = if src_file.name().starts_with("random-") {
55+
format!("random-{output_index}.dat")
56+
} else {
57+
assert!(src_file.name().starts_with("text-"));
58+
format!("text-{output_index}.dat")
59+
};
60+
output_archive.raw_copy_file_rename(src_file, output_name)?;
61+
}
62+
63+
output_archive.finish_into_readable()
64+
}
65+
66+
static DYNAMIC_TEST_ARCHIVE: LazyLock<Mutex<ZipArchive<fs::File>>> = LazyLock::new(|| {
67+
let mut src = STATIC_TEST_ARCHIVE.lock().unwrap();
68+
let archive = dynamic_test_archive(&mut src).unwrap();
69+
Mutex::new(archive)
70+
});
71+
72+
fn do_extract_basic(bench: &mut Bencher, archive: &mut ZipArchive<fs::File>) {
73+
let total_size: u64 = archive.decompressed_size().unwrap().try_into().unwrap();
74+
75+
let parent = TempDir::new("zip-extract").unwrap();
76+
77+
bench.bytes = total_size;
78+
bench.bench_n(1, |bench| {
79+
bench.iter(move || {
80+
let outdir = TempDir::new_in(parent.path(), "bench-subdir")
81+
.unwrap()
82+
.into_path();
83+
archive.extract(outdir).unwrap();
84+
});
85+
});
86+
}
87+
88+
fn extract_basic_static(bench: &mut Bencher) {
89+
let mut archive = STATIC_TEST_ARCHIVE.lock().unwrap();
90+
do_extract_basic(bench, &mut archive);
91+
}
92+
93+
fn extract_basic_dynamic(bench: &mut Bencher) {
94+
let mut archive = DYNAMIC_TEST_ARCHIVE.lock().unwrap();
95+
do_extract_basic(bench, &mut archive);
96+
}
97+
98+
#[cfg(all(feature = "parallelism", unix))]
99+
fn do_extract_split(bench: &mut Bencher, archive: &ZipArchive<fs::File>) {
100+
let total_size: u64 = archive.decompressed_size().unwrap().try_into().unwrap();
101+
102+
let params = ExtractionParameters {
103+
decompression_threads: num_cpus::get() / 3,
104+
..Default::default()
105+
};
106+
107+
let parent = TempDir::new("zip-extract").unwrap();
108+
109+
bench.bytes = total_size;
110+
bench.bench_n(1, |bench| {
111+
bench.iter(move || {
112+
let outdir = TempDir::new_in(parent.path(), "bench-subdir")
113+
.unwrap()
114+
.into_path();
115+
split_extract(archive, &outdir, params.clone()).unwrap();
116+
});
117+
});
118+
}
119+
120+
#[cfg(all(feature = "parallelism", unix))]
121+
fn extract_split_static(bench: &mut Bencher) {
122+
let archive = STATIC_TEST_ARCHIVE.lock().unwrap();
123+
do_extract_split(bench, &archive);
124+
}
125+
126+
#[cfg(all(feature = "parallelism", unix))]
127+
fn extract_split_dynamic(bench: &mut Bencher) {
128+
let archive = DYNAMIC_TEST_ARCHIVE.lock().unwrap();
129+
do_extract_split(bench, &archive);
130+
}
131+
132+
#[cfg(not(all(feature = "parallelism", unix)))]
133+
benchmark_group!(benches, extract_basic_static, extract_basic_dynamic);
134+
135+
#[cfg(all(feature = "parallelism", unix))]
136+
benchmark_group!(
137+
benches,
138+
extract_basic_static,
139+
extract_basic_dynamic,
140+
extract_split_static,
141+
extract_split_dynamic
142+
);
143+
144+
benchmark_main!(benches);

benches/read_metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ fn parse_large_non_zip(bench: &mut Bencher) {
123123
let dir = TempDir::with_prefix("large-non-zip-bench").unwrap();
124124
let file = dir.path().join("zeros");
125125
let buf = vec![0u8; FILE_SIZE];
126-
fs::write(&file, &buf).unwrap();
126+
fs::write(&file, buf).unwrap();
127127

128128
bench.iter(|| {
129129
assert!(zip::ZipArchive::new(std::fs::File::open(&file).unwrap()).is_err());

src/read.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ pub(crate) mod stream;
3737

3838
pub(crate) mod magic_finder;
3939

40+
#[cfg(feature = "parallelism")]
41+
pub(crate) mod handle_creation;
42+
#[cfg(feature = "parallelism")]
43+
pub(crate) mod pipelining;
44+
#[cfg(all(unix, feature = "parallelism"))]
45+
pub use pipelining::split_extraction::{split_extract, ExtractionParameters, SplitExtractionError};
46+
#[cfg(feature = "parallelism")]
47+
pub(crate) mod split;
48+
4049
// Put the struct declaration in a private module to convince rustdoc to display ZipArchive nicely
4150
pub(crate) mod zip_archive {
4251
use indexmap::IndexMap;

0 commit comments

Comments (0)