Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/wasi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
tar xf wasi-sysroot.tar.gz --strip-components=1 -C wasi-sysroot
- run: |
export "CFLAGS_wasm32_wasip1_threads=--sysroot=${{ github.workspace }}/wasi-sysroot -I${{ github.workspace }}/wasi-sysroot/include/wasm32-wasip1-threads -L-I${{ github.workspace }}/wasi-sysroot/lib/wasm32-wasip1-threads"
cargo +nightly build --lib --features all --target wasm32-wasip1-threads
cargo +nightly build --lib --features all-implementations,brotli,bzip2,deflate,gzip,lz4,lzma,xz,zlib,zstd,deflate64 --target wasm32-wasip1-threads

on:
merge_group:
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ rustdoc-args = ["--cfg", "docsrs"]
# groups
all = ["all-implementations", "all-algorithms"]
all-implementations = ["futures-io", "tokio"]
all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lz4", "lzma", "xz", "zlib", "zstd", "deflate64"]
all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lz4", "lzma", "xz-parallel", "xz", "zlib", "zstd", "deflate64"]

# algorithms
deflate = ["flate2"]
gzip = ["flate2"]
lz4 = ["dep:lz4"]
lzma = ["dep:liblzma"]
xz = ["lzma"]
xz-parallel = ["xz", "liblzma/parallel"]
xz2 = ["xz"]
zlib = ["flate2"]
zstd = ["libzstd", "zstd-safe"]
Expand All @@ -45,7 +46,7 @@ lz4 = { version = "1.28.1", optional = true }
memchr = "2"
pin-project-lite = "0.2"
tokio = { version = "1.24.2", optional = true, default-features = false }
liblzma = { version = "0.4.0", optional = true }
liblzma = { version = "0.4.2", optional = true }
zstd-safe = { version = "7", optional = true, default-features = false }
deflate64 = { version = "0.1.5", optional = true }

Expand Down
8 changes: 8 additions & 0 deletions src/codec/xz/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ impl XzDecoder {
skip_padding: None,
}
}

#[cfg(feature = "xz-parallel")]
pub fn parallel(threads: std::num::NonZeroU32, memlimit: u64) -> Self {
Self {
inner: crate::codec::Xz2Decoder::parallel(threads, memlimit),
skip_padding: None,
}
}
}

impl Decode for XzDecoder {
Expand Down
7 changes: 7 additions & 0 deletions src/codec/xz/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ impl XzEncoder {
inner: crate::codec::Xz2Encoder::new(crate::codec::Xz2FileFormat::Xz, level),
}
}

#[cfg(feature = "xz-parallel")]
pub fn parallel(threads: std::num::NonZeroU32, level: u32) -> Self {
Self {
inner: crate::codec::Xz2Encoder::xz_parallel(level, threads),
}
}
}

impl Encode for XzEncoder {
Expand Down
31 changes: 30 additions & 1 deletion src/codec/xz2/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use crate::{codec::Decode, util::PartialBuffer};

pub struct Xz2Decoder {
stream: Stream,
#[cfg(feature = "xz-parallel")]
threads: Option<std::num::NonZeroU32>,
}

impl fmt::Debug for Xz2Decoder {
Expand All @@ -18,13 +20,40 @@ impl Xz2Decoder {
pub fn new(mem_limit: u64) -> Self {
Self {
stream: Stream::new_auto_decoder(mem_limit, 0).unwrap(),
#[cfg(feature = "xz-parallel")]
threads: None,
}
}

#[cfg(feature = "xz-parallel")]
pub fn parallel(threads: std::num::NonZeroU32, mem_limit: u64) -> Self {
Self {
stream: liblzma::stream::MtStreamBuilder::new()
.threads(threads.get())
.timeout_ms(300)
.memlimit_stop(mem_limit)
.decoder()
.unwrap(),
threads: Some(threads),
}
}
}

impl Decode for Xz2Decoder {
fn reinit(&mut self) -> io::Result<()> {
*self = Self::new(self.stream.memlimit());
#[cfg(feature = "xz-parallel")]
{
*self = match self.threads {
Some(threads) => Self::parallel(threads, self.stream.memlimit()),
None => Self::new(self.stream.memlimit()),
};
}

#[cfg(not(feature = "xz-parallel"))]
{
*self = Self::new(self.stream.memlimit());
}

Ok(())
}

Expand Down
38 changes: 34 additions & 4 deletions src/codec/xz2/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use crate::{

pub struct Xz2Encoder {
stream: Stream,
#[cfg(feature = "xz-parallel")]
threads: Option<std::num::NonZeroU32>,
}

impl fmt::Debug for Xz2Encoder {
Expand All @@ -26,7 +28,27 @@ impl Xz2Encoder {
}
};

Self { stream }
Self {
stream,
#[cfg(feature = "xz-parallel")]
threads: None,
}
}

#[cfg(feature = "xz-parallel")]
pub fn xz_parallel(level: u32, threads: std::num::NonZeroU32) -> Self {
let stream = liblzma::stream::MtStreamBuilder::new()
.threads(threads.get())
.timeout_ms(300)
.preset(level)
.check(Check::Crc64)
.encoder()
.unwrap();

Self {
stream,
threads: Some(threads),
}
}
}

Expand Down Expand Up @@ -59,9 +81,17 @@ impl Encode for Xz2Encoder {
) -> io::Result<bool> {
let previous_out = self.stream.total_out() as usize;

let status = self
.stream
.process(&[], output.unwritten_mut(), Action::SyncFlush)?;
// Multi-threaded streams don't support SyncFlush, use FullFlush instead
#[cfg(feature = "xz-parallel")]
let action = match self.threads {
Some(_) => Action::FullFlush,
None => Action::SyncFlush,
};

#[cfg(not(feature = "xz-parallel"))]
let action = Action::SyncFlush;

let status = self.stream.process(&[], output.unwritten_mut(), action)?;

output.advance(self.stream.total_out() as usize - previous_out);

Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@
)]
//!

//! ## Multi-thread support
//! The `xz` compression algorithm supports multi-threaded compression and decompression.
//! Enable the `xz-parallel` feature to enable multi-threading support.
//!

#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
#![warn(
missing_docs,
Expand Down
39 changes: 39 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,19 @@ macro_rules! algos {
),
}
}

