Skip to content

Commit 2be76af

Browse files
committed
fixes
1 parent b25c34c commit 2be76af

File tree

8 files changed

+426
-397
lines changed

8 files changed

+426
-397
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ path = "tests/integration.rs"
242242
required-features = ["full"]
243243

244244
[[test]]
245-
name = "ready_stream"
246-
path = "tests/ready_stream.rs"
245+
name = "ready_on_poll_stream"
246+
path = "tests/ready_on_poll_stream.rs"
247247
required-features = ["full"]
248248

249249
[[test]]

src/proto/h1/dispatch.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,15 @@ where
170170
// benchmarks often use. Perhaps it should be a config option instead.
171171
for _ in 0..16 {
172172
let _ = self.poll_read(cx)?;
173-
let _ = self.poll_write(cx)?;
174-
let conn_ready = self.poll_flush(cx)?.is_ready();
173+
let write_ready = self.poll_write(cx)?.is_ready();
174+
let flush_ready = self.poll_flush(cx)?.is_ready();
175175

176176
// If we can write more body and the connection is ready, we should
177177
// write again. If we return `Ready(Ok(())` here, we will yield
178-
// without a guaranteed wakeup from the write side of the connection.
178+
// without a guaranteed wake-up from the write side of the connection.
179179
// This would lead to a deadlock if we also don't expect reads.
180-
let wants_write_again = self.can_write_again() && conn_ready;
180+
let wants_write_again = self.can_write_again() && (write_ready || flush_ready);
181+
181182
// This could happen if reading paused before blocking on IO,
182183
// such as getting to the end of a framed message, but then
183184
// writing/flushing set the state back to Init. In that case,
@@ -187,16 +188,30 @@ where
187188
// Using this instead of task::current() and notify() inside
188189
// the Conn is noticeably faster in pipelined benchmarks.
189190
let wants_read_again = self.conn.wants_read_again();
191+
190192
// If we cannot write or read again, we yield and rely on the
191-
// wakeup from the connection futures.
193+
// wake-up from the connection futures.
192194
if !(wants_write_again || wants_read_again) {
193-
//break;
194195
return Poll::Ready(Ok(()));
195196
}
196-
}
197197

198+
// If we are continuing only because "wants_write_again", check if write is ready.
199+
if !wants_read_again && wants_write_again {
200+
// If write was ready, just proceed with the loop
201+
if write_ready {
202+
continue;
203+
}
204+
// Write was previously pending, but may have become ready since polling flush, so
205+
// we need to check it again. If we simply proceeded, the case of an unbuffered
206+
// writer where flush is always ready would cause us to hot loop.
207+
if self.poll_write(cx)?.is_pending() {
208+
// write is pending, so it is safe to yield and rely on wake-up from connection
209+
// futures.
210+
return Poll::Ready(Ok(()));
211+
}
212+
}
213+
}
198214
trace!("poll_loop yielding (self = {:p})", self);
199-
200215
task::yield_now(cx).map(|never| match never {})
201216
}
202217

tests/h1_server/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod streams;
2+
mod test_framework;
3+
4+
pub use streams::StreamReadHalf;
5+
pub use test_framework::{init_tracing, run_body_test, TestConfig, TestStream};

