Skip to content

Commit 3b884ed

Browse files
authored
xz: add multi-thread support (#353)
* xz: add multi-thread support Gated by xz-parallel feature. * wasi: disable xz-parallel build * Apply code review
1 parent 8f9394d commit 3b884ed

File tree

10 files changed

+225
-8
lines changed

10 files changed

+225
-8
lines changed

.github/workflows/wasi.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
tar xf wasi-sysroot.tar.gz --strip-components=1 -C wasi-sysroot
2020
- run: |
2121
export "CFLAGS_wasm32_wasip1_threads=--sysroot=${{ github.workspace }}/wasi-sysroot -I${{ github.workspace }}/wasi-sysroot/include/wasm32-wasip1-threads -L-I${{ github.workspace }}/wasi-sysroot/lib/wasm32-wasip1-threads"
22-
cargo +nightly build --lib --features all --target wasm32-wasip1-threads
22+
cargo +nightly build --lib --features all-implementations,brotli,bzip2,deflate,gzip,lz4,lzma,xz,zlib,zstd,deflate64 --target wasm32-wasip1-threads
2323
2424
on:
2525
merge_group:

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ rustdoc-args = ["--cfg", "docsrs"]
2020
# groups
2121
all = ["all-implementations", "all-algorithms"]
2222
all-implementations = ["futures-io", "tokio"]
23-
all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lz4", "lzma", "xz", "zlib", "zstd", "deflate64"]
23+
all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lz4", "lzma", "xz-parallel", "xz", "zlib", "zstd", "deflate64"]
2424

2525
# algorithms
2626
deflate = ["flate2"]
2727
gzip = ["flate2"]
2828
lz4 = ["dep:lz4"]
2929
lzma = ["dep:liblzma"]
3030
xz = ["lzma"]
31+
xz-parallel = ["xz", "liblzma/parallel"]
3132
xz2 = ["xz"]
3233
zlib = ["flate2"]
3334
zstd = ["libzstd", "zstd-safe"]
@@ -45,7 +46,7 @@ lz4 = { version = "1.28.1", optional = true }
4546
memchr = "2"
4647
pin-project-lite = "0.2"
4748
tokio = { version = "1.24.2", optional = true, default-features = false }
48-
liblzma = { version = "0.4.0", optional = true }
49+
liblzma = { version = "0.4.2", optional = true }
4950
zstd-safe = { version = "7", optional = true, default-features = false }
5051
deflate64 = { version = "0.1.5", optional = true }
5152

src/codec/xz/decoder.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ impl XzDecoder {
2222
skip_padding: None,
2323
}
2424
}
25+
26+
#[cfg(feature = "xz-parallel")]
27+
pub fn parallel(threads: std::num::NonZeroU32, memlimit: u64) -> Self {
28+
Self {
29+
inner: crate::codec::Xz2Decoder::parallel(threads, memlimit),
30+
skip_padding: None,
31+
}
32+
}
2533
}
2634

2735
impl Decode for XzDecoder {

src/codec/xz/encoder.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ impl XzEncoder {
1313
inner: crate::codec::Xz2Encoder::new(crate::codec::Xz2FileFormat::Xz, level),
1414
}
1515
}
16+
17+
#[cfg(feature = "xz-parallel")]
18+
pub fn parallel(threads: std::num::NonZeroU32, level: u32) -> Self {
19+
Self {
20+
inner: crate::codec::Xz2Encoder::xz_parallel(level, threads),
21+
}
22+
}
1623
}
1724