/// Creates a new multi-threaded encoder.
///
/// Note that flushing will severely impact multi-threaded performance.
#[cfg(feature = "xz-parallel")]
pub fn parallel(inner: $inner, level: crate::Level, threads: std::num::NonZeroU32) -> Self {
Self {
inner: crate::$($mod::)+generic::Encoder::new(
inner,
crate::codec::XzEncoder::parallel(threads, level.into_xz2()),
),
}
}
}
{ @dec
/// Creates a new decoder with the specified limit of memory.
Expand All @@ -267,6 +280,32 @@ macro_rules! algos {
),
}
}

/// Creates a new multi-threaded decoder.
#[cfg(feature = "xz-parallel")]
pub fn parallel(read: $inner, threads: std::num::NonZeroU32) -> Self {
Self {
inner: crate::$($mod::)+generic::Decoder::new(
read,
crate::codec::XzDecoder::parallel(threads, u64::MAX),
),
}
}

/// Creates a new multi-threaded decoder with the specified limit of memory.
///
/// # Errors
///
/// An IO error may be returned during decoding if the specified limit is too small.
#[cfg(feature = "xz-parallel")]
pub fn parallel_with_mem_limit(read: $inner, threads: std::num::NonZeroU32, memlimit: u64) -> Self {
Self {
inner: crate::$($mod::)+generic::Decoder::new(
read,
crate::codec::XzDecoder::parallel(threads, memlimit),
),
}
}
}
);

Expand Down
95 changes: 95 additions & 0 deletions tests/utils/algos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,98 @@ algos! {
}
}
}

macro_rules! io_algo_parallel {
($impl:ident, $algo:ident($encoder:ident, $decoder:ident)) => {
pub mod $impl {
const THREADS: std::num::NonZeroU32 = std::num::NonZeroU32::new(16).unwrap();

pub mod read {
pub use crate::utils::impls::$impl::read::{poll_read, to_vec};
}

pub mod bufread {
pub use crate::utils::impls::$impl::bufread::{from, AsyncBufRead};
pub use async_compression::$impl::bufread::{
$decoder as Decoder, $encoder as Encoder,
};

use super::THREADS;
use crate::utils::{pin_mut, Level};

pub fn compress(input: impl AsyncBufRead) -> Vec<u8> {
pin_mut!(input);
super::read::to_vec(Encoder::parallel(input, Level::Fastest, THREADS))
}

pub fn decompress(input: impl AsyncBufRead) -> Vec<u8> {
pin_mut!(input);
super::read::to_vec(Decoder::parallel(input, THREADS))
}
}

pub mod write {
pub use crate::utils::impls::$impl::write::to_vec;
pub use async_compression::$impl::write::{
$decoder as Decoder, $encoder as Encoder,
};

use super::THREADS;
use crate::utils::Level;

pub fn compress(input: &[Vec<u8>], limit: usize) -> Vec<u8> {
to_vec(
input,
|input| Box::pin(Encoder::parallel(input, Level::Fastest, THREADS)),
limit,
)
}

pub fn decompress(input: &[Vec<u8>], limit: usize) -> Vec<u8> {
to_vec(
input,
|input| Box::pin(Decoder::parallel(input, THREADS)),
limit,
)
}
}
}
};
}

macro_rules! algos_parallel {
($(pub mod $name:ident($feat:literal, $encoder:ident, $decoder:ident) { pub mod sync { $($tt:tt)* } })*) => {
$(
#[cfg(feature = $feat)]
pub mod $name {
pub mod sync { $($tt)* }

#[cfg(feature = "futures-io")]
io_algo_parallel!(futures, $name($encoder, $decoder));

#[cfg(feature = "tokio")]
io_algo_parallel!(tokio, $name($encoder, $decoder));
}
)*
}
}

algos_parallel! {
pub mod xz_parallel("xz-parallel", XzEncoder, XzDecoder) {
pub mod sync {
pub use crate::utils::impls::sync::to_vec;

pub fn compress(bytes: &[u8]) -> Vec<u8> {
use liblzma::bufread::XzEncoder;

to_vec(XzEncoder::new(bytes, 0))
}

pub fn decompress(bytes: &[u8]) -> Vec<u8> {
use liblzma::bufread::XzDecoder;

to_vec(XzDecoder::new(bytes))
}
}
}
}
3 changes: 3 additions & 0 deletions tests/xz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ mod utils;

test_cases!(xz);

#[cfg(feature = "xz-parallel")]
test_cases!(xz_parallel);

#[allow(unused)]
use utils::{algos::xz::sync, InputStream};

Expand Down
Loading