Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions crates/async-compression/src/futures/bufread/generic/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_decoder};

use crate::{
codecs::DecodeV2,
core::util::{PartialBuffer, WriteBuffer},
generic::bufread::impl_decoder,
};
use core::{
pin::Pin,
task::{Context, Poll},
};
use std::io::{IoSlice, Result};

use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use std::io::{IoSlice, Result};

impl_decoder!();

impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
impl<R: AsyncBufRead, D: DecodeV2> AsyncRead for Decoder<R, D> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -20,10 +22,10 @@ impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
return Poll::Ready(Ok(0));
}

let mut output = PartialBuffer::new(buf);
let mut output = WriteBuffer::new_initialized(buf);
match self.do_poll_read(cx, &mut output)? {
Poll::Pending if output.written().is_empty() => Poll::Pending,
_ => Poll::Ready(Ok(output.written().len())),
_ => Poll::Ready(Ok(output.written_len())),
}
}
}
Expand Down
18 changes: 11 additions & 7 deletions crates/async-compression/src/futures/bufread/generic/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use crate::{codecs::Encode, core::util::PartialBuffer, generic::bufread::impl_encoder};
use core::{
use crate::{
codecs::EncodeV2,
core::util::{PartialBuffer, WriteBuffer},
generic::bufread::impl_encoder,
};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice};
use std::{
io::Result,
pin::Pin,
task::{Context, Poll},
};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice};
use std::io::Result;

impl_encoder!();

impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
impl<R: AsyncBufRead, E: EncodeV2> AsyncRead for Encoder<R, E> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -18,10 +22,10 @@ impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
return Poll::Ready(Ok(0));
}

let mut output = PartialBuffer::new(buf);
let mut output = WriteBuffer::new_initialized(buf);
match self.do_poll_read(cx, &mut output)? {
Poll::Pending if output.written().is_empty() => Poll::Pending,
_ => Poll::Ready(Ok(output.written().len())),
_ => Poll::Ready(Ok(output.written_len())),
}
}
}
Expand Down
76 changes: 41 additions & 35 deletions crates/async-compression/src/generic/bufread/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::codecs::Decode;
use crate::core::util::PartialBuffer;
use crate::{
codecs::DecodeV2,
core::util::{PartialBuffer, WriteBuffer},
};

use std::{io::Result, ops::ControlFlow};

Expand Down Expand Up @@ -31,10 +33,10 @@ impl Decoder {
self.multiple_members = enabled;
}

