diff --git a/Cargo.toml b/Cargo.toml index 2b1efcee4..3258974cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ time = { version = "0.3.36", default-features = false } [dependencies] aes = { version = "0.8.4", optional = true } +bytes = "1.5.0" bzip2 = { version = "0.4.4", optional = true } chrono = { version = "0.4.38", optional = true } constant_time_eq = { version = "0.3.0", optional = true } @@ -32,8 +33,10 @@ flate2 = { version = "1.0.28", default-features = false, optional = true } indexmap = "2" hmac = { version = "0.12.1", optional = true, features = ["reset"] } pbkdf2 = { version = "0.12.2", optional = true } +rayon = { version = "1.7.0" } rand = { version = "0.8.5", optional = true } sha1 = { version = "0.10.6", optional = true } +tempfile = "3.8.0" thiserror = "1.0.48" time = { workspace = true, optional = true, features = [ "std", @@ -92,6 +95,10 @@ harness = false name = "read_metadata" harness = false +[[bench]] +name = "extract_pipelined" +harness = false + [[bench]] name = "merge_archive" harness = false diff --git a/benches/extract_pipelined.rs b/benches/extract_pipelined.rs new file mode 100644 index 000000000..0abffde71 --- /dev/null +++ b/benches/extract_pipelined.rs @@ -0,0 +1,151 @@ +use bencher::{benchmark_group, benchmark_main}; + +use std::io::{Cursor, Read, Seek, Write}; +use std::path::{Path, PathBuf}; + +use bencher::Bencher; +use getrandom::getrandom; +use once_cell::sync::Lazy; +use tempfile::tempdir; +use zip::{read::IntermediateFile, result::ZipResult, write::FileOptions, ZipArchive, ZipWriter}; + +fn generate_random_archive( + num_entries: usize, + entry_size: usize, + options: FileOptions, +) -> ZipResult> { + let buf = Cursor::new(Vec::new()); + let mut zip = ZipWriter::new(buf); + + let mut bytes = vec![0u8; entry_size]; + for i in 0..num_entries { + let name = format!("random{}.dat", i); + zip.start_file(name, options)?; + getrandom(&mut bytes).unwrap(); + zip.write_all(&bytes)?; + } + + let buf = zip.finish()?.into_inner(); + + Ok(buf) +} + +static BIG_ARCHIVE_PATH: Lazy = + Lazy::new(|| Path::new(env!("CARGO_MANIFEST_DIR")).join("benches/target.zip")); + +fn get_archive(path: impl AsRef) -> ZipResult<(u64, ZipArchive)> { + let f = IntermediateFile::from_path(path)?; + let len = f.len(); + let archive = ZipArchive::new(f)?; + Ok((len as u64, archive)) +} + +static SMALL_ARCHIVE_PATH: Lazy = + Lazy::new(|| Path::new(env!("CARGO_MANIFEST_DIR")).join("benches/small-target.zip")); + +fn get_big_archive() -> ZipResult<(u64, ZipArchive)> { + get_archive(&*BIG_ARCHIVE_PATH) +} + +fn get_small_archive() -> ZipResult<(u64, ZipArchive)> { + get_archive(&*SMALL_ARCHIVE_PATH) +} + +fn perform_pipelined>( + src: ZipArchive, + target: P, +) -> ZipResult<()> { + src.extract_pipelined(target) +} + +fn perform_sync>( + mut src: ZipArchive, + target: P, +) -> ZipResult<()> { + src.extract(target) +} + +const NUM_ENTRIES: usize = 1_000; +const ENTRY_SIZE: usize = 10_000; + +fn extract_pipelined_random(bench: &mut Bencher) { + let options = FileOptions::default().compression_method(zip::CompressionMethod::Deflated); + let src = generate_random_archive(NUM_ENTRIES, ENTRY_SIZE, options).unwrap(); + bench.bytes = src.len() as u64; + let src = ZipArchive::new(IntermediateFile::from_bytes(&src)).unwrap(); + + bench.iter(|| { + let td = tempdir().unwrap(); + perform_pipelined(src.clone(), td).unwrap(); + }); +} + +fn extract_sync_random(bench: &mut Bencher) { + let options = FileOptions::default().compression_method(zip::CompressionMethod::Deflated); + let src = 
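+        /* Same entry count, entry size, and compression method as extract_pipelined_random, so the two benchmarks are directly comparable. */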
generate_random_archive(NUM_ENTRIES, ENTRY_SIZE, options).unwrap(); + bench.bytes = src.len() as u64; + + bench.iter(|| { + let td = tempdir().unwrap(); + perform_sync::>, Cursor>, _>( + ZipArchive::new(Cursor::new(src.clone())).unwrap(), + td, + ) + .unwrap(); + }); +} + +fn extract_pipelined_compressible_big(bench: &mut Bencher) { + let (len, src) = get_big_archive().unwrap(); + bench.bytes = len; + + bench.bench_n(2, |_| ()); + bench.iter(|| { + let td = tempdir().unwrap(); + perform_pipelined(src.clone(), td).unwrap(); + }); +} + +fn extract_sync_compressible_big(bench: &mut Bencher) { + let (len, src) = get_big_archive().unwrap(); + bench.bytes = len; + + bench.bench_n(2, |_| ()); + bench.iter(|| { + let td = tempdir().unwrap(); + perform_sync::<_, IntermediateFile, _>(src.clone(), td).unwrap(); + }); +} + +fn extract_pipelined_compressible_small(bench: &mut Bencher) { + let (len, src) = get_small_archive().unwrap(); + bench.bytes = len; + + bench.bench_n(100, |_| ()); + bench.iter(|| { + let td = tempdir().unwrap(); + perform_pipelined(src.clone(), td).unwrap(); + }); +} + +fn extract_sync_compressible_small(bench: &mut Bencher) { + let (len, src) = get_small_archive().unwrap(); + bench.bytes = len; + + bench.bench_n(100, |_| ()); + bench.iter(|| { + let td = tempdir().unwrap(); + perform_sync::<_, IntermediateFile, _>(src.clone(), td).unwrap(); + }); +} + +benchmark_group!( + benches, + extract_pipelined_random, + extract_sync_random, + extract_pipelined_compressible_big, + extract_sync_compressible_big, + extract_pipelined_compressible_small, + extract_sync_compressible_small, +); +benchmark_main!(benches); diff --git a/benches/small-target.zip b/benches/small-target.zip new file mode 100644 index 000000000..c25cd6635 Binary files /dev/null and b/benches/small-target.zip differ diff --git a/benches/target.zip b/benches/target.zip new file mode 100644 index 000000000..f7835cb33 Binary files /dev/null and b/benches/target.zip differ diff --git a/src/read.rs b/src/read.rs index 053292ff7..cc228fe71 100644 --- a/src/read.rs +++ b/src/read.rs @@ -12,11 +12,16 @@ use crate::spec; use crate::types::{AesMode, AesVendorVersion, DateTime, System, ZipFileData}; use crate::zipcrypto::{ZipCryptoReader, ZipCryptoReaderValid, ZipCryptoValidator}; use indexmap::IndexMap; -use std::borrow::Cow; +use std::borrow::{Borrow, Cow}; +use std::cell::UnsafeCell; +use std::cmp; +use std::collections::HashSet; +use std::fmt; +use std::fs; use std::io::{self, copy, prelude::*, sink}; use std::ops::Deref; use std::path::{Path, PathBuf}; -use std::sync::{Arc, OnceLock}; +use std::sync::{Arc, RwLock}; #[cfg(any( feature = "deflate", @@ -135,6 +140,60 @@ impl<'a> CryptoReader<'a> { } } +enum ZipEntry<'a, R: Read + 'a> { + Stored(R), + #[cfg(any( + feature = "deflate", + feature = "deflate-miniz", + feature = "deflate-zlib" + ))] + Deflated(flate2::read::DeflateDecoder), + #[cfg(feature = "bzip2")] + Bzip2(BzDecoder), + #[cfg(feature = "zstd")] + Zstd(ZstdDecoder<'a, io::BufReader>), +} + +impl<'a, R: Read + 'a> ZipEntry<'a, R> { + pub fn from_data(data: &'a ZipFileData, source_handle: R) -> Self { + match data.compression_method { + CompressionMethod::Stored => Self::Stored(source_handle), + #[cfg(any( + feature = "deflate", + feature = "deflate-miniz", + feature = "deflate-zlib" + ))] + CompressionMethod::Deflated => Self::Deflated(DeflateDecoder::new(source_handle)), + #[cfg(feature = "bzip2")] + CompressionMethod::Bzip2 => Self::Bzip2(BzDecoder::new(source_handle)), + #[cfg(feature = "zstd")] + 
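+            /* Unlike the other decoders' constructors, ZstdDecoder::new is fallible; the unwrap below assumes context creation does not fail in practice. */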
CompressionMethod::Zstd => { + let zstd_reader = ZstdDecoder::new(source_handle).unwrap(); + Self::Zstd(zstd_reader) + } + _ => panic!("Compression method not supported"), + } + } +} + +impl<'a, R: Read + 'a> Read for ZipEntry<'a, R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self { + Self::Stored(r) => r.read(buf), + #[cfg(any( + feature = "deflate", + feature = "deflate-miniz", + feature = "deflate-zlib" + ))] + Self::Deflated(r) => r.read(buf), + #[cfg(feature = "bzip2")] + Self::Bzip2(r) => r.read(buf), + #[cfg(feature = "zstd")] + Self::Zstd(r) => r.read(buf), + } + } +} + pub(crate) enum ZipFileReader<'a> { NoReader, Raw(io::Take<&'a mut dyn Read>), @@ -660,8 +719,6 @@ impl ZipArchive { /// Extraction is not atomic; If an error is encountered, some of the files /// may be left on disk. pub fn extract>(&mut self, directory: P) -> ZipResult<()> { - use std::fs; - for i in 0..self.len() { let mut file = self.by_index(i)?; let filepath = file @@ -860,6 +917,568 @@ impl ZipArchive { } } +#[derive(Debug, Clone)] +struct CompletedPaths { + seen: HashSet, +} + +impl CompletedPaths { + pub fn new() -> Self { + Self { + seen: HashSet::new(), + } + } + + pub fn contains(&self, path: impl AsRef) -> bool { + self.seen.contains(path.as_ref()) + } + + pub fn containing_dirs<'a>( + path: &'a (impl AsRef + ?Sized), + ) -> impl Iterator { + let is_dir = path.as_ref().to_string_lossy().ends_with('/'); + path.as_ref() + .ancestors() + .inspect(|p| { + if p == &Path::new("/") { + unreachable!("did not expect absolute paths") + } + }) + .filter_map(move |p| { + if &p == &path.as_ref() { + if is_dir { + Some(p) + } else { + None + } + } else if p == Path::new("") { + None + } else { + Some(p) + } + }) + } + + pub fn new_containing_dirs_needed<'a>( + &self, + path: &'a (impl AsRef + ?Sized), + ) -> Vec<&'a Path> { + let mut ret: Vec<_> = Self::containing_dirs(path) + /* Assuming we are given ancestors in order from child to parent. */ + .take_while(|p| !self.contains(p)) + .collect(); + /* Get dirs in order from parent to child. */ + ret.reverse(); + ret + } + + pub fn write_dirs<'a>(&mut self, paths: &[&'a Path]) { + for path in paths.iter() { + if !self.contains(path) { + self.seen.insert(path.to_path_buf()); + } + } + } +} + +#[derive(Debug)] +#[allow(missing_docs)] +pub enum IntermediateFile { + Immediate(Arc>>, usize), + Paging(UnsafeCell, PathBuf, usize), +} + +unsafe impl Sync for IntermediateFile {} + +impl fmt::Display for IntermediateFile { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let len = self.len(); + match self { + Self::Immediate(arc, pos) => match str::from_utf8(arc.read().unwrap().as_ref()) { + Ok(s) => write!(f, "Immediate(@{})[{}](\"{}\")", pos, s.len(), s), + Err(_) => write!(f, "Immediate[{}]()", len), + /* Err(_) => write!( */ + /* f, */ + /* "Immediate(@{})[{}]( = \"{}\")", */ + /* pos, */ + /* arc.read().unwrap().len(), */ + /* String::from_utf8_lossy(arc.read().unwrap().as_ref()), */ + /* ), */ + }, + Self::Paging(_, path, len) => write!(f, "Paging[{}]({})", len, path.display()), + } + } +} + +impl IntermediateFile { + #[allow(missing_docs)] + pub fn len(&self) -> usize { + match self { + Self::Immediate(arc, _) => arc.read().unwrap().len(), + Self::Paging(_, _, len) => *len, + } + } + #[allow(missing_docs)] + pub fn tell(&self) -> io::Result { + match self { + Self::Immediate(_, pos) => Ok(*pos as u64), + Self::Paging(f, _, _) => { + let f: &mut fs::File = unsafe { &mut *f.get() }; + Ok(f.stream_position()?) 
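+                /* stream_position() only queries the current offset (a zero-displacement seek); it does not read file contents. */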
+ } + } + } + #[allow(missing_docs)] + pub fn immediate(len: usize) -> Self { + Self::Immediate(Arc::new(RwLock::new(vec![0; len].into_boxed_slice())), 0) + } + #[allow(missing_docs)] + pub fn paging(len: usize) -> io::Result { + let f = tempfile::NamedTempFile::with_prefix("intermediate")?; + let (mut f, path) = f.keep().unwrap(); + f.set_len(len as u64)?; + f.rewind()?; + Ok(Self::Paging(UnsafeCell::new(f), path, len)) + } + #[allow(missing_docs)] + pub fn from_path(path: impl AsRef) -> io::Result { + let mut f = fs::File::open(path.as_ref())?; + let len = f.seek(io::SeekFrom::End(0))?; + f.rewind()?; + Ok(Self::Paging( + UnsafeCell::new(f), + path.as_ref().to_path_buf(), + len as usize, + )) + } + #[allow(missing_docs)] + pub fn from_bytes(bytes: &[u8]) -> Self { + Self::Immediate(Arc::new(RwLock::new(bytes.into())), 0) + } + #[allow(missing_docs)] + pub fn remove_backing_file(&mut self) -> io::Result<()> { + match self { + Self::Immediate(_, _) => Ok(()), + Self::Paging(_, path, _) => fs::remove_file(path), + } + } +} + +impl Clone for IntermediateFile { + fn clone(&self) -> Self { + let pos = self.tell().unwrap(); + /* eprintln!("cloning! {}", &self); */ + match self { + Self::Immediate(arc, pos) => Self::Immediate(arc.clone(), *pos), + Self::Paging(_, path, len) => { + /* let prev_f: &mut fs::File = unsafe { &mut *prev_f.get() }; */ + /* prev_f.sync_data().unwrap(); */ + let mut new_f = fs::OpenOptions::new().read(true).open(&path).unwrap(); + new_f.seek(io::SeekFrom::Start(pos)).unwrap(); + Self::Paging(UnsafeCell::new(new_f), path.clone(), *len) + } + } + } +} + +impl io::Read for IntermediateFile { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self { + Self::Immediate(arc, pos) => { + let beg = *pos; + let full_len = arc.read().unwrap().as_ref().len(); + assert!(full_len >= beg); + let end = cmp::min(beg + buf.len(), full_len); + let src = &arc.read().unwrap()[beg..end]; + let cur_len = src.len(); + buf[..cur_len].copy_from_slice(src); + *pos += cur_len; + Ok(cur_len) + } + Self::Paging(file, _, _) => file.get_mut().read(buf), + } + } +} + +impl io::Seek for IntermediateFile { + fn seek(&mut self, pos_arg: io::SeekFrom) -> io::Result { + let len = self.len(); + match self { + Self::Immediate(_, pos) => { + match pos_arg { + io::SeekFrom::Start(s) => { + *pos = s as usize; + } + io::SeekFrom::End(from_end) => { + *pos = ((len as isize) + from_end as isize) as usize; + } + io::SeekFrom::Current(from_cur) => { + *pos = ((*pos as isize) + from_cur as isize) as usize; + } + }; + Ok(*pos as u64) + } + Self::Paging(file, _, _) => file.get_mut().seek(pos_arg), + } + } +} + +impl io::Write for IntermediateFile { + fn write(&mut self, buf: &[u8]) -> io::Result { + let full_len = self.len(); + match self { + Self::Immediate(arc, pos) => { + let beg = *pos; + assert!(beg <= full_len); + let end = cmp::min(beg + buf.len(), full_len); + let cur_len = end - beg; + arc.write().unwrap()[beg..end].copy_from_slice(&buf[..cur_len]); + *pos += cur_len; + Ok(cur_len) + } + Self::Paging(file, _, _) => file.get_mut().write(buf), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self { + Self::Immediate(_, _) => Ok(()), + Self::Paging(file, _, _) => file.get_mut().flush(), + } + } +} + +static NUM_CPUS: OnceLock = OnceLock::new(); + +fn build_thread_pool(n: Option, prefix: &str) -> rayon::ThreadPool { + let prefix = prefix.to_string(); + let num_cpus = NUM_CPUS.get_or_init(|| + match std::thread::available_parallelism() { + Ok(x) => x.into(), + /* Default to 2 if any error occurs. 
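+             * available_parallelism() can fail when the platform or sandbox does not expose a CPU count.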
*/ + Err(_) => 2, + } + ); + rayon::ThreadPoolBuilder::new() + .num_threads(n.unwrap_or(*NUM_CPUS)) + .thread_name(move |i| format!("{}: {}", &prefix, i)) + .build() + .unwrap() +} + +impl ZipArchive { + /// Extract a Zip archive into a directory, overwriting files if they + /// already exist. Paths are sanitized with [`ZipFile::enclosed_name`]. + /// + /// Extraction is not atomic; If an error is encountered, some of the files + /// may be left on disk. + pub fn extract_pipelined>(&self, directory: P) -> ZipResult<()> { + use rayon::prelude::*; + + use std::sync::mpsc; + + let directory = directory.as_ref().to_path_buf(); + fs::create_dir_all(&directory)?; + + let (paths_tx, paths_rx) = mpsc::channel::<&Path>(); + let (dirs_task_tx, dirs_task_rx) = mpsc::channel::>(); + let (stops_prior_tx, stops_prior_rx) = mpsc::sync_channel::>(1); + let (stops_tx, stops_rx) = + mpsc::sync_channel::<(&ZipFileData, &Path, IntermediateFile)>(200); + let (processed_tx, processed_rx) = + mpsc::sync_channel::<(&ZipFileData, &Path, IntermediateFile)>(200); + + static TOP_POOL: Lazy = Lazy::new(|| build_thread_pool(Some(64), "TOP")); + static STOPS_POOL: Lazy = Lazy::new(|| build_thread_pool(None, "stops")); + static READER_POOL: Lazy = + Lazy::new(|| build_thread_pool(None, "reader")); + static WRITER_POOL: Lazy = + Lazy::new(|| build_thread_pool(None, "writer")); + static EXTRACTOR_POOL: Lazy = + Lazy::new(|| build_thread_pool(None, "extractor")); + static DIR_POOL: Lazy = Lazy::new(|| build_thread_pool(None, "dir")); + + let completed_paths = Arc::new(RwLock::new(CompletedPaths::new())); + let completed_paths2 = Arc::clone(&completed_paths); + + let shared = &self.shared; + /* eprintln!("here1"); */ + let reader = self.reader.clone(); + + let dirs_task_tx2 = dirs_task_tx.clone(); + TOP_POOL.in_place_scope(move |s| { + let directory = directory; + let directory2 = directory.clone(); + + let dirs_task_tx3 = dirs_task_tx2.clone(); + /* (1) Collect a plan of where we'll need to seek and read in the underlying reader. */ + s.spawn(move |_| { + dirs_task_tx3 + .send(STOPS_POOL.install(move || { + let entries: Vec<_> = shared + .files + .par_iter() + .map(|data| { + data.enclosed_name() + .ok_or(ZipError::InvalidArchive("Invalid file path")) + .map(|relative_path| (data, relative_path)) + }) + .collect::, ZipError>>()?; + + let stops: Vec<_> = entries + .into_par_iter() + .inspect(move |(_, relative_path)| { + paths_tx.send(relative_path).expect("paths_rx hung up!"); + }) + .filter(|(_, relative_path)| { + !relative_path.to_string_lossy().ends_with('/') + }) + .collect(); + + stops_prior_tx + .try_send(stops) + .expect("expected send to work without blocking"); + Ok::<_, ZipError>(()) + })) + .expect("dirs_task_rx hung up! -1") + }); + + let dirs_task_tx3 = dirs_task_tx2.clone(); + s.spawn(move |_| { + dirs_task_tx3 + .send(READER_POOL.install(move || { + let stops = stops_prior_rx.recv().expect("stops_prior_tx hung up!"); + + /* (2) Execute the seek plan by splitting up the reader's extent into N contiguous + * chunks. 
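+                         * Each chunk is paired with its own clone of the reader, so the parallel tasks can seek independently without sharing a file offset.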
*/ + let mut chunk_size = stops.len() / *NUM_CPUS; + if chunk_size == 0 { + chunk_size = stops.len(); + } + + /* eprintln!("here2"); */ + stops + .par_chunks(chunk_size) + .map(|chunk| (chunk.to_vec(), reader.clone())) + .try_for_each(move |(chunk, mut reader)| { + for (data, relative_path) in chunk.into_iter() { + /* eprintln!("%%%%%%%%%"); */ + /* dbg!(relative_path); */ + + let mut reader = find_content(data, &mut reader)?; + + /* eprintln!("2: %%%%%%%%%"); */ + /* reader.seek(io::SeekFrom::Start(start))?; */ + /* reader.read_exact(buf)?; */ + + /* eprintln!("buf.len() = {}", buf.len()); */ + /* eprintln!( */ + /* "buf[..20] = {:?}", */ + /* &buf[..20], */ + /* /\* String::from_utf8_lossy(&buf[..20]), *\/ */ + /* ); */ + + /* eprintln!("3: %%%%%%%%%"); */ + const SPOOL_THRESHOLD: usize = 2_000; + let len = data.uncompressed_size as usize; + let mut outfile = if len < SPOOL_THRESHOLD { + IntermediateFile::immediate(len) + } else { + IntermediateFile::paging(len)? + }; + /* eprintln!("4: %%%%%%%%%"); */ + io::copy(&mut reader, &mut outfile)?; + /* eprintln!("5: %%%%%%%%%"); */ + outfile.rewind()?; + + /* eprintln!("@{}", &outfile); */ + + match stops_tx.send((data, relative_path, outfile)) { + Ok(()) => { + /* eprintln!("DONE: %% {}", relative_path.display()); */ + } + Err(mpsc::SendError((_, relative_path, _))) => { + panic!( + "stops_rx hung up! was: {}", + relative_path.display(), + ); + } + } + } + Ok::<_, ZipError>(()) + })?; + Ok(()) + })) + .expect("dirs_task_rx hung up!0"); + }); + + s.spawn(move |_| { + /* (0) create dirs/??? */ + dirs_task_tx + .send(DIR_POOL.install(move || { + let completed_paths2 = Arc::clone(&completed_paths); + paths_rx + .into_iter() + .par_bridge() + .map(move |relative_path| { + completed_paths2 + .read() + .unwrap() + .new_containing_dirs_needed(relative_path) + }) + .filter(|new_dirs| !new_dirs.is_empty()) + .try_for_each(move |new_dirs| { + for d in new_dirs.iter() { + let outpath = directory2.join(d); + match fs::create_dir(outpath) { + Ok(()) => (), + Err(e) => { + if e.kind() == io::ErrorKind::AlreadyExists { + /* ignore */ + } else { + return Err(e.into()); + } + } + } + } + + completed_paths.write().unwrap().write_dirs(&new_dirs[..]); + Ok::<_, ZipError>(()) + }) + })) + .expect("dirs_task_rx hung up!1"); + }); + + let dirs_task_tx3 = dirs_task_tx2.clone(); + s.spawn(move |_| { + dirs_task_tx2 + .send(WRITER_POOL.install(move || { + /* dbg!("wtf"); */ + stops_rx.into_iter().par_bridge().try_for_each( + move |(data, relative_path, source_handle)| { + /* eprintln!("0: @@@@@@"); */ + /* eprintln!( */ + /* "@: {}/{}/{}", */ + /* relative_path.display(), */ + /* data.compressed_size, */ + /* &source_handle, */ + /* ); */ + + let mut decompress_reader = + ZipEntry::from_data(data, source_handle); + + /* eprintln!("1: @@@@@@@@"); */ + + const UNCOMPRESSED_SPOOL_THRESHOLD: usize = 100_000; + let len = data.uncompressed_size as usize; + let mut outfile = if len < UNCOMPRESSED_SPOOL_THRESHOLD { + IntermediateFile::immediate(len) + } else { + IntermediateFile::paging(len)? + }; + /* NB: this may decompress, which may take a lot of cpu! 
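+                                 * Doing it here keeps the CPU-bound decompression on the writer pool, leaving the reader pool free for seeking and raw copies.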
*/ + io::copy(&mut decompress_reader, &mut outfile)?; + /* eprintln!("2: @@@@@@@@"); */ + outfile.rewind()?; + + /* decompress_reader.into_inner().remove_backing_file()?; */ + + /* eprintln!("+++++++++"); */ + + processed_tx + .send((data, relative_path, outfile)) + .expect("processed_rx hung up!"); + + /* eprintln!("#########"); */ + + Ok::<_, ZipError>(()) + }, + )?; + + /* eprintln!("huh???"); */ + + Ok::<_, ZipError>(()) + })) + .expect("dirs_task_rx hung up!2"); + }); + + s.spawn(move |_| { + let directory = directory; /* Move. */ + /* (4) extract/??? */ + dirs_task_tx3 + .send(EXTRACTOR_POOL.install(move || { + processed_rx.into_iter().par_bridge().try_for_each( + move |(data, relative_path, mut file)| { + let outpath = directory.join(relative_path); + /* dbg!(&outpath); */ + let mut outfile = match fs::File::create(&outpath) { + Ok(f) => f, + Err(e) => { + if e.kind() == io::ErrorKind::NotFound { + /* Somehow, the containing dir didn't + * exist. Let's make it ourself and + * enter it into the registry. */ + let new_dirs = completed_paths2 + .read() + .unwrap() + .new_containing_dirs_needed(&relative_path); + /* dbg!(&new_dirs); */ + + for d in new_dirs.iter() { + let outpath = directory.join(d); + match fs::create_dir(outpath) { + Ok(()) => (), + Err(e) => { + if e.kind() == io::ErrorKind::AlreadyExists + { + /* ignore */ + } else { + return Err(e.into()); + } + } + } + } + + if !new_dirs.is_empty() { + completed_paths2 + .write() + .unwrap() + .write_dirs(&new_dirs[..]); + } + + fs::File::create(&outpath)? + } else { + return Err(e.into()); + } + } + }; + /* eprintln!("&&&&&&&&&&"); */ + io::copy(&mut file, &mut outfile)?; + file.remove_backing_file()?; + // Set permissions + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + if let Some(mode) = data.unix_mode() { + outfile + .set_permissions(fs::Permissions::from_mode(mode))?; + } + } + Ok::<_, ZipError>(()) + }, + ) + })) + .expect("dirs_task_rx hung up!3"); + }); + Ok::<_, ZipError>(()) + })?; + for result in dirs_task_rx.into_iter() { + result?; + } + Ok(()) + } +} + const fn unsupported_zip_error(detail: &'static str) -> ZipResult { Err(ZipError::UnsupportedArchive(detail)) }
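Usage sketch (illustrative only, not part of the patch): the pipelined extractor is exposed on archives opened through `IntermediateFile`, the same handle type the benchmarks use. The archive path and output directory below are placeholders.

    use zip::{read::IntermediateFile, ZipArchive};

    fn main() -> zip::result::ZipResult<()> {
        /* IntermediateFile implements Read + Seek and clones cheaply (sharing or reopening its
         * backing storage), which lets extract_pipelined hand independent handles to its worker pools. */
        let file = IntermediateFile::from_path("example.zip")?;
        let archive = ZipArchive::new(file)?;
        /* Unlike extract, extract_pipelined takes &self, so the archive does not need to be mutable. */
        archive.extract_pipelined("out")?;
        Ok(())
    }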