diff --git a/crates/async-compression/src/futures/bufread/generic/encoder.rs b/crates/async-compression/src/futures/bufread/generic/encoder.rs index 7c5236d3..0496c2bd 100644 --- a/crates/async-compression/src/futures/bufread/generic/encoder.rs +++ b/crates/async-compression/src/futures/bufread/generic/encoder.rs @@ -1,138 +1,12 @@ +use crate::{codecs::Encode, core::util::PartialBuffer, generic::bufread::impl_encoder}; use core::{ pin::Pin, task::{Context, Poll}, }; -use std::io::Result; - -use crate::codecs::Encode; -use crate::core::util::PartialBuffer; -use futures_core::ready; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice}; -use pin_project_lite::pin_project; - -#[derive(Debug)] -enum State { - Encoding, - Flushing, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - reader: R, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(reader: R, encoder: E) -> Self { - Self { - reader, - encoder, - state: State::Encoding, - } - } - - pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self { - Self::new(reader, encoder) - } -} - -impl Encoder { - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub(crate) fn get_encoder_ref(&self) -> &E { - &self.encoder - } - - pub fn into_inner(self) -> R { - self.reader - } -} - -impl Encoder { - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - let mut read = 0usize; - - loop { - *this.state = match this.state { - State::Encoding => { - let res = this.reader.as_mut().poll_fill_buf(cx); - - match res { - Poll::Pending => { - if read == 0 { - return Poll::Pending; - } else { - State::Flushing - } - } - Poll::Ready(res) => { - let input = res?; - - if input.is_empty() { - State::Finishing - } else { - let mut input = PartialBuffer::new(input); - this.encoder.encode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - read += len; - - State::Encoding - } - } - } - } - - State::Flushing => { - if this.encoder.flush(output)? { - read = 0; - State::Encoding - } else { - State::Flushing - } - } - - State::Finishing => { - if this.encoder.finish(output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => State::Done, - }; +use std::io::Result; - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} +impl_encoder!(); impl AsyncRead for Encoder { fn poll_read( diff --git a/crates/async-compression/src/generic/bufread/encoder.rs b/crates/async-compression/src/generic/bufread/encoder.rs new file mode 100644 index 00000000..4187c9b7 --- /dev/null +++ b/crates/async-compression/src/generic/bufread/encoder.rs @@ -0,0 +1,191 @@ +use crate::{codecs::Encode, core::util::PartialBuffer}; +use std::{io::Result, ops::ControlFlow}; + +#[derive(Debug)] +enum State { + Encoding(usize), + Flushing, + Finishing, + Done, +} + +#[derive(Debug)] +pub struct Encoder { + state: State, +} + +impl Default for Encoder { + fn default() -> Self { + Self { + state: State::Encoding(0), + } + } +} + +impl Encoder { + /// `input` - should be `None` if `Poll::Pending`. + pub fn do_poll_read( + &mut self, + output: &mut PartialBuffer<&mut [u8]>, + encoder: &mut impl Encode, + mut input: Option<&mut PartialBuffer<&[u8]>>, + ) -> ControlFlow> { + loop { + self.state = match self.state { + State::Encoding(mut read) => match input.as_mut() { + None => { + if read == 0 { + // Poll for more data + // TODO (nobodyxu): Return Ok if `!output.written().is_empty()` + break; + } else { + State::Flushing + } + } + Some(input) => { + if input.unwritten().is_empty() { + State::Finishing + } else { + if let Err(err) = encoder.encode(input, output) { + return ControlFlow::Break(Err(err)); + } + + read += input.written().len(); + + // Poll for more data + break; + } + } + }, + + State::Flushing => match encoder.flush(output) { + Ok(true) => { + self.state = State::Encoding(0); + + // Poll for more data + break; + } + Ok(false) => State::Flushing, + Err(err) => return ControlFlow::Break(Err(err)), + }, + + State::Finishing => match encoder.finish(output) { + Ok(true) => State::Done, + Ok(false) => State::Finishing, + Err(err) => return ControlFlow::Break(Err(err)), + }, + + State::Done => return ControlFlow::Break(Ok(())), + }; + + if output.unwritten().is_empty() { + return ControlFlow::Break(Ok(())); + } + } + + if output.unwritten().is_empty() { + ControlFlow::Break(Ok(())) + } else { + ControlFlow::Continue(()) + } + } +} + +macro_rules! impl_encoder { + () => { + use crate::generic::bufread::Encoder as GenericEncoder; + + use std::ops::ControlFlow; + + use futures_core::ready; + use pin_project_lite::pin_project; + + pin_project! { + #[derive(Debug)] + pub struct Encoder { + #[pin] + reader: R, + encoder: E, + inner: GenericEncoder, + } + } + + impl Encoder { + pub fn new(reader: R, encoder: E) -> Self { + Self { + reader, + encoder, + inner: Default::default(), + } + } + + pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self { + Self::new(reader, encoder) + } + } + + impl Encoder { + pub fn get_ref(&self) -> &R { + &self.reader + } + + pub fn get_mut(&mut self) -> &mut R { + &mut self.reader + } + + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.project().reader + } + + pub(crate) fn get_encoder_ref(&self) -> &E { + &self.encoder + } + + pub fn into_inner(self) -> R { + self.reader + } + } + + impl Encoder { + fn do_poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + output: &mut PartialBuffer<&mut [u8]>, + ) -> Poll> { + let mut this = self.project(); + + if let ControlFlow::Break(res) = + this.inner.do_poll_read(output, &mut *this.encoder, None) + { + return Poll::Ready(res); + } + + loop { + let mut input = match this.reader.as_mut().poll_fill_buf(cx) { + Poll::Pending => None, + Poll::Ready(res) => Some(PartialBuffer::new(res?)), + }; + + let control_flow = + this.inner + .do_poll_read(output, &mut *this.encoder, input.as_mut()); + + let is_pending = input.is_none(); + if let Some(input) = input { + let len = input.written().len(); + this.reader.as_mut().consume(len); + } + + if let ControlFlow::Break(res) = control_flow { + break Poll::Ready(res); + } + + if is_pending { + return Poll::Pending; + } + } + } + } + }; +} +pub(crate) use impl_encoder; diff --git a/crates/async-compression/src/generic/bufread/mod.rs b/crates/async-compression/src/generic/bufread/mod.rs index 83ce0470..f3f686dd 100644 --- a/crates/async-compression/src/generic/bufread/mod.rs +++ b/crates/async-compression/src/generic/bufread/mod.rs @@ -1,3 +1,5 @@ mod decoder; +mod encoder; pub(crate) use decoder::*; +pub(crate) use encoder::*; diff --git a/crates/async-compression/src/tokio/bufread/generic/encoder.rs b/crates/async-compression/src/tokio/bufread/generic/encoder.rs index bea33f0b..aae85fdc 100644 --- a/crates/async-compression/src/tokio/bufread/generic/encoder.rs +++ b/crates/async-compression/src/tokio/bufread/generic/encoder.rs @@ -1,136 +1,12 @@ -use crate::codecs::Encode; -use crate::core::util::PartialBuffer; +use crate::{codecs::Encode, core::util::PartialBuffer, generic::bufread::impl_encoder}; use core::{ pin::Pin, task::{Context, Poll}, }; -use futures_core::ready; -use pin_project_lite::pin_project; use std::io::{IoSlice, Result}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; -#[derive(Debug)] -enum State { - Encoding, - Flushing, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - reader: R, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(reader: R, encoder: E) -> Self { - Self { - reader, - encoder, - state: State::Encoding, - } - } - - pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self { - Self::new(reader, encoder) - } -} - -impl Encoder { - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub(crate) fn get_encoder_ref(&self) -> &E { - &self.encoder - } - - pub fn into_inner(self) -> R { - self.reader - } -} -impl Encoder { - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - let mut read = 0usize; - - loop { - *this.state = match this.state { - State::Encoding => { - let res = this.reader.as_mut().poll_fill_buf(cx); - - match res { - Poll::Pending => { - if read == 0 { - return Poll::Pending; - } else { - State::Flushing - } - } - Poll::Ready(res) => { - let input = res?; - - if input.is_empty() { - State::Finishing - } else { - let mut input = PartialBuffer::new(input); - this.encoder.encode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - read += len; - - State::Encoding - } - } - } - } - - State::Flushing => { - if this.encoder.flush(output)? { - read = 0; - State::Encoding - } else { - State::Flushing - } - } - - State::Finishing => { - if this.encoder.finish(output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => State::Done, - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} +impl_encoder!(); impl AsyncRead for Encoder { fn poll_read(