Skip to content

Commit ba687ed

Browse files
committed
test: #246
1 parent f0a5c74 commit ba687ed

File tree

1 file changed

+93
-0
lines changed

1 file changed

+93
-0
lines changed

tests/issues.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
#![cfg(all(feature = "tokio", feature = "zstd"))]
2+
#![allow(clippy::unusual_byte_groupings)]
3+
4+
use std::{
5+
io,
6+
pin::Pin,
7+
task::{ready, Context, Poll},
8+
};
9+
10+
use async_compression::tokio::write::ZstdEncoder;
11+
use tokio::io::{AsyncWrite, AsyncWriteExt as _};
12+
13+
/// <https://github.com/Nullus157/async-compression/issues/246>
14+
#[tokio::test]
15+
async fn issue_246() {
16+
let mut zstd_encoder = Transparent::new(ZstdEncoder::new(DelayedShutdown::default()));
17+
zstd_encoder.shutdown().await.unwrap();
18+
}
19+
20+
pin_project_lite::pin_project! {
21+
/// A simple wrapper struct that follows the [`AsyncWrite`] protocol.
22+
struct Transparent<T> {
23+
#[pin] inner: T
24+
}
25+
}
26+
27+
impl<T> Transparent<T> {
28+
fn new(inner: T) -> Self {
29+
Self { inner }
30+
}
31+
}
32+
33+
impl<T: AsyncWrite> AsyncWrite for Transparent<T> {
34+
fn poll_write(
35+
self: Pin<&mut Self>,
36+
cx: &mut Context<'_>,
37+
buf: &[u8],
38+
) -> Poll<Result<usize, io::Error>> {
39+
self.project().inner.poll_write(cx, buf)
40+
}
41+
42+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
43+
self.project().inner.poll_flush(cx)
44+
}
45+
46+
/// To quote the [`AsyncWrite`] docs:
47+
/// > Invocation of a shutdown implies an invocation of flush.
48+
/// > Once this method returns Ready it implies that a flush successfully happened before the shutdown happened.
49+
/// > That is, callers don't need to call flush before calling shutdown.
50+
/// > They can rely that by calling shutdown any pending buffered data will be written out.
51+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
52+
let mut this = self.project();
53+
ready!(this.inner.as_mut().poll_flush(cx))?;
54+
this.inner.poll_shutdown(cx)
55+
}
56+
}
57+
58+
pin_project_lite::pin_project! {
59+
/// Yields [`Poll::Pending`] the first time [`AsyncWrite::poll_shutdown`] is called.
60+
#[derive(Default)]
61+
struct DelayedShutdown {
62+
contents: Vec<u8>,
63+
num_times_shutdown_called: u8,
64+
}
65+
}
66+
67+
impl AsyncWrite for DelayedShutdown {
68+
fn poll_write(
69+
self: Pin<&mut Self>,
70+
cx: &mut Context<'_>,
71+
buf: &[u8],
72+
) -> Poll<Result<usize, io::Error>> {
73+
let _ = cx;
74+
self.project().contents.extend_from_slice(buf);
75+
Poll::Ready(Ok(buf.len()))
76+
}
77+
78+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
79+
let _ = cx;
80+
Poll::Ready(Ok(()))
81+
}
82+
83+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
84+
match self.project().num_times_shutdown_called {
85+
it @ 0 => {
86+
*it += 1;
87+
cx.waker().wake_by_ref();
88+
Poll::Pending
89+
}
90+
_ => Poll::Ready(Ok(())),
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)