Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions crates/async-compression/src/tokio/bufread/generic/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use crate::{
generic::bufread::impl_decoder,
};

use core::{
use std::{
io::{IoSlice, Result},
pin::Pin,
task::{Context, Poll},
};
use std::io::{IoSlice, Result};

use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};

impl_decoder!();
Expand All @@ -20,19 +19,7 @@ impl<R: AsyncBufRead, D: DecodeV2> AsyncRead for Decoder<R, D> {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}

let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled());
match self.do_poll_read(cx, &mut output)? {
Poll::Pending if output.written().is_empty() => Poll::Pending,
_ => {
let len = output.written_len();
buf.advance(len);
Poll::Ready(Ok(()))
}
}
super::poll_read(buf, |output| self.do_poll_read(cx, output))
}
}

Expand Down
14 changes: 1 addition & 13 deletions crates/async-compression/src/tokio/bufread/generic/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,7 @@ impl<R: AsyncBufRead, E: EncodeV2> AsyncRead for Encoder<R, E> {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}

let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled());
match self.do_poll_read(cx, &mut output)? {
Poll::Pending if output.written().is_empty() => Poll::Pending,
_ => {
let len = output.written_len();
buf.advance(len);
Poll::Ready(Ok(()))
}
}
super::poll_read(buf, |output| self.do_poll_read(cx, output))
}
}

Expand Down
33 changes: 33 additions & 0 deletions crates/async-compression/src/tokio/bufread/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,36 @@ mod decoder;
mod encoder;

pub use self::{decoder::Decoder, encoder::Encoder};

use crate::core::util::WriteBuffer;
use std::{io::Result, task::Poll};
use tokio::io::ReadBuf;

fn poll_read(
buf: &mut ReadBuf<'_>,
do_poll_read: impl FnOnce(&mut WriteBuffer<'_>) -> Poll<Result<()>>,
) -> Poll<Result<()>> {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}

let initialized = buf.initialized().len() - buf.filled().len();
// Safety: `WriteBuffer` has the same safety invariant as `ReadBuf`
let mut output = WriteBuffer::new_uninitialized(unsafe { buf.unfilled_mut() });
// Safety: `ReadBuf` ensures that it is initialized
unsafe { output.assume_init(initialized) };

let res = do_poll_read(&mut output);

let initialized = output.initialized_len();
let written = output.written_len();

// Safety: We trust our implementation to have properly initialized it
unsafe { buf.assume_init(initialized) };
buf.advance(written);

match res? {
Poll::Pending if written == 0 => Poll::Pending,
_ => Poll::Ready(Ok(())),
}
}
8 changes: 4 additions & 4 deletions crates/compression-codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ deflate64 = ["dep:deflate64"]
compression-core.workspace = true
# features
brotli = { version = "8", optional = true }
bzip2 = { version = "0.6", optional = true }
deflate64 = { version = "0.1.5", optional = true }
flate2 = { version = "1.0.13", optional = true }
bzip2 = { version = "0.6.1", optional = true }
deflate64 = { version = "0.1.10", optional = true }
flate2 = { version = "1.1.4", optional = true }
libzstd = { package = "zstd", version = "0.13.1", optional = true, default-features = false }
lz4 = { version = "1.28.1", optional = true }
liblzma = { version = "0.4.4", optional = true }
liblzma = { version = "0.4.5", optional = true }
memchr = { version = "2", optional = true }
zstd-safe = { version = "7", optional = true, default-features = false }

