From cd3cf45c159ef864758e905c6a8e78efd47f90f9 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Thu, 6 Nov 2025 12:13:57 +0000 Subject: [PATCH 01/13] Add uninitialized buffer API to `WriteBuffer` --- crates/compression-core/src/util.rs | 97 +++++++++++++++++++++++++---- 1 file changed, 86 insertions(+), 11 deletions(-) diff --git a/crates/compression-core/src/util.rs b/crates/compression-core/src/util.rs index 6182bfee..637ea7fe 100644 --- a/crates/compression-core/src/util.rs +++ b/crates/compression-core/src/util.rs @@ -1,3 +1,5 @@ +use core::mem::MaybeUninit; + pub const fn _assert_send() {} pub const fn _assert_sync() {} @@ -84,17 +86,34 @@ impl + AsMut<[u8]>> From for PartialBuffer { /// ``` #[derive(Debug)] pub struct WriteBuffer<'a> { - buffer: &'a mut [u8], + buffer: &'a mut [MaybeUninit], index: usize, + initialized: usize, } impl<'a> WriteBuffer<'a> { pub fn new_initialized(buffer: &'a mut [u8]) -> Self { - Self { buffer, index: 0 } + Self { + initialized: buffer.len(), + // Safety: with initialized set to len of the buffer, + // `WriteBuffer` would treat it as a `&mut [u8]`. + buffer: unsafe { &mut *(buffer as *mut [u8] as *mut _) }, + index: 0, + } + } + + pub fn new_uninitialized(buffer: &'a mut [MaybeUninit]) -> Self { + Self { + buffer, + index: 0, + initialized: 0, + } } pub fn written(&self) -> &[u8] { - &self.buffer[..self.index] + debug_assert!(self.index <= self.initialized); + + unsafe { &*(&self.buffer[..self.index] as *const _ as *const [u8]) } } /// Convenient method for `.writen().len()` @@ -110,31 +129,87 @@ impl<'a> WriteBuffer<'a> { /// Initialize all uninitialized, unwritten part to initialized, unwritten part /// Return all unwritten part pub fn initialize_unwritten(&mut self) -> &mut [u8] { - &mut self.buffer[self.index..] + self.buffer[self.initialized..] 
+ .iter_mut() + .for_each(|maybe_uninit| { + maybe_uninit.write(0); + }); + self.initialized = self.buffer.len(); + + unsafe { &mut *(&mut self.buffer[self.index..] as *mut _ as *mut [u8]) } } /// Advance written index within initialized part. /// /// Note that try to advance into uninitialized part would panic. pub fn advance(&mut self, amount: usize) { + debug_assert!(self.index + amount <= self.buffer.len()); + debug_assert!(self.index + amount <= self.initialized); + self.index += amount; - debug_assert!(self.index <= self.buffer.len()); } pub fn reset(&mut self) { self.index = 0; } + /// Returns a mutable reference to the unwritten part of the buffer without + /// ensuring that it has been fully initialized. + /// + /// # Safety + /// + /// The caller must not de-initialize portions of the buffer that have already + /// been initialized. + /// + /// This includes any bytes in the region returned by this function. + pub unsafe fn unwritten_mut(&mut self) -> &mut [MaybeUninit] { + &mut self.buffer[self.index..] + } + + /// Asserts that the first `n` unfilled bytes of the buffer are initialized. + /// + /// [`WriteBuffer`] assumes that bytes are never de-initialized, so this method + /// does nothing when called with fewer bytes than are already known to be initialized. + /// + /// # Safety + /// + /// The caller must ensure that `n` unfilled bytes of the buffer have already been initialized. + pub unsafe fn assume_init(&mut self, n: usize) { + debug_assert!(self.index <= (self.initialized + n)); + debug_assert!((self.initialized + n) <= self.buffer.len()); + + self.initialized += n; + } + + /// Convenient function combining [`WriteBuffer::assume_init`] and [`WriteBuffer::advance`]. + /// + /// # Safety + /// + /// The caller must ensure that `n` unfilled bytes of the buffer have already been initialized. 
+ pub unsafe fn assume_init_and_advance(&mut self, n: usize) { + debug_assert!(self.index + n <= self.buffer.len()); + + self.index += n; + self.initialized = self.initialized.max(self.index); + } + pub fn copy_unwritten_from>(&mut self, other: &mut PartialBuffer) -> usize { - let len = self - .initialize_unwritten() - .len() - .min(other.unwritten().len()); + // Safety: We never write uninitialized bytes into it + let out = unsafe { self.unwritten_mut() }; - self.initialize_unwritten()[..len].copy_from_slice(&other.unwritten()[..len]); + let len = out.len().min(other.unwritten().len()); - self.advance(len); + out[..len] + .iter_mut() + .zip(&other.unwritten()[..len]) + .for_each(|(maybe_uninit, byte)| { + maybe_uninit.write(*byte); + }); + + // Safety: We have written `len` bytes of initialized data into it + unsafe { self.assume_init_and_advance(len) }; other.advance(len); + len } } From 189aad6661a6d5839df5bb573e9b53d0c8d82437 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Thu, 6 Nov 2025 12:19:52 +0000 Subject: [PATCH 02/13] Demono `WriteBuffer::copy_unwritten_from` - reduce compile time - having one version would make it easier to optimize for compiler --- crates/compression-core/src/util.rs | 31 +++++++++++++++++------------ 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/crates/compression-core/src/util.rs b/crates/compression-core/src/util.rs index 637ea7fe..99a65742 100644 --- a/crates/compression-core/src/util.rs +++ b/crates/compression-core/src/util.rs @@ -194,20 +194,25 @@ impl<'a> WriteBuffer<'a> { } pub fn copy_unwritten_from>(&mut self, other: &mut PartialBuffer) -> usize { - // Safety: We never write uninitialized bytes into it - let out = unsafe { self.unwritten_mut() }; - - let len = out.len().min(other.unwritten().len()); - - out[..len] - .iter_mut() - .zip(&other.unwritten()[..len]) - .for_each(|(maybe_uninit, byte)| { - maybe_uninit.write(*byte); - }); + fn inner(this: &mut 
WriteBuffer<'_>, input: &[u8]) -> usize { + // Safety: We will never ever write uninitialized bytes into it + let out = unsafe { this.unwritten_mut() }; + + let len = out.len().min(input.len()); + + out[..len] + .iter_mut() + .zip(&input[..len]) + .for_each(|(maybe_uninit, byte)| { + maybe_uninit.write(*byte); + }); + + // Safety: We have written `len` bytes of initialized data into it + unsafe { this.assume_init_and_advance(len) }; + len + } - // Safety: We have written `len` bytes of initialized data into it - unsafe { self.assume_init_and_advance(len) }; + let len = inner(self, other.unwritten()); other.advance(len); len From b63ab766d7e0656021d7565524b0d99e4c577eeb Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Fri, 7 Nov 2025 14:29:34 +0000 Subject: [PATCH 03/13] Use bzip2 0.6.1 uninitialized API --- crates/compression-codecs/Cargo.toml | 2 +- crates/compression-codecs/src/bzip2/decoder.rs | 8 ++++++-- crates/compression-codecs/src/bzip2/encoder.rs | 6 ++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/crates/compression-codecs/Cargo.toml b/crates/compression-codecs/Cargo.toml index e410ca38..e8c03980 100644 --- a/crates/compression-codecs/Cargo.toml +++ b/crates/compression-codecs/Cargo.toml @@ -47,7 +47,7 @@ deflate64 = ["dep:deflate64"] compression-core.workspace = true # features brotli = { version = "8", optional = true } -bzip2 = { version = "0.6", optional = true } +bzip2 = { version = "0.6.1", optional = true } deflate64 = { version = "0.1.5", optional = true } flate2 = { version = "1.0.13", optional = true } libzstd = { package = "zstd", version = "0.13.1", optional = true, default-features = false } diff --git a/crates/compression-codecs/src/bzip2/decoder.rs b/crates/compression-codecs/src/bzip2/decoder.rs index f321618a..0906e74e 100644 --- a/crates/compression-codecs/src/bzip2/decoder.rs +++ b/crates/compression-codecs/src/bzip2/decoder.rs @@ -43,11 +43,15 @@ impl BzDecoder { let 
result = self .decompress - .decompress(input.unwritten(), output.initialize_unwritten()) + // Safety: We **trust** bzip2 to only write initialized data to it + .decompress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }) .map_err(io::Error::other); input.advance((self.decompress.total_in() - prior_in) as usize); - output.advance((self.decompress.total_out() - prior_out) as usize); + // Safety: We **trust** bzip2 to write bytes properly + unsafe { + output.assume_init_and_advance((self.decompress.total_out() - prior_out) as usize) + }; // Track when stream has properly ended if matches!(result, Ok(Status::StreamEnd)) { diff --git a/crates/compression-codecs/src/bzip2/encoder.rs b/crates/compression-codecs/src/bzip2/encoder.rs index 40767da3..353c243e 100644 --- a/crates/compression-codecs/src/bzip2/encoder.rs +++ b/crates/compression-codecs/src/bzip2/encoder.rs @@ -57,11 +57,13 @@ impl BzEncoder { let result = self .compress - .compress(input.unwritten(), output.initialize_unwritten(), action) + // Safety: We **trust** bzip2 to only write initialized bytes into it + .compress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, action) .map_err(io::Error::other); input.advance((self.compress.total_in() - prior_in) as usize); - output.advance((self.compress.total_out() - prior_out) as usize); + // Safety: We **trust** bzip2 to properly write bytes into it + unsafe { output.assume_init_and_advance((self.compress.total_out() - prior_out) as usize) }; result } From d1b55be226f03b3cbc66c3753b315b6b32a958fe Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sat, 8 Nov 2025 15:07:11 +0000 Subject: [PATCH 04/13] Use deflate64 0.1.10 uninitialized API --- crates/compression-codecs/Cargo.toml | 2 +- crates/compression-codecs/src/deflate64/decoder.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/compression-codecs/Cargo.toml b/crates/compression-codecs/Cargo.toml index 
e8c03980..e5069e65 100644 --- a/crates/compression-codecs/Cargo.toml +++ b/crates/compression-codecs/Cargo.toml @@ -48,7 +48,7 @@ compression-core.workspace = true # features brotli = { version = "8", optional = true } bzip2 = { version = "0.6.1", optional = true } -deflate64 = { version = "0.1.5", optional = true } +deflate64 = { version = "0.1.10", optional = true } flate2 = { version = "1.0.13", optional = true } libzstd = { package = "zstd", version = "0.13.1", optional = true, default-features = false } lz4 = { version = "1.28.1", optional = true } diff --git a/crates/compression-codecs/src/deflate64/decoder.rs b/crates/compression-codecs/src/deflate64/decoder.rs index 0011c941..c0ebf63e 100644 --- a/crates/compression-codecs/src/deflate64/decoder.rs +++ b/crates/compression-codecs/src/deflate64/decoder.rs @@ -28,10 +28,12 @@ impl Deflate64Decoder { ) -> Result { let result = self .inflater - .inflate(input.unwritten(), output.initialize_unwritten()); + // Safety: We **trust** deflate64 to not write uninitialized bytes + .inflate_uninit(input.unwritten(), unsafe { output.unwritten_mut() }); input.advance(result.bytes_consumed); - output.advance(result.bytes_written); + // Safety: We **trust** deflate64 to properly write bytes into buffer + unsafe { output.assume_init_and_advance(result.bytes_written) }; if result.data_error { Err(Error::new(ErrorKind::InvalidData, "invalid data")) From a069dc73bee873dc922d289be2aa6edfd8be3648 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 00:29:23 +0000 Subject: [PATCH 05/13] Use flate2 1.1.4 uninitialized API --- crates/compression-codecs/Cargo.toml | 2 +- crates/compression-codecs/src/flate/decoder.rs | 12 ++++++++---- crates/compression-codecs/src/flate/encoder.rs | 10 ++++++---- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/compression-codecs/Cargo.toml b/crates/compression-codecs/Cargo.toml index e5069e65..6ed5e4f5 100644 --- 
a/crates/compression-codecs/Cargo.toml +++ b/crates/compression-codecs/Cargo.toml @@ -49,7 +49,7 @@ compression-core.workspace = true brotli = { version = "8", optional = true } bzip2 = { version = "0.6.1", optional = true } deflate64 = { version = "0.1.10", optional = true } -flate2 = { version = "1.0.13", optional = true } +flate2 = { version = "1.1.4", optional = true } libzstd = { package = "zstd", version = "0.13.1", optional = true, default-features = false } lz4 = { version = "1.28.1", optional = true } liblzma = { version = "0.4.4", optional = true } diff --git a/crates/compression-codecs/src/flate/decoder.rs b/crates/compression-codecs/src/flate/decoder.rs index 82e3c95b..44bb0af7 100644 --- a/crates/compression-codecs/src/flate/decoder.rs +++ b/crates/compression-codecs/src/flate/decoder.rs @@ -26,12 +26,16 @@ impl FlateDecoder { let prior_in = self.decompress.total_in(); let prior_out = self.decompress.total_out(); - let result = - self.decompress - .decompress(input.unwritten(), output.initialize_unwritten(), flush); + let result = self + .decompress + // Safety: We **trust** flate2 to not write uninitialized bytes into buffer + .decompress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, flush); input.advance((self.decompress.total_in() - prior_in) as usize); - output.advance((self.decompress.total_out() - prior_out) as usize); + // Safety: We **trust** flate2 to write bytes into buffer properly + unsafe { + output.assume_init_and_advance((self.decompress.total_out() - prior_out) as usize) + }; Ok(result?) 
} diff --git a/crates/compression-codecs/src/flate/encoder.rs b/crates/compression-codecs/src/flate/encoder.rs index eed7a608..d3509c42 100644 --- a/crates/compression-codecs/src/flate/encoder.rs +++ b/crates/compression-codecs/src/flate/encoder.rs @@ -31,12 +31,14 @@ impl FlateEncoder { let prior_in = self.compress.total_in(); let prior_out = self.compress.total_out(); - let result = - self.compress - .compress(input.unwritten(), output.initialize_unwritten(), flush); + let result = self + .compress + // Safety: We **trust** flate2 to not write uninitialized bytes into buffer + .compress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, flush); input.advance((self.compress.total_in() - prior_in) as usize); - output.advance((self.compress.total_out() - prior_out) as usize); + // Safety: We **trust** flate2 to write bytes properly into buffer + unsafe { output.assume_init_and_advance((self.compress.total_out() - prior_out) as usize) }; Ok(result?) } From 86c1cc6ff6132dc4ebcafc451b9cde83729dd10c Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 00:44:05 +0000 Subject: [PATCH 06/13] Support uninitialized buffer for lz4 codecs --- crates/compression-codecs/src/lz4/decoder.rs | 20 +++++++++++++------- crates/compression-codecs/src/lz4/encoder.rs | 9 ++++++--- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/crates/compression-codecs/src/lz4/decoder.rs b/crates/compression-codecs/src/lz4/decoder.rs index a26b4a0d..2a4c49f5 100644 --- a/crates/compression-codecs/src/lz4/decoder.rs +++ b/crates/compression-codecs/src/lz4/decoder.rs @@ -61,22 +61,28 @@ impl DecodeV2 for Lz4Decoder { input: &mut PartialBuffer<&[u8]>, output: &mut WriteBuffer<'_>, ) -> Result { - let out_buf = output.initialize_unwritten(); - - let mut output_size = out_buf.len(); let mut input_size = input.unwritten().len(); + + // Safety: We **trust** lz4 bytes to properly function as expected, + // only write decompressed, 
initialized data into the buffer properly. let result = unsafe { - check_error(LZ4F_decompress( + let out_buf = output.unwritten_mut(); + + let mut output_size = out_buf.len(); + + let result = check_error(LZ4F_decompress( self.ctx.get_mut().ctx, - out_buf.as_mut_ptr(), + out_buf.as_mut_ptr() as *mut _, &mut output_size, input.unwritten().as_ptr(), &mut input_size, core::ptr::null(), - )) + )); + output.assume_init_and_advance(output_size); + + result }; input.advance(input_size); - output.advance(output_size); let finished = result? == 0; if finished { diff --git a/crates/compression-codecs/src/lz4/encoder.rs b/crates/compression-codecs/src/lz4/encoder.rs index 516d4664..9bc3958d 100644 --- a/crates/compression-codecs/src/lz4/encoder.rs +++ b/crates/compression-codecs/src/lz4/encoder.rs @@ -117,7 +117,8 @@ impl Lz4Encoder { Lz4Fn::Flush | Lz4Fn::End => self.flush_buffer_size, }; - let out_buf = output.initialize_unwritten(); + // Safety: We **trust** lz4 to not write uninitialized bytes + let out_buf = unsafe { output.unwritten_mut() }; let output_len = out_buf.len(); let (dst_buffer, dst_size, maybe_internal_buffer) = if min_dst_size > output_len { @@ -132,7 +133,7 @@ impl Lz4Encoder { Some(buffer), ) } else { - (out_buf.as_mut_ptr(), output_len, None) + (out_buf.as_mut_ptr() as *mut _, output_len, None) }; let len = match lz4_fn { @@ -185,13 +186,15 @@ impl Lz4Encoder { }; let drained_after = if let Some(internal_buffer) = maybe_internal_buffer { + // Safety: We **trust** lz4 to properly write data into the buffer unsafe { internal_buffer.get_mut().set_len(len); } let (d, _) = self.drain_buffer(output); d } else { - output.advance(len); + // Safety: We **trust** lz4 to properly write data into the buffer + unsafe { output.assume_init_and_advance(len) }; len }; From d16a652518853ae63eeeef97c8e239bc9624c81a Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 00:46:42 +0000 Subject: [PATCH 07/13] Ret early in 
`Lz4Encoder::write` if output has no spare space Avoid allocating a new buffer when the output has no spare space; it makes no sense to allocate an internal buffer and try to compress when the output is already full --- crates/compression-codecs/src/lz4/encoder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/compression-codecs/src/lz4/encoder.rs b/crates/compression-codecs/src/lz4/encoder.rs index 9bc3958d..7eee0cca 100644 --- a/crates/compression-codecs/src/lz4/encoder.rs +++ b/crates/compression-codecs/src/lz4/encoder.rs @@ -102,7 +102,7 @@ impl Lz4Encoder { fn write(&mut self, lz4_fn: Lz4Fn<'_, '_>, output: &mut WriteBuffer<'_>) -> Result { let (drained_before, undrained) = self.drain_buffer(output); - if undrained > 0 { + if undrained > 0 || output.has_no_spare_space() { return Ok(drained_before); } From 02d737556fae93d3ef0788e0714ec6825e1fa18b Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 00:52:16 +0000 Subject: [PATCH 08/13] Fix `Lz4Encoder`: Use `Vec::spare_capacity_mut()` This is the safe usage; the previous usage is OK in practice, but is actually considered an out-of-bounds access by Miri and any sanitizer. 
--- crates/compression-codecs/src/lz4/encoder.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/compression-codecs/src/lz4/encoder.rs b/crates/compression-codecs/src/lz4/encoder.rs index 7eee0cca..a0e78de5 100644 --- a/crates/compression-codecs/src/lz4/encoder.rs +++ b/crates/compression-codecs/src/lz4/encoder.rs @@ -127,14 +127,16 @@ impl Lz4Encoder { .maybe_buffer .get_or_insert_with(|| PartialBuffer::new(Vec::with_capacity(buffer_size))); buffer.reset(); + buffer.get_mut().clear(); ( - buffer.unwritten_mut().as_mut_ptr(), + buffer.get_mut().spare_capacity_mut().as_mut_ptr(), buffer_size, Some(buffer), ) } else { - (out_buf.as_mut_ptr() as *mut _, output_len, None) + (out_buf.as_mut_ptr(), output_len, None) }; + let dst_buffer = dst_buffer as *mut u8; let len = match lz4_fn { Lz4Fn::Begin => { From cbfeef3699a3abce077e29a300b9be0d6123c9c8 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 01:04:32 +0000 Subject: [PATCH 09/13] Refactor xz2 codecs: Extract new fn `process_stream` Shared between encoder and decoder --- crates/compression-codecs/src/xz2/decoder.rs | 41 ++++-------------- crates/compression-codecs/src/xz2/encoder.rs | 44 +++++++------------- crates/compression-codecs/src/xz2/mod.rs | 27 ++++++++++++ 3 files changed, 51 insertions(+), 61 deletions(-) diff --git a/crates/compression-codecs/src/xz2/decoder.rs b/crates/compression-codecs/src/xz2/decoder.rs index 0f18f6ca..64cd1c06 100644 --- a/crates/compression-codecs/src/xz2/decoder.rs +++ b/crates/compression-codecs/src/xz2/decoder.rs @@ -1,6 +1,6 @@ -use crate::{lzma::params::LzmaDecoderParams, DecodeV2, DecodedSize}; +use crate::{lzma::params::LzmaDecoderParams, xz2::process_stream, DecodeV2, DecodedSize}; use compression_core::util::{PartialBuffer, WriteBuffer}; -use liblzma::stream::{Action, Status, Stream}; +use liblzma::stream::{Action, Stream}; use std::{ convert::TryFrom, fmt, @@ -52,29 +52,6 @@ impl 
Xz2Decoder { Self::try_from(params).unwrap() } - - /// Return `Ok(true)` on stream ends. - fn process( - &mut self, - input: &[u8], - output: &mut WriteBuffer<'_>, - action: Action, - ) -> io::Result { - let previous_out = self.stream.total_out() as usize; - - let status = self - .stream - .process(input, output.initialize_unwritten(), action)?; - - output.advance(self.stream.total_out() as usize - previous_out); - - match status { - Status::Ok => Ok(false), - Status::StreamEnd => Ok(true), - Status::GetCheck => Err(io::Error::other("Unexpected lzma integrity check")), - Status::MemNeeded => Err(io::ErrorKind::OutOfMemory.into()), - } - } } impl DecodeV2 for Xz2Decoder { @@ -88,12 +65,7 @@ impl DecodeV2 for Xz2Decoder { input: &mut PartialBuffer<&[u8]>, output: &mut WriteBuffer<'_>, ) -> io::Result { - let previous_in = self.stream.total_in() as usize; - - let res = self.process(input.unwritten(), output, Action::Run); - input.advance(self.stream.total_in() as usize - previous_in); - - res + process_stream(&mut self.stream, input, output, Action::Run) } fn flush(&mut self, _output: &mut WriteBuffer<'_>) -> io::Result { @@ -102,7 +74,12 @@ impl DecodeV2 for Xz2Decoder { } fn finish(&mut self, output: &mut WriteBuffer<'_>) -> io::Result { - self.process(&[], output, Action::Finish) + process_stream( + &mut self.stream, + &mut PartialBuffer::new(&[]), + output, + Action::Finish, + ) } } diff --git a/crates/compression-codecs/src/xz2/encoder.rs b/crates/compression-codecs/src/xz2/encoder.rs index cddeda92..3b600a85 100644 --- a/crates/compression-codecs/src/xz2/encoder.rs +++ b/crates/compression-codecs/src/xz2/encoder.rs @@ -2,7 +2,7 @@ use compression_core::{ util::{PartialBuffer, WriteBuffer}, Level, }; -use liblzma::stream::{Action, Check, Status, Stream}; +use liblzma::stream::{Action, Check, Stream}; use std::{ convert::{TryFrom, TryInto}, fmt, io, @@ -10,6 +10,7 @@ use std::{ use crate::{ lzma::params::{LzmaEncoderParams, LzmaOptions}, + xz2::process_stream, 
EncodeV2, Xz2FileFormat, }; @@ -77,31 +78,6 @@ impl Xz2Encoder { let params = LzmaEncoderParams::MultiThread { builder }; Self::try_from(params).unwrap() } - - /// Return `Ok(true)` if stream ends. - fn process( - &mut self, - input: &mut PartialBuffer<&[u8]>, - output: &mut WriteBuffer<'_>, - action: Action, - ) -> io::Result { - let previous_in = self.stream.total_in() as usize; - let previous_out = self.stream.total_out() as usize; - - let res = self - .stream - .process(input.unwritten(), output.initialize_unwritten(), action); - - input.advance(self.stream.total_in() as usize - previous_in); - output.advance(self.stream.total_out() as usize - previous_out); - - match res? { - Status::Ok => Ok(false), - Status::StreamEnd => Ok(true), - Status::GetCheck => Err(io::Error::other("Unexpected lzma integrity check")), - Status::MemNeeded => Err(io::ErrorKind::OutOfMemory.into()), - } - } } impl EncodeV2 for Xz2Encoder { @@ -110,7 +86,7 @@ impl EncodeV2 for Xz2Encoder { input: &mut PartialBuffer<&[u8]>, output: &mut WriteBuffer<'_>, ) -> io::Result<()> { - self.process(input, output, Action::Run).map(|_| ()) + process_stream(&mut self.stream, input, output, Action::Run).map(|_| ()) } fn flush(&mut self, output: &mut WriteBuffer<'_>) -> io::Result { @@ -121,10 +97,20 @@ impl EncodeV2 for Xz2Encoder { _ => Action::SyncFlush, }; - self.process(&mut PartialBuffer::new(&[]), output, action) + process_stream( + &mut self.stream, + &mut PartialBuffer::new(&[]), + output, + action, + ) } fn finish(&mut self, output: &mut WriteBuffer<'_>) -> io::Result { - self.process(&mut PartialBuffer::new(&[]), output, Action::Finish) + process_stream( + &mut self.stream, + &mut PartialBuffer::new(&[]), + output, + Action::Finish, + ) } } diff --git a/crates/compression-codecs/src/xz2/mod.rs b/crates/compression-codecs/src/xz2/mod.rs index 626b8adb..e86be3cd 100644 --- a/crates/compression-codecs/src/xz2/mod.rs +++ b/crates/compression-codecs/src/xz2/mod.rs @@ -8,3 +8,30 @@ pub enum 
Xz2FileFormat { } pub use self::{decoder::Xz2Decoder, encoder::Xz2Encoder}; + +use compression_core::util::{PartialBuffer, WriteBuffer}; +use liblzma::stream::{Action, Status, Stream}; +use std::io; + +/// Return `Ok(true)` if stream ends. +fn process_stream( + stream: &mut Stream, + input: &mut PartialBuffer<&[u8]>, + output: &mut WriteBuffer<'_>, + action: Action, +) -> io::Result { + let previous_in = stream.total_in() as usize; + let previous_out = stream.total_out() as usize; + + let res = stream.process(input.unwritten(), output.initialize_unwritten(), action); + + input.advance(stream.total_in() as usize - previous_in); + output.advance(stream.total_out() as usize - previous_out); + + match res? { + Status::Ok => Ok(false), + Status::StreamEnd => Ok(true), + Status::GetCheck => Err(io::Error::other("Unexpected lzma integrity check")), + Status::MemNeeded => Err(io::ErrorKind::OutOfMemory.into()), + } +} From 7dca47b978fa1b45b3726bed53d42aeb8e9153ba Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 01:21:32 +0000 Subject: [PATCH 10/13] Use liblzma 0.4.5 uninitialized API --- crates/compression-codecs/Cargo.toml | 2 +- crates/compression-codecs/src/xz2/mod.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/compression-codecs/Cargo.toml b/crates/compression-codecs/Cargo.toml index 6ed5e4f5..2d98d6f2 100644 --- a/crates/compression-codecs/Cargo.toml +++ b/crates/compression-codecs/Cargo.toml @@ -52,7 +52,7 @@ deflate64 = { version = "0.1.10", optional = true } flate2 = { version = "1.1.4", optional = true } libzstd = { package = "zstd", version = "0.13.1", optional = true, default-features = false } lz4 = { version = "1.28.1", optional = true } -liblzma = { version = "0.4.4", optional = true } +liblzma = { version = "0.4.5", optional = true } memchr = { version = "2", optional = true } zstd-safe = { version = "7", optional = true, default-features = false } diff --git 
a/crates/compression-codecs/src/xz2/mod.rs b/crates/compression-codecs/src/xz2/mod.rs index e86be3cd..e45d178f 100644 --- a/crates/compression-codecs/src/xz2/mod.rs +++ b/crates/compression-codecs/src/xz2/mod.rs @@ -23,10 +23,12 @@ fn process_stream( let previous_in = stream.total_in() as usize; let previous_out = stream.total_out() as usize; - let res = stream.process(input.unwritten(), output.initialize_unwritten(), action); + // Safety: We **trust** liblzma to not write uninitialized bytes into the buffer + let res = stream.process_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, action); input.advance(stream.total_in() as usize - previous_in); - output.advance(stream.total_out() as usize - previous_out); + // Safety: We **trust** liblzma to write bytes into the buffer properly + unsafe { output.assume_init_and_advance(stream.total_out() as usize - previous_out) }; match res? { Status::Ok => Ok(false), From 13c52dde91187b74e3066ce0569a32a734fc1efe Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 02:46:36 +0000 Subject: [PATCH 11/13] Use zstd uninitialized API --- crates/compression-codecs/src/zstd/decoder.rs | 43 +++------ crates/compression-codecs/src/zstd/encoder.rs | 35 ++----- crates/compression-codecs/src/zstd/mod.rs | 92 +++++++++++++++++++ crates/compression-core/src/util.rs | 21 +++++ 4 files changed, 135 insertions(+), 56 deletions(-) diff --git a/crates/compression-codecs/src/zstd/decoder.rs b/crates/compression-codecs/src/zstd/decoder.rs index aaa8e99c..44231aed 100644 --- a/crates/compression-codecs/src/zstd/decoder.rs +++ b/crates/compression-codecs/src/zstd/decoder.rs @@ -1,13 +1,16 @@ -use crate::zstd::params::DParameter; -use crate::{DecodeV2, DecodedSize}; +use crate::{ + zstd::{params::DParameter, OperationExt}, + {DecodeV2, DecodedSize}, +}; use compression_core::{ unshared::Unshared, util::{PartialBuffer, WriteBuffer}, }; -use libzstd::stream::raw::{Decoder, Operation}; 
-use std::convert::TryInto; -use std::io; -use std::io::Result; +use libzstd::stream::raw::Decoder; +use std::{ + convert::TryInto, + io::{self, Result}, +}; use zstd_safe::get_error_name; #[derive(Debug)] @@ -48,24 +51,11 @@ impl ZstdDecoder { stream_ended: false, }) } - - fn call_fn_on_out_buffer( - &mut self, - output: &mut WriteBuffer<'_>, - f: fn(&mut Decoder<'static>, &mut zstd_safe::OutBuffer<'_, [u8]>) -> io::Result, - ) -> io::Result { - let mut out_buf = zstd_safe::OutBuffer::around(output.initialize_unwritten()); - let res = f(self.decoder.get_mut(), &mut out_buf); - let len = out_buf.as_slice().len(); - output.advance(len); - - res.map(|bytes_left| bytes_left == 0) - } } impl DecodeV2 for ZstdDecoder { fn reinit(&mut self) -> Result<()> { - self.decoder.get_mut().reinit()?; + self.decoder.reinit()?; self.stream_ended = false; Ok(()) } @@ -75,14 +65,7 @@ impl DecodeV2 for ZstdDecoder { input: &mut PartialBuffer<&[u8]>, output: &mut WriteBuffer<'_>, ) -> Result { - let status = self - .decoder - .get_mut() - .run_on_buffers(input.unwritten(), output.initialize_unwritten())?; - input.advance(status.bytes_read); - output.advance(status.bytes_written); - - let finished = status.remaining == 0; + let finished = self.decoder.run(input, output)?; if finished { self.stream_ended = true; } @@ -93,11 +76,11 @@ impl DecodeV2 for ZstdDecoder { // Note: stream_ended is not updated here because zstd's flush only flushes // buffered output and doesn't indicate stream completion. Stream completion // is detected in decode() when status.remaining == 0. 
- self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.flush(out_buf)) + self.decoder.flush(output) } fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result { - self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true))?; + self.decoder.finish(output)?; if self.stream_ended { Ok(true) diff --git a/crates/compression-codecs/src/zstd/encoder.rs b/crates/compression-codecs/src/zstd/encoder.rs index 16a2e7a3..76f111fa 100644 --- a/crates/compression-codecs/src/zstd/encoder.rs +++ b/crates/compression-codecs/src/zstd/encoder.rs @@ -1,12 +1,13 @@ -use crate::zstd::params::CParameter; -use crate::EncodeV2; +use crate::{ + zstd::{params::CParameter, OperationExt}, + EncodeV2, +}; use compression_core::{ unshared::Unshared, util::{PartialBuffer, WriteBuffer}, }; -use libzstd::stream::raw::{Encoder, Operation}; -use std::io; -use std::io::Result; +use libzstd::stream::raw::Encoder; +use std::io::{self, Result}; #[derive(Debug)] pub struct ZstdEncoder { @@ -36,19 +37,6 @@ impl ZstdEncoder { encoder: Unshared::new(encoder), }) } - - fn call_fn_on_out_buffer( - &mut self, - output: &mut WriteBuffer<'_>, - f: fn(&mut Encoder<'static>, &mut zstd_safe::OutBuffer<'_, [u8]>) -> io::Result, - ) -> io::Result { - let mut out_buf = zstd_safe::OutBuffer::around(output.initialize_unwritten()); - let res = f(self.encoder.get_mut(), &mut out_buf); - let len = out_buf.as_slice().len(); - output.advance(len); - - res.map(|bytes_left| bytes_left == 0) - } } impl EncodeV2 for ZstdEncoder { @@ -57,20 +45,15 @@ impl EncodeV2 for ZstdEncoder { input: &mut PartialBuffer<&[u8]>, output: &mut WriteBuffer<'_>, ) -> Result<()> { - let status = self - .encoder - .get_mut() - .run_on_buffers(input.unwritten(), output.initialize_unwritten())?; - input.advance(status.bytes_read); - output.advance(status.bytes_written); + self.encoder.run(input, output)?; Ok(()) } fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result { - self.call_fn_on_out_buffer(output, 
|encoder, out_buf| encoder.flush(out_buf)) + self.encoder.flush(output) } fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result { - self.call_fn_on_out_buffer(output, |encoder, out_buf| encoder.finish(out_buf, true)) + self.encoder.finish(output) } } diff --git a/crates/compression-codecs/src/zstd/mod.rs b/crates/compression-codecs/src/zstd/mod.rs index 5a0b9be0..c93af350 100644 --- a/crates/compression-codecs/src/zstd/mod.rs +++ b/crates/compression-codecs/src/zstd/mod.rs @@ -3,3 +3,95 @@ mod encoder; pub mod params; pub use self::{decoder::ZstdDecoder, encoder::ZstdEncoder}; + +use compression_core::{ + unshared::Unshared, + util::{PartialBuffer, WriteBuffer}, +}; +use libzstd::stream::raw::{InBuffer, Operation, OutBuffer, WriteBuf}; +use std::io; + +#[repr(transparent)] +struct WriteBufferWrapper<'a>(WriteBuffer<'a>); + +unsafe impl WriteBuf for WriteBufferWrapper<'_> { + fn as_slice(&self) -> &[u8] { + self.0.written() + } + fn capacity(&self) -> usize { + self.0.capacity() + } + fn as_mut_ptr(&mut self) -> *mut u8 { + self.0.as_mut_ptr() + } + unsafe fn filled_until(&mut self, n: usize) { + self.0.set_written_and_initialized_len(n); + } +} + +trait WriteBufExt { + fn get_out_buf(&mut self) -> OutBuffer<'_, WriteBufferWrapper<'_>>; +} + +impl WriteBufExt for WriteBuffer<'_> { + fn get_out_buf(&mut self) -> OutBuffer<'_, WriteBufferWrapper<'_>> { + { + use std::mem::{align_of, size_of}; + assert_eq!( + size_of::>(), + size_of::>() + ); + assert_eq!( + align_of::>(), + align_of::>() + ); + } + + // Pass written_len to avoid overwriting existing data in buffer. + let written_len = self.written_len(); + OutBuffer::around_pos( + unsafe { &mut *(self as *mut _ as *mut WriteBufferWrapper<'_>) }, + written_len, + ) + } +} + +trait OperationExt { + fn reinit(&mut self) -> io::Result<()>; + + /// Return `true` if finished. 
+ fn run( + &mut self, + input: &mut PartialBuffer<&[u8]>, + output: &mut WriteBuffer<'_>, + ) -> io::Result; + + fn flush(&mut self, output: &mut WriteBuffer<'_>) -> io::Result; + + fn finish(&mut self, output: &mut WriteBuffer<'_>) -> io::Result; +} + +impl OperationExt for Unshared { + fn reinit(&mut self) -> io::Result<()> { + self.get_mut().reinit() + } + + fn run( + &mut self, + input: &mut PartialBuffer<&[u8]>, + output: &mut WriteBuffer<'_>, + ) -> io::Result { + let mut in_buf = InBuffer::around(input.unwritten()); + let result = self.get_mut().run(&mut in_buf, &mut output.get_out_buf()); + input.advance(in_buf.pos()); + Ok(result? == 0) + } + + fn flush(&mut self, output: &mut WriteBuffer<'_>) -> io::Result { + Ok(self.get_mut().flush(&mut output.get_out_buf())? == 0) + } + + fn finish(&mut self, output: &mut WriteBuffer<'_>) -> io::Result { + Ok(self.get_mut().finish(&mut output.get_out_buf(), true)? == 0) + } +} diff --git a/crates/compression-core/src/util.rs b/crates/compression-core/src/util.rs index 99a65742..c375a97d 100644 --- a/crates/compression-core/src/util.rs +++ b/crates/compression-core/src/util.rs @@ -110,6 +110,14 @@ impl<'a> WriteBuffer<'a> { } } + pub fn capacity(&self) -> usize { + self.buffer.len() + } + + pub fn as_mut_ptr(&mut self) -> *mut u8 { + self.buffer.as_mut_ptr() as *mut _ + } + pub fn written(&self) -> &[u8] { debug_assert!(self.index <= self.initialized); @@ -193,6 +201,19 @@ impl<'a> WriteBuffer<'a> { self.initialized = self.initialized.max(self.index); } + /// Convenient function combining [`WriteBuffer::assume_init`] and [`WriteBuffer::advance`], + /// works similar to [`Vec::set_len`]. + /// + /// # Safety + /// + /// The caller must ensure that first `n` bytes of the buffer have already been initialized. 
+ pub unsafe fn set_written_and_initialized_len(&mut self, n: usize) { + debug_assert!(n <= self.buffer.len()); + + self.index = n; + self.initialized = self.initialized.max(n); + } + pub fn copy_unwritten_from>(&mut self, other: &mut PartialBuffer) -> usize { fn inner(this: &mut WriteBuffer<'_>, input: &[u8]) -> usize { // Safety: We will never ever write uninitialized bytes into it From 52ec3cd7c04dae34f1f7b406701b7a53b71c2870 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 03:37:56 +0000 Subject: [PATCH 12/13] Refactor: Extract `tokio::poll_read` Extract duplicate code, easier to modify --- .../src/tokio/bufread/generic/decoder.rs | 19 +++------------ .../src/tokio/bufread/generic/encoder.rs | 14 +---------- .../src/tokio/bufread/generic/mod.rs | 23 +++++++++++++++++++ 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/crates/async-compression/src/tokio/bufread/generic/decoder.rs b/crates/async-compression/src/tokio/bufread/generic/decoder.rs index 2daff045..832fe13e 100644 --- a/crates/async-compression/src/tokio/bufread/generic/decoder.rs +++ b/crates/async-compression/src/tokio/bufread/generic/decoder.rs @@ -4,12 +4,11 @@ use crate::{ generic::bufread::impl_decoder, }; -use core::{ +use std::{ + io::{IoSlice, Result}, pin::Pin, task::{Context, Poll}, }; -use std::io::{IoSlice, Result}; - use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; impl_decoder!(); @@ -20,19 +19,7 @@ impl AsyncRead for Decoder { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - if buf.remaining() == 0 { - return Poll::Ready(Ok(())); - } - - let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled()); - match self.do_poll_read(cx, &mut output)? 
{ - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => { - let len = output.written_len(); - buf.advance(len); - Poll::Ready(Ok(())) - } - } + super::poll_read(buf, |output| self.do_poll_read(cx, output)) } } diff --git a/crates/async-compression/src/tokio/bufread/generic/encoder.rs b/crates/async-compression/src/tokio/bufread/generic/encoder.rs index edda53dd..3cd6fb07 100644 --- a/crates/async-compression/src/tokio/bufread/generic/encoder.rs +++ b/crates/async-compression/src/tokio/bufread/generic/encoder.rs @@ -18,19 +18,7 @@ impl AsyncRead for Encoder { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - if buf.remaining() == 0 { - return Poll::Ready(Ok(())); - } - - let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled()); - match self.do_poll_read(cx, &mut output)? { - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => { - let len = output.written_len(); - buf.advance(len); - Poll::Ready(Ok(())) - } - } + super::poll_read(buf, |output| self.do_poll_read(cx, output)) } } diff --git a/crates/async-compression/src/tokio/bufread/generic/mod.rs b/crates/async-compression/src/tokio/bufread/generic/mod.rs index dbe1e3e2..83a6c216 100644 --- a/crates/async-compression/src/tokio/bufread/generic/mod.rs +++ b/crates/async-compression/src/tokio/bufread/generic/mod.rs @@ -2,3 +2,26 @@ mod decoder; mod encoder; pub use self::{decoder::Decoder, encoder::Encoder}; + +use crate::core::util::WriteBuffer; +use std::{io::Result, task::Poll}; +use tokio::io::ReadBuf; + +fn poll_read( + buf: &mut ReadBuf<'_>, + do_poll_read: impl FnOnce(&mut WriteBuffer<'_>) -> Poll>, +) -> Poll> { + if buf.remaining() == 0 { + return Poll::Ready(Ok(())); + } + + let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled()); + match do_poll_read(&mut output)? 
{ + Poll::Pending if output.written().is_empty() => Poll::Pending, + _ => { + let len = output.written_len(); + buf.advance(len); + Poll::Ready(Ok(())) + } + } +} From d92f2fc4fe11ad8f0cbe3ffce0834174e9099e17 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 9 Nov 2025 05:38:23 +0000 Subject: [PATCH 13/13] Support uninitialized buffer for `tokio::bufread` --- .../src/tokio/bufread/generic/mod.rs | 26 +++++++++++++------ crates/compression-core/src/util.rs | 4 +++ 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/crates/async-compression/src/tokio/bufread/generic/mod.rs b/crates/async-compression/src/tokio/bufread/generic/mod.rs index 83a6c216..1da3933c 100644 --- a/crates/async-compression/src/tokio/bufread/generic/mod.rs +++ b/crates/async-compression/src/tokio/bufread/generic/mod.rs @@ -15,13 +15,23 @@ fn poll_read( return Poll::Ready(Ok(())); } - let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled()); - match do_poll_read(&mut output)? { - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => { - let len = output.written_len(); - buf.advance(len); - Poll::Ready(Ok(())) - } + let initialized = buf.initialized().len() - buf.filled().len(); + // Safety: `WriteBuffer` has the same safety invariant as `ReadBuf` + let mut output = WriteBuffer::new_uninitialized(unsafe { buf.unfilled_mut() }); + // Safety: `ReadBuf` ensures that it is initialized + unsafe { output.assume_init(initialized) }; + + let res = do_poll_read(&mut output); + + let initialized = output.initialized_len(); + let written = output.written_len(); + + // Safety: We trust our implementation to have properly initialized it + unsafe { buf.assume_init(initialized) }; + buf.advance(written); + + match res? 
{ + Poll::Pending if written == 0 => Poll::Pending, + _ => Poll::Ready(Ok(())), } } diff --git a/crates/compression-core/src/util.rs b/crates/compression-core/src/util.rs index c375a97d..f2073bc3 100644 --- a/crates/compression-core/src/util.rs +++ b/crates/compression-core/src/util.rs @@ -118,6 +118,10 @@ impl<'a> WriteBuffer<'a> { self.buffer.as_mut_ptr() as *mut _ } + pub fn initialized_len(&self) -> usize { + self.initialized + } + pub fn written(&self) -> &[u8] { debug_assert!(self.index <= self.initialized);