Skip to content

Commit f8a9721

Browse files
committed
tests: ready_stream as pathological example
1 parent f9f8f44 commit f8a9721

File tree

2 files changed

+239
-0
lines changed

2 files changed

+239
-0
lines changed

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ tokio = { version = "1", features = [
6666
] }
6767
tokio-test = "0.4"
6868
tokio-util = "0.7.10"
69+
tracing-subscriber = "0.3"
6970

7071
[features]
7172
# Nothing by default
@@ -239,6 +240,11 @@ name = "integration"
239240
path = "tests/integration.rs"
240241
required-features = ["full"]
241242

243+
[[test]]
244+
name = "ready_stream"
245+
path = "tests/ready_stream.rs"
246+
required-features = ["full", "tracing"]
247+
242248
[[test]]
243249
name = "server"
244250
path = "tests/server.rs"

tests/ready_stream.rs

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
use http_body_util::StreamBody;
2+
use hyper::body::Bytes;
3+
use hyper::body::Frame;
4+
use hyper::rt::{Read, ReadBufCursor, Write};
5+
use hyper::server::conn::http1;
6+
use hyper::service::service_fn;
7+
use hyper::{Response, StatusCode};
8+
use pin_project_lite::pin_project;
9+
use std::convert::Infallible;
10+
use std::io;
11+
use std::pin::Pin;
12+
use std::task::{ready, Context, Poll};
13+
use tokio::sync::mpsc;
14+
use tracing::{error, info};
15+
16+
pin_project! {
17+
#[derive(Debug)]
18+
pub struct TxReadyStream {
19+
#[pin]
20+
read_rx: mpsc::UnboundedReceiver<Vec<u8>>,
21+
write_tx: mpsc::UnboundedSender<Vec<u8>>,
22+
read_buffer: Vec<u8>,
23+
poll_since_write:bool,
24+
flush_count: usize,
25+
}
26+
}
27+
28+
impl TxReadyStream {
29+
fn new(
30+
read_rx: mpsc::UnboundedReceiver<Vec<u8>>,
31+
write_tx: mpsc::UnboundedSender<Vec<u8>>,
32+
) -> Self {
33+
Self {
34+
read_rx,
35+
write_tx,
36+
read_buffer: Vec::new(),
37+
poll_since_write: true,
38+
flush_count: 0,
39+
}
40+
}
41+
42+
/// Create a new pair of connected ReadyStreams. Returns two streams that are connected to each other.
43+
fn new_pair() -> (Self, Self) {
44+
let (s1_tx, s2_rx) = mpsc::unbounded_channel();
45+
let (s2_tx, s1_rx) = mpsc::unbounded_channel();
46+
let s1 = Self::new(s1_rx, s1_tx);
47+
let s2 = Self::new(s2_rx, s2_tx);
48+
(s1, s2)
49+
}
50+
51+
/// Send data to the other end of the stream (this will be available for reading on the other stream)
52+
fn send(&self, data: &[u8]) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
53+
self.write_tx.send(data.to_vec())
54+
}
55+
56+
/// Receive data written to this stream by the other end (async)
57+
async fn recv(&mut self) -> Option<Vec<u8>> {
58+
self.read_rx.recv().await
59+
}
60+
}
61+
62+
impl Read for TxReadyStream {
63+
fn poll_read(
64+
mut self: Pin<&mut Self>,
65+
cx: &mut Context<'_>,
66+
mut buf: ReadBufCursor<'_>,
67+
) -> Poll<io::Result<()>> {
68+
let mut this = self.as_mut().project();
69+
70+
// First, try to satisfy the read request from the internal buffer
71+
if !this.read_buffer.is_empty() {
72+
let to_read = std::cmp::min(this.read_buffer.len(), buf.remaining());
73+
// Copy data from internal buffer to the read buffer
74+
buf.put_slice(&this.read_buffer[..to_read]);
75+
// Remove the consumed data from the internal buffer
76+
this.read_buffer.drain(..to_read);
77+
return Poll::Ready(Ok(()));
78+
}
79+
80+
// If internal buffer is empty, try to get data from the channel
81+
match this.read_rx.try_recv() {
82+
Ok(data) => {
83+
// Copy as much data as we can fit in the buffer
84+
let to_read = std::cmp::min(data.len(), buf.remaining());
85+
buf.put_slice(&data[..to_read]);
86+
87+
// Store any remaining data in the internal buffer for next time
88+
if to_read < data.len() {
89+
let remaining = &data[to_read..];
90+
this.read_buffer.extend_from_slice(remaining);
91+
}
92+
Poll::Ready(Ok(()))
93+
}
94+
Err(mpsc::error::TryRecvError::Empty) => {
95+
match ready!(this.read_rx.poll_recv(cx)) {
96+
Some(data) => {
97+
// Copy as much data as we can fit in the buffer
98+
let to_read = std::cmp::min(data.len(), buf.remaining());
99+
buf.put_slice(&data[..to_read]);
100+
101+
// Store any remaining data in the internal buffer for next time
102+
if to_read < data.len() {
103+
let remaining = &data[to_read..];
104+
this.read_buffer.extend_from_slice(remaining);
105+
}
106+
Poll::Ready(Ok(()))
107+
}
108+
None => Poll::Ready(Ok(())),
109+
}
110+
}
111+
Err(mpsc::error::TryRecvError::Disconnected) => {
112+
// Channel closed, return EOF
113+
Poll::Ready(Ok(()))
114+
}
115+
}
116+
}
117+
}
118+
119+
impl Write for TxReadyStream {
120+
fn poll_write(
121+
mut self: Pin<&mut Self>,
122+
_cx: &mut Context<'_>,
123+
buf: &[u8],
124+
) -> Poll<io::Result<usize>> {
125+
if !self.poll_since_write {
126+
return Poll::Pending;
127+
}
128+
self.poll_since_write = false;
129+
let this = self.project();
130+
let buf = Vec::from(&buf[..buf.len()]);
131+
let len = buf.len();
132+
133+
// Send data through the channel - this should always be ready for unbounded channels
134+
match this.write_tx.send(buf) {
135+
Ok(_) => {
136+
// Increment write count
137+
Poll::Ready(Ok(len))
138+
}
139+
Err(_) => {
140+
error!("ReadyStream::poll_write failed - channel closed");
141+
Poll::Ready(Err(io::Error::new(
142+
io::ErrorKind::BrokenPipe,
143+
"Write channel closed",
144+
)))
145+
}
146+
}
147+
}
148+
149+
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
150+
self.flush_count += 1;
151+
// We require two flushes to complete each chunk, simulating a success at the end of the old
152+
// poll loop. After all chunks are written, we always succeed on flush to allow for finish.
153+
if self.flush_count % 2 != 0 && self.flush_count < TOTAL_CHUNKS * 2 {
154+
return Poll::Pending;
155+
}
156+
self.poll_since_write = true;
157+
Poll::Ready(Ok(()))
158+
}
159+
160+
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
161+
Poll::Ready(Ok(()))
162+
}
163+
}
164+
165+
fn init_tracing() {
166+
use std::sync::Once;
167+
static INIT: Once = Once::new();
168+
INIT.call_once(|| {
169+
tracing_subscriber::fmt()
170+
.with_max_level(tracing::Level::INFO)
171+
.with_target(true)
172+
.with_thread_ids(true)
173+
.with_thread_names(true)
174+
.init();
175+
});
176+
}
177+
178+
const TOTAL_CHUNKS: usize = 16;
179+
180+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
181+
async fn body_test() {
182+
init_tracing();
183+
// Create a pair of connected streams
184+
let (server_stream, mut client_stream) = TxReadyStream::new_pair();
185+
186+
let mut http_builder = http1::Builder::new();
187+
http_builder.max_buf_size(CHUNK_SIZE);
188+
const CHUNK_SIZE: usize = 64 * 1024;
189+
let service = service_fn(|_| async move {
190+
info!(
191+
"Creating payload of {} chunks of {} KiB each ({} MiB total)...",
192+
TOTAL_CHUNKS,
193+
CHUNK_SIZE / 1024,
194+
TOTAL_CHUNKS * CHUNK_SIZE / (1024 * 1024)
195+
);
196+
let bytes = Bytes::from(vec![0; CHUNK_SIZE]);
197+
let data = vec![bytes.clone(); TOTAL_CHUNKS];
198+
let stream = futures_util::stream::iter(
199+
data.into_iter()
200+
.map(|b| Ok::<_, Infallible>(Frame::data(b))),
201+
);
202+
let body = StreamBody::new(stream);
203+
info!("Server: Sending data response...");
204+
Ok::<_, hyper::Error>(
205+
Response::builder()
206+
.status(StatusCode::OK)
207+
.header("content-type", "application/octet-stream")
208+
.header("content-length", (TOTAL_CHUNKS * CHUNK_SIZE).to_string())
209+
.body(body)
210+
.unwrap(),
211+
)
212+
});
213+
214+
let server_task = tokio::spawn(async move {
215+
let conn = http_builder.serve_connection(server_stream, service);
216+
if let Err(e) = conn.await {
217+
error!("Server connection error: {}", e);
218+
}
219+
});
220+
221+
let get_request = "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n";
222+
client_stream.send(get_request.as_bytes()).unwrap();
223+
224+
info!("Client is reading response...");
225+
let mut bytes_received = 0;
226+
while let Some(chunk) = client_stream.recv().await {
227+
bytes_received += chunk.len();
228+
}
229+
// Clean up
230+
server_task.abort();
231+
232+
info!(bytes_received, "Client done receiving bytes");
233+
}

0 commit comments

Comments
 (0)