1825
impl Encode for XzEncoder {

src/codec/xz2/decoder.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use crate::{codec::Decode, util::PartialBuffer};
66

77
pub struct Xz2Decoder {
88
stream: Stream,
9+
#[cfg(feature = "xz-parallel")]
10+
threads: Option<std::num::NonZeroU32>,
911
}
1012

1113
impl fmt::Debug for Xz2Decoder {
@@ -18,13 +20,40 @@ impl Xz2Decoder {
1820
pub fn new(mem_limit: u64) -> Self {
1921
Self {
2022
stream: Stream::new_auto_decoder(mem_limit, 0).unwrap(),
23+
#[cfg(feature = "xz-parallel")]
24+
threads: None,
25+
}
26+
}
27+
28+
#[cfg(feature = "xz-parallel")]
29+
pub fn parallel(threads: std::num::NonZeroU32, mem_limit: u64) -> Self {
30+
Self {
31+
stream: liblzma::stream::MtStreamBuilder::new()
32+
.threads(threads.get())
33+
.timeout_ms(300)
34+
.memlimit_stop(mem_limit)
35+
.decoder()
36+
.unwrap(),
37+
threads: Some(threads),
2138
}
2239
}
2340
}
2441

2542
impl Decode for Xz2Decoder {
2643
fn reinit(&mut self) -> io::Result<()> {
27-
*self = Self::new(self.stream.memlimit());
44+
#[cfg(feature = "xz-parallel")]
45+
{
46+
*self = match self.threads {
47+
Some(threads) => Self::parallel(threads, self.stream.memlimit()),
48+
None => Self::new(self.stream.memlimit()),
49+
};
50+
}
51+
52+
#[cfg(not(feature = "xz-parallel"))]
53+
{
54+
*self = Self::new(self.stream.memlimit());
55+
}
56+
2857
Ok(())
2958
}
3059

src/codec/xz2/encoder.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use crate::{
99

1010
pub struct Xz2Encoder {
1111
stream: Stream,
12+
#[cfg(feature = "xz-parallel")]
13+
threads: Option<std::num::NonZeroU32>,
1214
}
1315

1416
impl fmt::Debug for Xz2Encoder {
@@ -26,7 +28,27 @@ impl Xz2Encoder {
2628
}
2729
};
2830

29-
Self { stream }
31+
Self {
32+
stream,
33+
#[cfg(feature = "xz-parallel")]
34+
threads: None,
35+
}
36+
}
37+
38+
#[cfg(feature = "xz-parallel")]
39+
pub fn xz_parallel(level: u32, threads: std::num::NonZeroU32) -> Self {
40+
let stream = liblzma::stream::MtStreamBuilder::new()
41+
.threads(threads.get())
42+
.timeout_ms(300)
43+
.preset(level)
44+
.check(Check::Crc64)
45+
.encoder()
46+
.unwrap();
47+
48+
Self {
49+
stream,
50+
threads: Some(threads),
51+
}
3052
}
3153
}
3254

@@ -59,9 +81,17 @@ impl Encode for Xz2Encoder {
5981
) -> io::Result<bool> {
6082
let previous_out = self.stream.total_out() as usize;
6183

62-
let status = self
63-
.stream
64-
.process(&[], output.unwritten_mut(), Action::SyncFlush)?;
84+
// Multi-threaded streams don't support SyncFlush, use FullFlush instead
85+
#[cfg(feature = "xz-parallel")]
86+
let action = match self.threads {
87+
Some(_) => Action::FullFlush,
88+
None => Action::SyncFlush,
89+
};
90+
91+
#[cfg(not(feature = "xz-parallel"))]
92+
let action = Action::SyncFlush;
93+
94+
let status = self.stream.process(&[], output.unwritten_mut(), action)?;
6595

6696
output.advance(self.stream.total_out() as usize - previous_out);
6797

src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@
138138
)]
139139
//!
140140
141+
//! ## Multi-thread support
142+
//! The `xz` compression algorithm supports multi-threaded compression and decompression.
143+
//! Enable the `xz-parallel` feature to enable multi-threading support.
144+
//!
145+
141146
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
142147
#![warn(
143148
missing_docs,

src/macros.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,19 @@ macro_rules! algos {
252252
),
253253
}
254254
}
255+
256+
/// Creates a new multi-threaded encoder.
257+
///
258+
/// Note that flushing will severely impact multi-threaded performance.
259+
#[cfg(feature = "xz-parallel")]
260+
pub fn parallel(inner: $inner, level: crate::Level, threads: std::num::NonZeroU32) -> Self {
261+
Self {
262+
inner: crate::$($mod::)+generic::Encoder::new(
263+
inner,
264+
crate::codec::XzEncoder::parallel(threads, level.into_xz2()),
265+
),
266+
}
267+
}
255268
}
256269
{ @dec
257270
/// Creates a new decoder with the specified limit of memory.
@@ -267,6 +280,32 @@ macro_rules! algos {
267280
),
268281
}
269282
}
283+
284+
/// Creates a new multi-threaded decoder.
285+
#[cfg(feature = "xz-parallel")]
286+
pub fn parallel(read: $inner, threads: std::num::NonZeroU32) -> Self {
287+
Self {
288+
inner: crate::$($mod::)+generic::Decoder::new(
289+
read,
290+
crate::codec::XzDecoder::parallel(threads, u64::MAX),
291+
),
292+
}
293+
}
294+
295+
/// Creates a new multi-threaded decoder with the specified limit of memory.
296+
///
297+
/// # Errors
298+
///
299+
/// An IO error may be returned during decoding if the specified limit is too small.
300+
#[cfg(feature = "xz-parallel")]
301+
pub fn parallel_with_mem_limit(read: $inner, threads: std::num::NonZeroU32, memlimit: u64) -> Self {
302+
Self {
303+
inner: crate::$($mod::)+generic::Decoder::new(
304+
read,
305+
crate::codec::XzDecoder::parallel(threads, memlimit),
306+
),
307+
}
308+
}
270309
}
271310
);
272311