Expand Down
8 changes: 6 additions & 2 deletions crates/compression-codecs/src/bzip2/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ impl BzDecoder {

let result = self
.decompress
.decompress(input.unwritten(), output.initialize_unwritten())
// Safety: We **trust** bzip2 to only write initialized data to it
.decompress_uninit(input.unwritten(), unsafe { output.unwritten_mut() })
.map_err(io::Error::other);

input.advance((self.decompress.total_in() - prior_in) as usize);
output.advance((self.decompress.total_out() - prior_out) as usize);
// Safety: We **trust** bzip2 to write bytes properly
unsafe {
output.assume_init_and_advance((self.decompress.total_out() - prior_out) as usize)
};

// Track when stream has properly ended
if matches!(result, Ok(Status::StreamEnd)) {
Expand Down
6 changes: 4 additions & 2 deletions crates/compression-codecs/src/bzip2/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ impl BzEncoder {

let result = self
.compress
.compress(input.unwritten(), output.initialize_unwritten(), action)
// Safety: We **trust** bzip2 to only write initialized bytes into it
.compress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, action)
.map_err(io::Error::other);

input.advance((self.compress.total_in() - prior_in) as usize);
output.advance((self.compress.total_out() - prior_out) as usize);
// Safety: We **trust** bzip2 to properly write bytes into it
unsafe { output.assume_init_and_advance((self.compress.total_out() - prior_out) as usize) };

result
}
Expand Down
6 changes: 4 additions & 2 deletions crates/compression-codecs/src/deflate64/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ impl Deflate64Decoder {
) -> Result<bool> {
let result = self
.inflater
.inflate(input.unwritten(), output.initialize_unwritten());
// Safety: We **trust** deflate64 to not write uninitialized bytes
.inflate_uninit(input.unwritten(), unsafe { output.unwritten_mut() });

input.advance(result.bytes_consumed);
output.advance(result.bytes_written);
// Safety: We **trust** deflate64 to properly write bytes into buffer
unsafe { output.assume_init_and_advance(result.bytes_written) };

if result.data_error {
Err(Error::new(ErrorKind::InvalidData, "invalid data"))
Expand Down
12 changes: 8 additions & 4 deletions crates/compression-codecs/src/flate/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ impl FlateDecoder {
let prior_in = self.decompress.total_in();
let prior_out = self.decompress.total_out();

let result =
self.decompress
.decompress(input.unwritten(), output.initialize_unwritten(), flush);
let result = self
.decompress
// Safety: We **trust** flate2 to not write uninitialized bytes into buffer
.decompress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, flush);

input.advance((self.decompress.total_in() - prior_in) as usize);
output.advance((self.decompress.total_out() - prior_out) as usize);
// Safety: We **trust** flate2 to write bytes into buffer properly
unsafe {
output.assume_init_and_advance((self.decompress.total_out() - prior_out) as usize)
};

Ok(result?)
}
Expand Down
10 changes: 6 additions & 4 deletions crates/compression-codecs/src/flate/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ impl FlateEncoder {
let prior_in = self.compress.total_in();
let prior_out = self.compress.total_out();

let result =
self.compress
.compress(input.unwritten(), output.initialize_unwritten(), flush);
let result = self
.compress
// Safety: We **trust** flate2 to not write uninitialized bytes into buffer
.compress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, flush);

input.advance((self.compress.total_in() - prior_in) as usize);
output.advance((self.compress.total_out() - prior_out) as usize);
// Safety: We **trust** flate2 to write bytes properly into buffer
unsafe { output.assume_init_and_advance((self.compress.total_out() - prior_out) as usize) };

Ok(result?)
}
Expand Down
20 changes: 13 additions & 7 deletions crates/compression-codecs/src/lz4/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,28 @@ impl DecodeV2 for Lz4Decoder {
input: &mut PartialBuffer<&[u8]>,
output: &mut WriteBuffer<'_>,
) -> Result<bool> {
let out_buf = output.initialize_unwritten();

let mut output_size = out_buf.len();
let mut input_size = input.unwritten().len();

// Safety: We **trust** lz4 bytes to properly function as expected,
// only write decompressed, initialized data into the buffer properly.
let result = unsafe {
check_error(LZ4F_decompress(
let out_buf = output.unwritten_mut();

let mut output_size = out_buf.len();

let result = check_error(LZ4F_decompress(
self.ctx.get_mut().ctx,
out_buf.as_mut_ptr(),
out_buf.as_mut_ptr() as *mut _,
&mut output_size,
input.unwritten().as_ptr(),
&mut input_size,
core::ptr::null(),
))
));
output.assume_init_and_advance(output_size);

result
};
input.advance(input_size);
output.advance(output_size);

let finished = result? == 0;
if finished {
Expand Down
13 changes: 9 additions & 4 deletions crates/compression-codecs/src/lz4/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Lz4Encoder {

fn write(&mut self, lz4_fn: Lz4Fn<'_, '_>, output: &mut WriteBuffer<'_>) -> Result<usize> {
let (drained_before, undrained) = self.drain_buffer(output);
if undrained > 0 {
if undrained > 0 || output.has_no_spare_space() {
return Ok(drained_before);
}

Expand All @@ -117,7 +117,8 @@ impl Lz4Encoder {
Lz4Fn::Flush | Lz4Fn::End => self.flush_buffer_size,
};

let out_buf = output.initialize_unwritten();
// Safety: We **trust** lz4 to not write uninitialized bytes
let out_buf = unsafe { output.unwritten_mut() };
let output_len = out_buf.len();

let (dst_buffer, dst_size, maybe_internal_buffer) = if min_dst_size > output_len {
Expand All @@ -126,14 +127,16 @@ impl Lz4Encoder {
.maybe_buffer
.get_or_insert_with(|| PartialBuffer::new(Vec::with_capacity(buffer_size)));
buffer.reset();
buffer.get_mut().clear();
(
buffer.unwritten_mut().as_mut_ptr(),
buffer.get_mut().spare_capacity_mut().as_mut_ptr(),
buffer_size,
Some(buffer),
)
} else {
(out_buf.as_mut_ptr(), output_len, None)
};
let dst_buffer = dst_buffer as *mut u8;

let len = match lz4_fn {
Lz4Fn::Begin => {
Expand Down Expand Up @@ -185,13 +188,15 @@ impl Lz4Encoder {
};

let drained_after = if let Some(internal_buffer) = maybe_internal_buffer {
// Safety: We **trust** lz4 to properly write data into the buffer
unsafe {
internal_buffer.get_mut().set_len(len);
}
let (d, _) = self.drain_buffer(output);
d
} else {
output.advance(len);
// Safety: We **trust** lz4 to properly write data into the buffer
unsafe { output.assume_init_and_advance(len) };
len
};

Expand Down
41 changes: 9 additions & 32 deletions crates/compression-codecs/src/xz2/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{lzma::params::LzmaDecoderParams, DecodeV2, DecodedSize};
use crate::{lzma::params::LzmaDecoderParams, xz2::process_stream, DecodeV2, DecodedSize};
use compression_core::util::{PartialBuffer, WriteBuffer};
use liblzma::stream::{Action, Status, Stream};
use liblzma::stream::{Action, Stream};
use std::{
convert::TryFrom,
fmt,
Expand Down Expand Up @@ -52,29 +52,6 @@ impl Xz2Decoder {

Self::try_from(params).unwrap()
}

/// Return `Ok(true)` on stream ends.
fn process(
&mut self,
input: &[u8],
output: &mut WriteBuffer<'_>,
action: Action,
) -> io::Result<bool> {
let previous_out = self.stream.total_out() as usize;

let status = self
.stream
.process(input, output.initialize_unwritten(), action)?;

output.advance(self.stream.total_out() as usize - previous_out);

match status {
Status::Ok => Ok(false),
Status::StreamEnd => Ok(true),
Status::GetCheck => Err(io::Error::other("Unexpected lzma integrity check")),
Status::MemNeeded => Err(io::ErrorKind::OutOfMemory.into()),
}
}
}

impl DecodeV2 for Xz2Decoder {
Expand All @@ -88,12 +65,7 @@ impl DecodeV2 for Xz2Decoder {
input: &mut PartialBuffer<&[u8]>,
output: &mut WriteBuffer<'_>,
) -> io::Result<bool> {
let previous_in = self.stream.total_in() as usize;

let res = self.process(input.unwritten(), output, Action::Run);
input.advance(self.stream.total_in() as usize - previous_in);

res
process_stream(&mut self.stream, input, output, Action::Run)
}

fn flush(&mut self, _output: &mut WriteBuffer<'_>) -> io::Result<bool> {
Expand All @@ -102,7 +74,12 @@ impl DecodeV2 for Xz2Decoder {
}

fn finish(&mut self, output: &mut WriteBuffer<'_>) -> io::Result<bool> {
self.process(&[], output, Action::Finish)
process_stream(
&mut self.stream,
&mut PartialBuffer::new(&[]),
output,
Action::Finish,
)
}
}

Expand Down
Loading