tests/h1_server/streams.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use hyper::rt::{Read, ReadBufCursor};
2+
use pin_project_lite::pin_project;
3+
use std::io;
4+
use std::pin::Pin;
5+
use std::task::{ready, Context, Poll};
6+
use tokio::sync::mpsc;
7+
8+
// Common read half shared by both stream types
pin_project! {
    /// Read half of an in-memory test stream.
    ///
    /// Incoming data arrives as `Vec<u8>` chunks over an unbounded channel;
    /// bytes that do not fit the caller's read buffer are parked in
    /// `read_buffer` and served first on the next `poll_read`.
    #[derive(Debug)]
    pub struct StreamReadHalf {
        #[pin]
        // Receiving end of the chunk channel written by the peer half.
        read_rx: mpsc::UnboundedReceiver<Vec<u8>>,
        // Overflow bytes from a previously received chunk, drained before
        // the channel is polled again.
        read_buffer: Vec<u8>,
    }
}
17+
18+
impl StreamReadHalf {
19+
pub fn new(read_rx: mpsc::UnboundedReceiver<Vec<u8>>) -> Self {
20+
Self {
21+
read_rx,
22+
read_buffer: Vec::new(),
23+
}
24+
}
25+
26+
/// Receive data written to this stream by the other end (async)
27+
pub async fn recv(&mut self) -> Option<Vec<u8>> {
28+
self.read_rx.recv().await
29+
}
30+
}
31+
32+
impl Read for StreamReadHalf {
33+
fn poll_read(
34+
mut self: Pin<&mut Self>,
35+
cx: &mut Context<'_>,
36+
mut buf: ReadBufCursor<'_>,
37+
) -> Poll<io::Result<()>> {
38+
let mut this = self.as_mut().project();
39+
40+
// First, try to satisfy the read request from the internal buffer
41+
if !this.read_buffer.is_empty() {
42+
let to_read = std::cmp::min(this.read_buffer.len(), buf.remaining());
43+
// Copy data from internal buffer to the read buffer
44+
buf.put_slice(&this.read_buffer[..to_read]);
45+
// Remove the consumed data from the internal buffer
46+
this.read_buffer.drain(..to_read);
47+
return Poll::Ready(Ok(()));
48+
}
49+
50+
// If internal buffer is empty, try to get data from the channel
51+
match this.read_rx.as_mut().get_mut().try_recv() {
52+
Ok(data) => {
53+
// Copy as much data as we can fit in the buffer
54+
let to_read = std::cmp::min(data.len(), buf.remaining());
55+
buf.put_slice(&data[..to_read]);
56+
57+
// Store any remaining data in the internal buffer for next time
58+
if to_read < data.len() {
59+
let remaining = &data[to_read..];
60+
this.read_buffer.extend_from_slice(remaining);
61+
}
62+
Poll::Ready(Ok(()))
63+
}
64+
Err(mpsc::error::TryRecvError::Empty) => {
65+
match ready!(this.read_rx.poll_recv(cx)) {
66+
Some(data) => {
67+
// Copy as much data as we can fit in the buffer
68+
let to_read = std::cmp::min(data.len(), buf.remaining());
69+
buf.put_slice(&data[..to_read]);
70+
71+
// Store any remaining data in the internal buffer for next time
72+
if to_read < data.len() {
73+
let remaining = &data[to_read..];
74+
this.read_buffer.extend_from_slice(remaining);
75+
}
76+
Poll::Ready(Ok(()))
77+
}
78+
None => Poll::Ready(Ok(())),
79+
}
80+
}
81+
Err(mpsc::error::TryRecvError::Disconnected) => {
82+
// Channel closed, return EOF
83+
Poll::Ready(Ok(()))
84+
}
85+
}
86+
}
87+
}

tests/h1_server/test_framework.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use http_body_util::StreamBody;
2+
use hyper::body::Bytes;
3+
use hyper::body::Frame;
4+
use hyper::server::conn::http1;
5+
use hyper::service::service_fn;
6+
use hyper::{Response, StatusCode};
7+
use std::convert::Infallible;
8+
use tracing::{error, info};
9+
10+
/// Knobs for a body-transfer test: how many chunks the server streams
/// and how large each chunk is.
pub struct TestConfig {
    pub total_chunks: usize,
    pub chunk_size: usize,
}

