Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions crates/async-compression/src/generic/bufread/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ impl Encoder {
mut input: Option<&mut PartialBuffer<&[u8]>>,
) -> ControlFlow<Result<()>> {
loop {
self.state = match self.state {
State::Encoding(mut read) => match input.as_mut() {
self.state = match &mut self.state {
State::Encoding(read) => match input.as_mut() {
None => {
if read == 0 {
if *read == 0 {
if output.written().is_empty() {
// Poll for more data
break;
Expand All @@ -56,7 +56,7 @@ impl Encoder {
return ControlFlow::Break(Err(err));
}

read += input.written().len();
*read += input.written().len();

// Poll for more data
break;
Expand Down Expand Up @@ -181,7 +181,11 @@ macro_rules! impl_encoder {
}

if is_pending {
return Poll::Pending;
if output.written().is_empty() {
return Poll::Pending;
} else {
return Poll::Ready(Ok(()));
}
}
}
}
Expand Down
68 changes: 68 additions & 0 deletions crates/async-compression/tests/gzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,71 @@ fn gzip_bufread_chunks_decompress_with_extra_header() {

assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]);
}

#[test]
#[ntest::timeout(1000)]
#[cfg(feature = "futures-io")]
fn gzip_bufread_chunks_compress_flushes_when_reader_pending() {
use crate::utils::block_on;
use async_compression::futures::bufread::GzipEncoder;
use futures::AsyncRead;
use futures::AsyncReadExt;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

struct Input {
remaining: usize,
chunks_sent: Arc<AtomicUsize>,
chunks_received: Arc<AtomicUsize>,
}

impl AsyncRead for Input {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
if self.chunks_received.load(Ordering::Relaxed)
< self.chunks_sent.load(Ordering::Relaxed)
{
Poll::Pending
} else {
let bytes = b"X".repeat(buf.len().min(64).min(self.remaining));
buf[..bytes.len()].copy_from_slice(&bytes);
let this = self.get_mut();
this.remaining -= bytes.len();
this.chunks_sent.fetch_add(1, Ordering::Relaxed);
Poll::Ready(Ok(bytes.len()))
}
}
}

let chunks_sent = Arc::new(AtomicUsize::new(0));
let chunks_received = Arc::new(AtomicUsize::new(0));

let input = futures::io::BufReader::new(Input {
remaining: 4 * 1024,
chunks_sent: Arc::clone(&chunks_sent),
chunks_received: Arc::clone(&chunks_received),
});

let mut encoder = GzipEncoder::new(input);

let mut encoded_buffer: [u8; 64] = [0; 64];

block_on(async {
while let Ok(read) = encoder.read(&mut encoded_buffer).await {
if read == 0 {
break;
}
chunks_received.fetch_add(1, Ordering::Relaxed);
}
});

assert_eq!(
chunks_sent.load(Ordering::Relaxed),
chunks_received.load(Ordering::Relaxed)
);
}