diff --git a/.github/workflows/wasi.yml b/.github/workflows/wasi.yml index b6a4e0fe..4dc879a7 100644 --- a/.github/workflows/wasi.yml +++ b/.github/workflows/wasi.yml @@ -19,7 +19,7 @@ jobs: tar xf wasi-sysroot.tar.gz --strip-components=1 -C wasi-sysroot - run: | export "CFLAGS_wasm32_wasip1_threads=--sysroot=${{ github.workspace }}/wasi-sysroot -I${{ github.workspace }}/wasi-sysroot/include/wasm32-wasip1-threads -L-I${{ github.workspace }}/wasi-sysroot/lib/wasm32-wasip1-threads" - cargo +nightly build --lib --features all --target wasm32-wasip1-threads + cargo +nightly build --lib --features all-implementations,brotli,bzip2,deflate,gzip,lz4,lzma,xz,zlib,zstd,deflate64 --target wasm32-wasip1-threads on: merge_group: diff --git a/Cargo.toml b/Cargo.toml index f6ff05e3..2d3761b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ rustdoc-args = ["--cfg", "docsrs"] # groups all = ["all-implementations", "all-algorithms"] all-implementations = ["futures-io", "tokio"] -all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lz4", "lzma", "xz", "zlib", "zstd", "deflate64"] +all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lz4", "lzma", "xz-parallel", "xz", "zlib", "zstd", "deflate64"] # algorithms deflate = ["flate2"] @@ -28,6 +28,7 @@ gzip = ["flate2"] lz4 = ["dep:lz4"] lzma = ["dep:liblzma"] xz = ["lzma"] +xz-parallel = ["xz", "liblzma/parallel"] xz2 = ["xz"] zlib = ["flate2"] zstd = ["libzstd", "zstd-safe"] @@ -45,7 +46,7 @@ lz4 = { version = "1.28.1", optional = true } memchr = "2" pin-project-lite = "0.2" tokio = { version = "1.24.2", optional = true, default-features = false } -liblzma = { version = "0.4.0", optional = true } +liblzma = { version = "0.4.2", optional = true } zstd-safe = { version = "7", optional = true, default-features = false } deflate64 = { version = "0.1.5", optional = true } diff --git a/src/codec/xz/decoder.rs b/src/codec/xz/decoder.rs index 69128d44..95d552ef 100644 --- a/src/codec/xz/decoder.rs +++ b/src/codec/xz/decoder.rs @@ -22,6 +22,14 @@ impl XzDecoder { skip_padding: None, } } + + #[cfg(feature = "xz-parallel")] + pub fn parallel(threads: std::num::NonZeroU32, memlimit: u64) -> Self { + Self { + inner: crate::codec::Xz2Decoder::parallel(threads, memlimit), + skip_padding: None, + } + } } impl Decode for XzDecoder { diff --git a/src/codec/xz/encoder.rs b/src/codec/xz/encoder.rs index a3a23a27..22595ca2 100644 --- a/src/codec/xz/encoder.rs +++ b/src/codec/xz/encoder.rs @@ -13,6 +13,13 @@ impl XzEncoder { inner: crate::codec::Xz2Encoder::new(crate::codec::Xz2FileFormat::Xz, level), } } + + #[cfg(feature = "xz-parallel")] + pub fn parallel(threads: std::num::NonZeroU32, level: u32) -> Self { + Self { + inner: crate::codec::Xz2Encoder::xz_parallel(level, threads), + } + } } impl Encode for XzEncoder { diff --git a/src/codec/xz2/decoder.rs b/src/codec/xz2/decoder.rs index 1c111fd6..80eed14a 100644 --- a/src/codec/xz2/decoder.rs +++ b/src/codec/xz2/decoder.rs @@ -6,6 +6,8 @@ use crate::{codec::Decode, util::PartialBuffer}; pub struct Xz2Decoder { stream: Stream, + #[cfg(feature = "xz-parallel")] + threads: Option, } impl fmt::Debug for Xz2Decoder { @@ -18,13 +20,40 @@ impl Xz2Decoder { pub fn new(mem_limit: u64) -> Self { Self { stream: Stream::new_auto_decoder(mem_limit, 0).unwrap(), + #[cfg(feature = "xz-parallel")] + threads: None, + } + } + + #[cfg(feature = "xz-parallel")] + pub fn parallel(threads: std::num::NonZeroU32, mem_limit: u64) -> Self { + Self { + stream: liblzma::stream::MtStreamBuilder::new() + .threads(threads.get()) + .timeout_ms(300) + .memlimit_stop(mem_limit) + .decoder() + .unwrap(), + threads: Some(threads), } } } impl Decode for Xz2Decoder { fn reinit(&mut self) -> io::Result<()> { - *self = Self::new(self.stream.memlimit()); + #[cfg(feature = "xz-parallel")] + { + *self = match self.threads { + Some(threads) => Self::parallel(threads, self.stream.memlimit()), + None => Self::new(self.stream.memlimit()), + }; + } + + #[cfg(not(feature = "xz-parallel"))] + { + *self = Self::new(self.stream.memlimit()); + } + Ok(()) } diff --git a/src/codec/xz2/encoder.rs b/src/codec/xz2/encoder.rs index f032c965..0c3afa89 100644 --- a/src/codec/xz2/encoder.rs +++ b/src/codec/xz2/encoder.rs @@ -9,6 +9,8 @@ use crate::{ pub struct Xz2Encoder { stream: Stream, + #[cfg(feature = "xz-parallel")] + threads: Option, } impl fmt::Debug for Xz2Encoder { @@ -26,7 +28,27 @@ impl Xz2Encoder { } }; - Self { stream } + Self { + stream, + #[cfg(feature = "xz-parallel")] + threads: None, + } + } + + #[cfg(feature = "xz-parallel")] + pub fn xz_parallel(level: u32, threads: std::num::NonZeroU32) -> Self { + let stream = liblzma::stream::MtStreamBuilder::new() + .threads(threads.get()) + .timeout_ms(300) + .preset(level) + .check(Check::Crc64) + .encoder() + .unwrap(); + + Self { + stream, + threads: Some(threads), + } } } @@ -59,9 +81,17 @@ impl Encode for Xz2Encoder { ) -> io::Result { let previous_out = self.stream.total_out() as usize; - let status = self - .stream - .process(&[], output.unwritten_mut(), Action::SyncFlush)?; + // Multi-threaded streams don't support SyncFlush, use FullFlush instead + #[cfg(feature = "xz-parallel")] + let action = match self.threads { + Some(_) => Action::FullFlush, + None => Action::SyncFlush, + }; + + #[cfg(not(feature = "xz-parallel"))] + let action = Action::SyncFlush; + + let status = self.stream.process(&[], output.unwritten_mut(), action)?; output.advance(self.stream.total_out() as usize - previous_out); diff --git a/src/lib.rs b/src/lib.rs index 37b98eeb..5ecc6dd4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -138,6 +138,11 @@ )] //! +//! ## Multi-thread support +//! The `xz` compression algorithm supports multi-threaded compression and decompression. +//! Enable the `xz-parallel` feature to enable multi-threading support. +//! + #![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))] #![warn( missing_docs, diff --git a/src/macros.rs b/src/macros.rs index dbc3ef95..cbd161f1 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -252,6 +252,19 @@ macro_rules! algos { ), } } + + /// Creates a new multi-threaded encoder. + /// + /// Note that flushing will severely impact multi-threaded performance. + #[cfg(feature = "xz-parallel")] + pub fn parallel(inner: $inner, level: crate::Level, threads: std::num::NonZeroU32) -> Self { + Self { + inner: crate::$($mod::)+generic::Encoder::new( + inner, + crate::codec::XzEncoder::parallel(threads, level.into_xz2()), + ), + } + } } { @dec /// Creates a new decoder with the specified limit of memory. @@ -267,6 +280,32 @@ macro_rules! algos { ), } } + + /// Creates a new multi-threaded decoder. + #[cfg(feature = "xz-parallel")] + pub fn parallel(read: $inner, threads: std::num::NonZeroU32) -> Self { + Self { + inner: crate::$($mod::)+generic::Decoder::new( + read, + crate::codec::XzDecoder::parallel(threads, u64::MAX), + ), + } + } + + /// Creates a new multi-threaded decoder with the specified limit of memory. + /// + /// # Errors + /// + /// An IO error may be returned during decoding if the specified limit is too small. + #[cfg(feature = "xz-parallel")] + pub fn parallel_with_mem_limit(read: $inner, threads: std::num::NonZeroU32, memlimit: u64) -> Self { + Self { + inner: crate::$($mod::)+generic::Decoder::new( + read, + crate::codec::XzDecoder::parallel(threads, memlimit), + ), + } + } } ); diff --git a/tests/utils/algos.rs b/tests/utils/algos.rs index 723d9688..634c47dd 100644 --- a/tests/utils/algos.rs +++ b/tests/utils/algos.rs @@ -230,3 +230,98 @@ algos! { } } } + +macro_rules! io_algo_parallel { + ($impl:ident, $algo:ident($encoder:ident, $decoder:ident)) => { + pub mod $impl { + const THREADS: std::num::NonZeroU32 = std::num::NonZeroU32::new(16).unwrap(); + + pub mod read { + pub use crate::utils::impls::$impl::read::{poll_read, to_vec}; + } + + pub mod bufread { + pub use crate::utils::impls::$impl::bufread::{from, AsyncBufRead}; + pub use async_compression::$impl::bufread::{ + $decoder as Decoder, $encoder as Encoder, + }; + + use super::THREADS; + use crate::utils::{pin_mut, Level}; + + pub fn compress(input: impl AsyncBufRead) -> Vec { + pin_mut!(input); + super::read::to_vec(Encoder::parallel(input, Level::Fastest, THREADS)) + } + + pub fn decompress(input: impl AsyncBufRead) -> Vec { + pin_mut!(input); + super::read::to_vec(Decoder::parallel(input, THREADS)) + } + } + + pub mod write { + pub use crate::utils::impls::$impl::write::to_vec; + pub use async_compression::$impl::write::{ + $decoder as Decoder, $encoder as Encoder, + }; + + use super::THREADS; + use crate::utils::Level; + + pub fn compress(input: &[Vec], limit: usize) -> Vec { + to_vec( + input, + |input| Box::pin(Encoder::parallel(input, Level::Fastest, THREADS)), + limit, + ) + } + + pub fn decompress(input: &[Vec], limit: usize) -> Vec { + to_vec( + input, + |input| Box::pin(Decoder::parallel(input, THREADS)), + limit, + ) + } + } + } + }; +} + +macro_rules! algos_parallel { + ($(pub mod $name:ident($feat:literal, $encoder:ident, $decoder:ident) { pub mod sync { $($tt:tt)* } })*) => { + $( + #[cfg(feature = $feat)] + pub mod $name { + pub mod sync { $($tt)* } + + #[cfg(feature = "futures-io")] + io_algo_parallel!(futures, $name($encoder, $decoder)); + + #[cfg(feature = "tokio")] + io_algo_parallel!(tokio, $name($encoder, $decoder)); + } + )* + } +} + +algos_parallel! { + pub mod xz_parallel("xz-parallel", XzEncoder, XzDecoder) { + pub mod sync { + pub use crate::utils::impls::sync::to_vec; + + pub fn compress(bytes: &[u8]) -> Vec { + use liblzma::bufread::XzEncoder; + + to_vec(XzEncoder::new(bytes, 0)) + } + + pub fn decompress(bytes: &[u8]) -> Vec { + use liblzma::bufread::XzDecoder; + + to_vec(XzDecoder::new(bytes)) + } + } + } +} diff --git a/tests/xz.rs b/tests/xz.rs index f029993c..401f235e 100644 --- a/tests/xz.rs +++ b/tests/xz.rs @@ -6,6 +6,9 @@ mod utils; test_cases!(xz); +#[cfg(feature = "xz-parallel")] +test_cases!(xz_parallel); + #[allow(unused)] use utils::{algos::xz::sync, InputStream};