diff --git a/crates/async-compression/src/futures/bufread/generic/decoder.rs b/crates/async-compression/src/futures/bufread/generic/decoder.rs index 6b911428..16ddb548 100644 --- a/crates/async-compression/src/futures/bufread/generic/decoder.rs +++ b/crates/async-compression/src/futures/bufread/generic/decoder.rs @@ -1,16 +1,18 @@ -use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_decoder}; - +use crate::{ + codecs::DecodeV2, + core::util::{PartialBuffer, WriteBuffer}, + generic::bufread::impl_decoder, +}; use core::{ pin::Pin, task::{Context, Poll}, }; -use std::io::{IoSlice, Result}; - use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use std::io::{IoSlice, Result}; impl_decoder!(); -impl AsyncRead for Decoder { +impl AsyncRead for Decoder { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -20,10 +22,10 @@ impl AsyncRead for Decoder { return Poll::Ready(Ok(0)); } - let mut output = PartialBuffer::new(buf); + let mut output = WriteBuffer::new_initialized(buf); match self.do_poll_read(cx, &mut output)? { Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(output.written().len())), + _ => Poll::Ready(Ok(output.written_len())), } } } diff --git a/crates/async-compression/src/futures/bufread/generic/encoder.rs b/crates/async-compression/src/futures/bufread/generic/encoder.rs index 0496c2bd..c4deea46 100644 --- a/crates/async-compression/src/futures/bufread/generic/encoder.rs +++ b/crates/async-compression/src/futures/bufread/generic/encoder.rs @@ -1,14 +1,18 @@ -use crate::{codecs::Encode, core::util::PartialBuffer, generic::bufread::impl_encoder}; -use core::{ +use crate::{ + codecs::EncodeV2, + core::util::{PartialBuffer, WriteBuffer}, + generic::bufread::impl_encoder, +}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice}; +use std::{ + io::Result, pin::Pin, task::{Context, Poll}, }; -use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice}; -use std::io::Result; impl_encoder!(); -impl AsyncRead for Encoder { +impl AsyncRead for Encoder { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -18,10 +22,10 @@ impl AsyncRead for Encoder { return Poll::Ready(Ok(0)); } - let mut output = PartialBuffer::new(buf); + let mut output = WriteBuffer::new_initialized(buf); match self.do_poll_read(cx, &mut output)? { Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(output.written().len())), + _ => Poll::Ready(Ok(output.written_len())), } } } diff --git a/crates/async-compression/src/generic/bufread/decoder.rs b/crates/async-compression/src/generic/bufread/decoder.rs index 41b94100..71040aca 100644 --- a/crates/async-compression/src/generic/bufread/decoder.rs +++ b/crates/async-compression/src/generic/bufread/decoder.rs @@ -1,5 +1,7 @@ -use crate::codecs::Decode; -use crate::core::util::PartialBuffer; +use crate::{ + codecs::DecodeV2, + core::util::{PartialBuffer, WriteBuffer}, +}; use std::{io::Result, ops::ControlFlow}; @@ -31,10 +33,10 @@ impl Decoder { self.multiple_members = enabled; } - pub fn do_poll_read( + pub fn do_poll_read( &mut self, - output: &mut PartialBuffer<&mut [u8]>, - decoder: &mut D, + output: &mut WriteBuffer<'_>, + decoder: &mut dyn DecodeV2, input: &mut PartialBuffer<&[u8]>, mut first: bool, ) -> ControlFlow> { @@ -95,12 +97,12 @@ impl Decoder { } }; - if output.unwritten().is_empty() { + if output.has_no_spare_space() { return ControlFlow::Break(Ok(())); } } - if output.unwritten().is_empty() { + if output.has_no_spare_space() { ControlFlow::Break(Ok(())) } else { ControlFlow::Continue(()) @@ -127,7 +129,7 @@ macro_rules! impl_decoder { } } - impl Decoder { + impl Decoder { pub fn new(reader: R, decoder: D) -> Self { Self { reader, @@ -159,40 +161,44 @@ macro_rules! impl_decoder { } } - impl Decoder { - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - if let ControlFlow::Break(res) = this.inner.do_poll_read( - output, - this.decoder, - &mut PartialBuffer::new(&[][..]), - true, - ) { - return Poll::Ready(res); - } + fn do_poll_read( + inner: &mut GenericDecoder, + decoder: &mut dyn DecodeV2, + mut reader: Pin<&mut dyn AsyncBufRead>, + cx: &mut Context<'_>, + output: &mut WriteBuffer<'_>, + ) -> Poll> { + if let ControlFlow::Break(res) = + inner.do_poll_read(output, decoder, &mut PartialBuffer::new(&[][..]), true) + { + return Poll::Ready(res); + } - loop { - let mut input = - PartialBuffer::new(ready!(this.reader.as_mut().poll_fill_buf(cx))?); + loop { + let mut input = PartialBuffer::new(ready!(reader.as_mut().poll_fill_buf(cx))?); - let control_flow = - this.inner - .do_poll_read(output, this.decoder, &mut input, false); + let control_flow = inner.do_poll_read(output, decoder, &mut input, false); - let bytes_read = input.written().len(); - this.reader.as_mut().consume(bytes_read); + let bytes_read = input.written().len(); + reader.as_mut().consume(bytes_read); - if let ControlFlow::Break(res) = control_flow { - break Poll::Ready(res); - } + if let ControlFlow::Break(res) = control_flow { + break Poll::Ready(res); } } } + + impl Decoder { + fn do_poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + output: &mut WriteBuffer<'_>, + ) -> Poll> { + let this = self.project(); + + do_poll_read(this.inner, this.decoder, this.reader, cx, output) + } + } }; } pub(crate) use impl_decoder; diff --git a/crates/async-compression/src/generic/bufread/encoder.rs b/crates/async-compression/src/generic/bufread/encoder.rs index e6fe4b07..bc303f88 100644 --- a/crates/async-compression/src/generic/bufread/encoder.rs +++ b/crates/async-compression/src/generic/bufread/encoder.rs @@ -1,4 +1,7 @@ -use crate::{codecs::Encode, core::util::PartialBuffer}; +use crate::{ + codecs::EncodeV2, + core::util::{PartialBuffer, WriteBuffer}, +}; use std::{io::Result, ops::ControlFlow}; #[derive(Debug)] @@ -26,8 +29,8 @@ impl Encoder { /// `input` - should be `None` if `Poll::Pending`. pub fn do_poll_read( &mut self, - output: &mut PartialBuffer<&mut [u8]>, - encoder: &mut impl Encode, + output: &mut WriteBuffer<'_>, + encoder: &mut dyn EncodeV2, mut input: Option<&mut PartialBuffer<&[u8]>>, ) -> ControlFlow> { loop { @@ -81,12 +84,12 @@ impl Encoder { State::Done => return ControlFlow::Break(Ok(())), }; - if output.unwritten().is_empty() { + if output.has_no_spare_space() { return ControlFlow::Break(Ok(())); } } - if output.unwritten().is_empty() { + if output.has_no_spare_space() { ControlFlow::Break(Ok(())) } else { ControlFlow::Continue(()) @@ -113,7 +116,7 @@ macro_rules! impl_encoder { } } - impl Encoder { + impl Encoder { pub fn new(reader: R, encoder: E) -> Self { Self { reader, @@ -149,46 +152,52 @@ macro_rules! impl_encoder { } } - impl Encoder { - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - if let ControlFlow::Break(res) = - this.inner.do_poll_read(output, &mut *this.encoder, None) - { - return Poll::Ready(res); - } + fn do_poll_read( + inner: &mut GenericEncoder, + encoder: &mut dyn EncodeV2, + mut reader: Pin<&mut dyn AsyncBufRead>, + cx: &mut Context<'_>, + output: &mut WriteBuffer<'_>, + ) -> Poll> { + if let ControlFlow::Break(res) = inner.do_poll_read(output, encoder, None) { + return Poll::Ready(res); + } - loop { - let mut input = match this.reader.as_mut().poll_fill_buf(cx) { - Poll::Pending => None, - Poll::Ready(res) => Some(PartialBuffer::new(res?)), - }; + loop { + let mut input = match reader.as_mut().poll_fill_buf(cx) { + Poll::Pending => None, + Poll::Ready(res) => Some(PartialBuffer::new(res?)), + }; - let control_flow = - this.inner - .do_poll_read(output, &mut *this.encoder, input.as_mut()); + let control_flow = inner.do_poll_read(output, encoder, input.as_mut()); - let is_pending = input.is_none(); - if let Some(input) = input { - let len = input.written().len(); - this.reader.as_mut().consume(len); - } + let is_pending = input.is_none(); + if let Some(input) = input { + let len = input.written().len(); + reader.as_mut().consume(len); + } - if let ControlFlow::Break(res) = control_flow { - break Poll::Ready(res); - } + if let ControlFlow::Break(res) = control_flow { + break Poll::Ready(res); + } - if is_pending { - return Poll::Pending; - } + if is_pending { + return Poll::Pending; } } } + + impl Encoder { + fn do_poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + output: &mut WriteBuffer<'_>, + ) -> Poll> { + let this = self.project(); + + do_poll_read(this.inner, this.encoder, this.reader, cx, output) + } + } }; } pub(crate) use impl_encoder; diff --git a/crates/async-compression/src/generic/write/decoder.rs b/crates/async-compression/src/generic/write/decoder.rs index ec3035b0..0ab58bab 100644 --- a/crates/async-compression/src/generic/write/decoder.rs +++ b/crates/async-compression/src/generic/write/decoder.rs @@ -1,4 +1,8 @@ -use crate::{codecs::Decode, core::util::PartialBuffer, generic::write::AsyncBufWrite}; +use crate::{ + codecs::DecodeV2, + core::util::{PartialBuffer, WriteBuffer}, + generic::write::AsyncBufWrite, +}; use futures_core::ready; use std::{ io, @@ -32,11 +36,11 @@ impl Decoder { cx: &mut Context<'_>, input: &mut PartialBuffer<&[u8]>, mut writer: Pin<&mut dyn AsyncBufWrite>, - decoder: &mut impl Decode, + decoder: &mut dyn DecodeV2, ) -> Poll> { loop { let output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); + let mut output = WriteBuffer::new_initialized(output); self.state = match self.state { State::Decoding => { @@ -60,7 +64,7 @@ impl Decoder { } }; - let produced = output.written().len(); + let produced = output.written_len(); writer.as_mut().produce(produced); if let State::Done = self.state { @@ -78,7 +82,7 @@ impl Decoder { cx: &mut Context<'_>, buf: &[u8], writer: Pin<&mut dyn AsyncBufWrite>, - decoder: &mut impl Decode, + decoder: &mut dyn DecodeV2, ) -> Poll> { if buf.is_empty() { return Poll::Ready(Ok(0)); @@ -96,11 +100,11 @@ impl Decoder { &mut self, cx: &mut Context<'_>, mut writer: Pin<&mut dyn AsyncBufWrite>, - decoder: &mut impl Decode, + decoder: &mut dyn DecodeV2, ) -> Poll> { loop { let output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); + let mut output = WriteBuffer::new_initialized(output); let (state, done) = match self.state { State::Decoding => { @@ -121,7 +125,7 @@ impl Decoder { self.state = state; - let produced = output.written().len(); + let produced = output.written_len(); writer.as_mut().produce(produced); if done { @@ -144,7 +148,7 @@ impl Decoder { macro_rules! impl_decoder { ($poll_close: tt) => { use crate::{ - codecs::Decode, core::util::PartialBuffer, generic::write::Decoder as GenericDecoder, + codecs::DecodeV2, core::util::PartialBuffer, generic::write::Decoder as GenericDecoder, }; use futures_core::ready; use pin_project_lite::pin_project; @@ -159,7 +163,7 @@ macro_rules! impl_decoder { } } - impl Decoder { + impl Decoder { pub fn new(writer: W, decoder: D) -> Self { Self { writer: BufWriter::new(writer), @@ -187,7 +191,7 @@ macro_rules! impl_decoder { } } - impl Decoder { + impl Decoder { fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); @@ -195,7 +199,7 @@ macro_rules! impl_decoder { } } - impl AsyncWrite for Decoder { + impl AsyncWrite for Decoder { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/crates/async-compression/src/generic/write/encoder.rs b/crates/async-compression/src/generic/write/encoder.rs index faac492a..8c030780 100644 --- a/crates/async-compression/src/generic/write/encoder.rs +++ b/crates/async-compression/src/generic/write/encoder.rs @@ -1,12 +1,15 @@ +use crate::{ + codecs::EncodeV2, + core::util::{PartialBuffer, WriteBuffer}, + generic::write::AsyncBufWrite, +}; +use futures_core::ready; use std::{ io, pin::Pin, task::{Context, Poll}, }; -use crate::{codecs::Encode, core::util::PartialBuffer, generic::write::AsyncBufWrite}; -use futures_core::ready; - #[derive(Debug)] enum State { Encoding, @@ -33,11 +36,11 @@ impl Encoder { cx: &mut Context<'_>, input: &mut PartialBuffer<&[u8]>, mut writer: Pin<&mut dyn AsyncBufWrite>, - encoder: &mut impl Encode, + encoder: &mut dyn EncodeV2, ) -> Poll> { loop { let output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); + let mut output = WriteBuffer::new_initialized(output); self.state = match self.state { State::Encoding => { @@ -50,7 +53,7 @@ impl Encoder { } }; - let produced = output.written().len(); + let produced = output.written_len(); writer.as_mut().produce(produced); if input.unwritten().is_empty() { @@ -64,7 +67,7 @@ impl Encoder { cx: &mut Context<'_>, buf: &[u8], writer: Pin<&mut dyn AsyncBufWrite>, - encoder: &mut impl Encode, + encoder: &mut dyn EncodeV2, ) -> Poll> { if buf.is_empty() { return Poll::Ready(Ok(0)); @@ -82,11 +85,11 @@ impl Encoder { &mut self, cx: &mut Context<'_>, mut writer: Pin<&mut dyn AsyncBufWrite>, - encoder: &mut impl Encode, + encoder: &mut dyn EncodeV2, ) -> Poll> { loop { let output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); + let mut output = WriteBuffer::new_initialized(output); let done = match self.state { State::Encoding => encoder.flush(&mut output)?, @@ -96,7 +99,7 @@ impl Encoder { } }; - let produced = output.written().len(); + let produced = output.written_len(); writer.as_mut().produce(produced); if done { @@ -109,11 +112,11 @@ impl Encoder { &mut self, cx: &mut Context<'_>, mut writer: Pin<&mut dyn AsyncBufWrite>, - encoder: &mut impl Encode, + encoder: &mut dyn EncodeV2, ) -> Poll> { loop { let output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); + let mut output = WriteBuffer::new_initialized(output); self.state = match self.state { State::Encoding | State::Finishing => { @@ -127,7 +130,7 @@ impl Encoder { State::Done => State::Done, }; - let produced = output.written().len(); + let produced = output.written_len(); writer.as_mut().produce(produced); if let State::Done = self.state { @@ -139,7 +142,7 @@ impl Encoder { macro_rules! impl_encoder { ($poll_close: tt) => { - use crate::{codecs::Encode, generic::write::Encoder as GenericEncoder}; + use crate::{codecs::EncodeV2, generic::write::Encoder as GenericEncoder}; use futures_core::ready; use pin_project_lite::pin_project; @@ -191,7 +194,7 @@ macro_rules! impl_encoder { } } - impl AsyncWrite for Encoder { + impl AsyncWrite for Encoder { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/crates/async-compression/src/tokio/bufread/generic/decoder.rs b/crates/async-compression/src/tokio/bufread/generic/decoder.rs index d5e6406a..2daff045 100644 --- a/crates/async-compression/src/tokio/bufread/generic/decoder.rs +++ b/crates/async-compression/src/tokio/bufread/generic/decoder.rs @@ -1,4 +1,8 @@ -use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_decoder}; +use crate::{ + codecs::DecodeV2, + core::util::{PartialBuffer, WriteBuffer}, + generic::bufread::impl_decoder, +}; use core::{ pin::Pin, @@ -10,7 +14,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; impl_decoder!(); -impl AsyncRead for Decoder { +impl AsyncRead for Decoder { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -20,11 +24,11 @@ impl AsyncRead for Decoder { return Poll::Ready(Ok(())); } - let mut output = PartialBuffer::new(buf.initialize_unfilled()); + let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled()); match self.do_poll_read(cx, &mut output)? { Poll::Pending if output.written().is_empty() => Poll::Pending, _ => { - let len = output.written().len(); + let len = output.written_len(); buf.advance(len); Poll::Ready(Ok(())) } @@ -32,7 +36,7 @@ impl AsyncRead for Decoder { } } -impl AsyncWrite for Decoder { +impl AsyncWrite for Decoder { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/crates/async-compression/src/tokio/bufread/generic/encoder.rs b/crates/async-compression/src/tokio/bufread/generic/encoder.rs index aae85fdc..edda53dd 100644 --- a/crates/async-compression/src/tokio/bufread/generic/encoder.rs +++ b/crates/async-compression/src/tokio/bufread/generic/encoder.rs @@ -1,14 +1,18 @@ -use crate::{codecs::Encode, core::util::PartialBuffer, generic::bufread::impl_encoder}; -use core::{ +use crate::{ + codecs::EncodeV2, + core::util::{PartialBuffer, WriteBuffer}, + generic::bufread::impl_encoder, +}; +use std::{ + io::{IoSlice, Result}, pin::Pin, task::{Context, Poll}, }; -use std::io::{IoSlice, Result}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; impl_encoder!(); -impl AsyncRead for Encoder { +impl AsyncRead for Encoder { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -18,11 +22,11 @@ impl AsyncRead for Encoder { return Poll::Ready(Ok(())); } - let mut output = PartialBuffer::new(buf.initialize_unfilled()); + let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled()); match self.do_poll_read(cx, &mut output)? { Poll::Pending if output.written().is_empty() => Poll::Pending, _ => { - let len = output.written().len(); + let len = output.written_len(); buf.advance(len); Poll::Ready(Ok(())) }