Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 3 additions & 129 deletions crates/async-compression/src/futures/bufread/generic/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,138 +1,12 @@
use crate::{codecs::Encode, core::util::PartialBuffer, generic::bufread::impl_encoder};
use core::{
pin::Pin,
task::{Context, Poll},
};
use std::io::Result;

use crate::codecs::Encode;
use crate::core::util::PartialBuffer;
use futures_core::ready;
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice};
use pin_project_lite::pin_project;

#[derive(Debug)]
enum State {
Encoding,
Flushing,
Finishing,
Done,
}

pin_project! {
#[derive(Debug)]
pub struct Encoder<R, E> {
#[pin]
reader: R,
encoder: E,
state: State,
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
pub fn new(reader: R, encoder: E) -> Self {
Self {
reader,
encoder,
state: State::Encoding,
}
}

pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self {
Self::new(reader, encoder)
}
}

impl<R, E> Encoder<R, E> {
pub fn get_ref(&self) -> &R {
&self.reader
}

pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}

pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().reader
}

pub(crate) fn get_encoder_ref(&self) -> &E {
&self.encoder
}

pub fn into_inner(self) -> R {
self.reader
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut PartialBuffer<&mut [u8]>,
) -> Poll<Result<()>> {
let mut this = self.project();
let mut read = 0usize;

loop {
*this.state = match this.state {
State::Encoding => {
let res = this.reader.as_mut().poll_fill_buf(cx);

match res {
Poll::Pending => {
if read == 0 {
return Poll::Pending;
} else {
State::Flushing
}
}
Poll::Ready(res) => {
let input = res?;

if input.is_empty() {
State::Finishing
} else {
let mut input = PartialBuffer::new(input);
this.encoder.encode(&mut input, output)?;
let len = input.written().len();
this.reader.as_mut().consume(len);
read += len;

State::Encoding
}
}
}
}

State::Flushing => {
if this.encoder.flush(output)? {
read = 0;
State::Encoding
} else {
State::Flushing
}
}

State::Finishing => {
if this.encoder.finish(output)? {
State::Done
} else {
State::Finishing
}
}

State::Done => State::Done,
};
use std::io::Result;

if let State::Done = *this.state {
return Poll::Ready(Ok(()));
}
if output.unwritten().is_empty() {
return Poll::Ready(Ok(()));
}
}
}
}
impl_encoder!();

impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
fn poll_read(
Expand Down
191 changes: 191 additions & 0 deletions crates/async-compression/src/generic/bufread/encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use crate::{codecs::Encode, core::util::PartialBuffer};
use std::{io::Result, ops::ControlFlow};

#[derive(Debug)]
enum State {
Encoding(usize),
Flushing,
Finishing,
Done,
}

#[derive(Debug)]
pub struct Encoder {
state: State,
}

impl Default for Encoder {
fn default() -> Self {
Self {
state: State::Encoding(0),
}
}
}

impl Encoder {
/// `input` - should be `None` if `Poll::Pending`.
pub fn do_poll_read(
&mut self,
output: &mut PartialBuffer<&mut [u8]>,
encoder: &mut impl Encode,
mut input: Option<&mut PartialBuffer<&[u8]>>,
) -> ControlFlow<Result<()>> {
loop {
self.state = match self.state {
State::Encoding(mut read) => match input.as_mut() {
None => {
if read == 0 {
// Poll for more data
// TODO (nobodyxu): Return Ok if `!output.written().is_empty()`
break;
} else {
State::Flushing
}
}
Some(input) => {
if input.unwritten().is_empty() {
State::Finishing
} else {
if let Err(err) = encoder.encode(input, output) {
return ControlFlow::Break(Err(err));
}

read += input.written().len();

// Poll for more data
break;
}
}
},

State::Flushing => match encoder.flush(output) {
Ok(true) => {
self.state = State::Encoding(0);

// Poll for more data
break;
}
Ok(false) => State::Flushing,
Err(err) => return ControlFlow::Break(Err(err)),
},

State::Finishing => match encoder.finish(output) {
Ok(true) => State::Done,
Ok(false) => State::Finishing,
Err(err) => return ControlFlow::Break(Err(err)),
},

State::Done => return ControlFlow::Break(Ok(())),
};

if output.unwritten().is_empty() {
return ControlFlow::Break(Ok(()));
}
}

if output.unwritten().is_empty() {
ControlFlow::Break(Ok(()))
} else {
ControlFlow::Continue(())
}
}
}

macro_rules! impl_encoder {
() => {
use crate::generic::bufread::Encoder as GenericEncoder;

use std::ops::ControlFlow;

use futures_core::ready;
use pin_project_lite::pin_project;

pin_project! {
#[derive(Debug)]
pub struct Encoder<R, E> {
#[pin]
reader: R,
encoder: E,
inner: GenericEncoder,
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
pub fn new(reader: R, encoder: E) -> Self {
Self {
reader,
encoder,
inner: Default::default(),
}
}

pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self {
Self::new(reader, encoder)
}
}

impl<R, E> Encoder<R, E> {
pub fn get_ref(&self) -> &R {
&self.reader
}

pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}

pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().reader
}

pub(crate) fn get_encoder_ref(&self) -> &E {
&self.encoder
}

pub fn into_inner(self) -> R {
self.reader
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut PartialBuffer<&mut [u8]>,
) -> Poll<Result<()>> {
let mut this = self.project();

if let ControlFlow::Break(res) =
this.inner.do_poll_read(output, &mut *this.encoder, None)
{
return Poll::Ready(res);
}

loop {
let mut input = match this.reader.as_mut().poll_fill_buf(cx) {
Poll::Pending => None,
Poll::Ready(res) => Some(PartialBuffer::new(res?)),
};

let control_flow =
this.inner
.do_poll_read(output, &mut *this.encoder, input.as_mut());

let is_pending = input.is_none();
if let Some(input) = input {
let len = input.written().len();
this.reader.as_mut().consume(len);
}

if let ControlFlow::Break(res) = control_flow {
break Poll::Ready(res);
}

if is_pending {
return Poll::Pending;
}
}
}
}
};
}
pub(crate) use impl_encoder;
2 changes: 2 additions & 0 deletions crates/async-compression/src/generic/bufread/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod decoder;
mod encoder;

pub(crate) use decoder::*;
pub(crate) use encoder::*;
Loading