tests/utils/algos.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,98 @@ algos! {
230230
}
231231
}
232232
}
233+
234+
macro_rules! io_algo_parallel {
235+
($impl:ident, $algo:ident($encoder:ident, $decoder:ident)) => {
236+
pub mod $impl {
237+
const THREADS: std::num::NonZeroU32 = std::num::NonZeroU32::new(16).unwrap();
238+
239+
pub mod read {
240+
pub use crate::utils::impls::$impl::read::{poll_read, to_vec};
241+
}
242+
243+
pub mod bufread {
244+
pub use crate::utils::impls::$impl::bufread::{from, AsyncBufRead};
245+
pub use async_compression::$impl::bufread::{
246+
$decoder as Decoder, $encoder as Encoder,
247+
};
248+
249+
use super::THREADS;
250+
use crate::utils::{pin_mut, Level};
251+
252+
pub fn compress(input: impl AsyncBufRead) -> Vec<u8> {
253+
pin_mut!(input);
254+
super::read::to_vec(Encoder::parallel(input, Level::Fastest, THREADS))
255+
}
256+
257+
pub fn decompress(input: impl AsyncBufRead) -> Vec<u8> {
258+
pin_mut!(input);
259+
super::read::to_vec(Decoder::parallel(input, THREADS))
260+
}
261+
}
262+
263+
pub mod write {
264+
pub use crate::utils::impls::$impl::write::to_vec;
265+
pub use async_compression::$impl::write::{
266+
$decoder as Decoder, $encoder as Encoder,
267+
};
268+
269+
use super::THREADS;
270+
use crate::utils::Level;
271+
272+
pub fn compress(input: &[Vec<u8>], limit: usize) -> Vec<u8> {
273+
to_vec(
274+
input,
275+
|input| Box::pin(Encoder::parallel(input, Level::Fastest, THREADS)),
276+
limit,
277+
)
278+
}
279+
280+
pub fn decompress(input: &[Vec<u8>], limit: usize) -> Vec<u8> {
281+
to_vec(
282+
input,
283+
|input| Box::pin(Decoder::parallel(input, THREADS)),
284+
limit,
285+
)
286+
}
287+
}
288+
}
289+
};
290+
}
291+
292+
macro_rules! algos_parallel {
293+
($(pub mod $name:ident($feat:literal, $encoder:ident, $decoder:ident) { pub mod sync { $($tt:tt)* } })*) => {
294+
$(
295+
#[cfg(feature = $feat)]
296+
pub mod $name {
297+
pub mod sync { $($tt)* }
298+
299+
#[cfg(feature = "futures-io")]
300+
io_algo_parallel!(futures, $name($encoder, $decoder));
301+
302+
#[cfg(feature = "tokio")]
303+
io_algo_parallel!(tokio, $name($encoder, $decoder));
304+
}
305+
)*
306+
}
307+
}
308+
309+
algos_parallel! {
310+
pub mod xz_parallel("xz-parallel", XzEncoder, XzDecoder) {
311+
pub mod sync {
312+
pub use crate::utils::impls::sync::to_vec;
313+
314+
pub fn compress(bytes: &[u8]) -> Vec<u8> {
315+
use liblzma::bufread::XzEncoder;
316+
317+
to_vec(XzEncoder::new(bytes, 0))
318+
}
319+
320+
pub fn decompress(bytes: &[u8]) -> Vec<u8> {
321+
use liblzma::bufread::XzDecoder;
322+
323+
to_vec(XzDecoder::new(bytes))
324+
}
325+
}
326+
}
327+
}

tests/xz.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ mod utils;
66

77
test_cases!(xz);
88

9+
#[cfg(feature = "xz-parallel")]
10+
test_cases!(xz_parallel);
11+
912
#[allow(unused)]
1013
use utils::{algos::xz::sync, InputStream};
1114

0 commit comments

Comments
 (0)