pub fn do_poll_read<D: Decode>(
pub fn do_poll_read(
&mut self,
output: &mut PartialBuffer<&mut [u8]>,
decoder: &mut D,
output: &mut WriteBuffer<'_>,
decoder: &mut dyn DecodeV2,
input: &mut PartialBuffer<&[u8]>,
mut first: bool,
) -> ControlFlow<Result<()>> {
Expand Down Expand Up @@ -95,12 +97,12 @@ impl Decoder {
}
};

if output.unwritten().is_empty() {
if output.has_no_spare_space() {
return ControlFlow::Break(Ok(()));
}
}

if output.unwritten().is_empty() {
if output.has_no_spare_space() {
ControlFlow::Break(Ok(()))
} else {
ControlFlow::Continue(())
Expand All @@ -127,7 +129,7 @@ macro_rules! impl_decoder {
}
}

impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
impl<R: AsyncBufRead, D: DecodeV2> Decoder<R, D> {
pub fn new(reader: R, decoder: D) -> Self {
Self {
reader,
Expand Down Expand Up @@ -159,40 +161,44 @@ macro_rules! impl_decoder {
}
}

impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut PartialBuffer<&mut [u8]>,
) -> Poll<Result<()>> {
let mut this = self.project();

if let ControlFlow::Break(res) = this.inner.do_poll_read(
output,
this.decoder,
&mut PartialBuffer::new(&[][..]),
true,
) {
return Poll::Ready(res);
}
fn do_poll_read(
inner: &mut GenericDecoder,
decoder: &mut dyn DecodeV2,
mut reader: Pin<&mut dyn AsyncBufRead>,
cx: &mut Context<'_>,
output: &mut WriteBuffer<'_>,
) -> Poll<Result<()>> {
if let ControlFlow::Break(res) =
inner.do_poll_read(output, decoder, &mut PartialBuffer::new(&[][..]), true)
{
return Poll::Ready(res);
}

loop {
let mut input =
PartialBuffer::new(ready!(this.reader.as_mut().poll_fill_buf(cx))?);
loop {
let mut input = PartialBuffer::new(ready!(reader.as_mut().poll_fill_buf(cx))?);

let control_flow =
this.inner
.do_poll_read(output, this.decoder, &mut input, false);
let control_flow = inner.do_poll_read(output, decoder, &mut input, false);

let bytes_read = input.written().len();
this.reader.as_mut().consume(bytes_read);
let bytes_read = input.written().len();
reader.as_mut().consume(bytes_read);

if let ControlFlow::Break(res) = control_flow {
break Poll::Ready(res);
}
if let ControlFlow::Break(res) = control_flow {
break Poll::Ready(res);
}
}
}

impl<R: AsyncBufRead, D: DecodeV2> Decoder<R, D> {
fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut WriteBuffer<'_>,
) -> Poll<Result<()>> {
let this = self.project();

do_poll_read(this.inner, this.decoder, this.reader, cx, output)
}
}
};
}
pub(crate) use impl_decoder;
85 changes: 47 additions & 38 deletions crates/async-compression/src/generic/bufread/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{codecs::Encode, core::util::PartialBuffer};
use crate::{
codecs::EncodeV2,
core::util::{PartialBuffer, WriteBuffer},
};
use std::{io::Result, ops::ControlFlow};

#[derive(Debug)]
Expand Down Expand Up @@ -26,8 +29,8 @@ impl Encoder {
/// `input` - should be `None` if `Poll::Pending`.
pub fn do_poll_read(
&mut self,
output: &mut PartialBuffer<&mut [u8]>,
encoder: &mut impl Encode,
output: &mut WriteBuffer<'_>,
encoder: &mut dyn EncodeV2,
mut input: Option<&mut PartialBuffer<&[u8]>>,
) -> ControlFlow<Result<()>> {
loop {
Expand Down Expand Up @@ -81,12 +84,12 @@ impl Encoder {
State::Done => return ControlFlow::Break(Ok(())),
};

if output.unwritten().is_empty() {
if output.has_no_spare_space() {
return ControlFlow::Break(Ok(()));
}
}

if output.unwritten().is_empty() {
if output.has_no_spare_space() {
ControlFlow::Break(Ok(()))
} else {
ControlFlow::Continue(())
Expand All @@ -113,7 +116,7 @@ macro_rules! impl_encoder {
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
pub fn new(reader: R, encoder: E) -> Self {
Self {
reader,
Expand Down Expand Up @@ -149,46 +152,52 @@ macro_rules! impl_encoder {
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut PartialBuffer<&mut [u8]>,
) -> Poll<Result<()>> {
let mut this = self.project();

if let ControlFlow::Break(res) =
this.inner.do_poll_read(output, &mut *this.encoder, None)
{
return Poll::Ready(res);
}
fn do_poll_read(
inner: &mut GenericEncoder,
encoder: &mut dyn EncodeV2,
mut reader: Pin<&mut dyn AsyncBufRead>,
cx: &mut Context<'_>,
output: &mut WriteBuffer<'_>,
) -> Poll<Result<()>> {
if let ControlFlow::Break(res) = inner.do_poll_read(output, encoder, None) {
return Poll::Ready(res);
}

loop {
let mut input = match this.reader.as_mut().poll_fill_buf(cx) {
Poll::Pending => None,
Poll::Ready(res) => Some(PartialBuffer::new(res?)),
};
loop {
let mut input = match reader.as_mut().poll_fill_buf(cx) {
Poll::Pending => None,
Poll::Ready(res) => Some(PartialBuffer::new(res?)),
};

let control_flow =
this.inner
.do_poll_read(output, &mut *this.encoder, input.as_mut());
let control_flow = inner.do_poll_read(output, encoder, input.as_mut());

let is_pending = input.is_none();
if let Some(input) = input {
let len = input.written().len();
this.reader.as_mut().consume(len);
}
let is_pending = input.is_none();
if let Some(input) = input {
let len = input.written().len();
reader.as_mut().consume(len);
}

if let ControlFlow::Break(res) = control_flow {
break Poll::Ready(res);
}
if let ControlFlow::Break(res) = control_flow {
break Poll::Ready(res);
}

if is_pending {
return Poll::Pending;
}
if is_pending {
return Poll::Pending;
}
}
}

impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut WriteBuffer<'_>,
) -> Poll<Result<()>> {
let this = self.project();

do_poll_read(this.inner, this.encoder, this.reader, cx, output)
}
}
};
}
pub(crate) use impl_encoder;
Loading