impl Default for TestConfig {
    /// 16 chunks of 64 KiB each — a 1 MiB total payload.
    fn default() -> Self {
        let total_chunks = 16;
        let chunk_size = 64 * 1024;
        TestConfig {
            total_chunks,
            chunk_size,
        }
    }
}
23+
24+
pub fn init_tracing() {
25+
use std::sync::Once;
26+
static INIT: Once = Once::new();
27+
INIT.call_once(|| {
28+
tracing_subscriber::fmt()
29+
.with_max_level(tracing::Level::INFO)
30+
.with_target(true)
31+
.with_thread_ids(true)
32+
.with_thread_names(true)
33+
.init();
34+
});
35+
}
36+
37+
// Trait for streams that can send and receive data directly
pub trait TestStream: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static {
    /// Queue raw bytes for the peer end to read; synchronous and fallible.
    fn send(&self, data: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    /// Await the next chunk written by the peer; `None` signals EOF.
    /// Boxed future so the trait stays object-safe for test drivers.
    fn recv(&mut self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<Vec<u8>>> + Send + '_>>;
}
42+
43+
/// Drive one full HTTP/1 request/response over an in-memory stream pair.
///
/// The server half serves a single GET with a streamed body of
/// `config.total_chunks * config.chunk_size` zero bytes; the client half
/// writes a raw request, drains the response until EOF, and asserts the
/// received body length matches the configured payload.
///
/// # Panics
/// Panics if the request cannot be sent, the server connection errors,
/// or the body length assertion fails.
pub async fn run_body_test<S>(stream_pair: (S, S), config: TestConfig)
where
    S: TestStream,
{
    let (server_stream, mut client_stream) = stream_pair;

    // Bound hyper's connection buffer to a single chunk.
    let mut http_builder = http1::Builder::new();
    http_builder.max_buf_size(config.chunk_size);

    // Copy the (usize) config values so the service closure can own them.
    let total_chunks = config.total_chunks;
    let chunk_size = config.chunk_size;

    let service = service_fn(move |_| {
        let total_chunks = total_chunks;
        let chunk_size = chunk_size;
        async move {
            info!(
                "Creating payload of {} chunks of {} KiB each ({} MiB total)...",
                total_chunks,
                chunk_size / 1024,
                total_chunks * chunk_size / (1024 * 1024)
            );
            // One zeroed chunk, cloned per frame (Bytes clones are cheap,
            // reference-counted views of the same allocation).
            let bytes = Bytes::from(vec![0; chunk_size]);
            let data = vec![bytes.clone(); total_chunks];
            let stream = futures_util::stream::iter(
                data.into_iter()
                    .map(|b| Ok::<_, Infallible>(Frame::data(b))),
            );
            let body = StreamBody::new(stream);
            info!("Server: Sending data response...");
            Ok::<_, hyper::Error>(
                Response::builder()
                    .status(StatusCode::OK)
                    .header("content-type", "application/octet-stream")
                    // Explicit content-length is set to the full payload size.
                    .header("content-length", (total_chunks * chunk_size).to_string())
                    .body(body)
                    .unwrap(),
            )
        }
    });

    // Run the server end of the connection on its own task so the client
    // loop below can make progress concurrently.
    let server_task = tokio::spawn(async move {
        let conn = http_builder.serve_connection(Box::pin(server_stream), service);
        let conn_result = conn.await;
        if let Err(e) = &conn_result {
            error!("Server connection error: {}", e);
        }
        conn_result
    });

    // Hand-written request; "Connection: close" means recv() below reaches
    // EOF once the server has written the whole response.
    let get_request = "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n";
    client_stream.send(get_request.as_bytes())
        .map_err(|e| Box::new(std::io::Error::new(std::io::ErrorKind::Other, format!("Failed to send request: {}", e))))
        .unwrap();

    info!("Client is reading response...");
    // Accumulate the raw response (status line + headers + body) until EOF.
    let mut bytes_received = 0;
    let mut all_data = Vec::new();
    while let Some(chunk) = client_stream.recv().await {
        bytes_received += chunk.len();
        all_data.extend_from_slice(&chunk);
    }

    // Clean up: surface any server-side connection error as a panic.
    let result = server_task.await.unwrap();
    result.unwrap();

    // Parse HTTP response to find body start
    // HTTP response format: "HTTP/1.1 200 OK\r\n...headers...\r\n\r\n<body>"
    // NOTE(review): if the blank line is never found, body_start falls back
    // to 0 and header bytes would be counted as body bytes — confirm this
    // fallback is intentional.
    let body_start = all_data.windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|pos| pos + 4)
        .unwrap_or(0);

    // bytes_received == all_data.len(), so this is the body length in bytes.
    let body_bytes = bytes_received - body_start;
    assert_eq!(body_bytes, config.total_chunks * config.chunk_size,
        "Expected {} body bytes, got {} (total received: {}, headers: {})",
        config.total_chunks * config.chunk_size, body_bytes, bytes_received, body_start);
    info!(bytes_received, body_bytes, "Client done receiving bytes");
}
123+

0 commit comments

Comments
 (0)