From a1527ccff796d5c83d9639641b4d638384475e30 Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Wed, 6 May 2026 12:36:56 -0700 Subject: [PATCH 1/2] Support HTTP/1.1 request pipelining on the downstream session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add opt-in HTTP/1.1 pipelining via HttpSession::set_pipelining_enabled(). When enabled, pipelined requests on a keep-alive connection are served sequentially in request order per RFC 9112 §9.3.2, matching nginx behavior on the same traffic shape. Default (off) is unchanged: the second pipelined request is dropped or surfaced as a 400 by the body-pump idle branch. Non-adopters are untouched: ServerSession::finish() and the H1 HttpSession::reuse() keep their pre-pipelining Result<Option<Stream>> signatures and discard any captured pipelined prefix. Adopters call finish_reuse() / reuse_pipelined() to receive ReusedHttpConnection. Covers both wire shapes: same-segment overread (both requests arrive in one read) via reuse_pipelined(), and two-segment overread (request N+1 arrives while request N's response is still being written) via read_body_or_idle()'s idle branch stashing non-zero reads into the body reader's overread surface. abort_on_close / half_closed are untouched so FIN handling is unchanged. HttpPersistentSettings carries pipelining_enabled + pipelined_prefix across keep-alive reuses; read_request() consumes the prefix first. 
Resolves #377, #673 --- pingora-core/src/apps/http_app.rs | 8 +- pingora-core/src/apps/mod.rs | 59 +- pingora-core/src/protocols/http/mod.rs | 2 +- pingora-core/src/protocols/http/server.rs | 85 ++- pingora-core/src/protocols/http/v1/body.rs | 41 ++ pingora-core/src/protocols/http/v1/server.rs | 719 +++++++++++++++++-- pingora-proxy/src/lib.rs | 12 +- 7 files changed, 832 insertions(+), 94 deletions(-) diff --git a/pingora-core/src/apps/http_app.rs b/pingora-core/src/apps/http_app.rs index f511012c..64a77876 100644 --- a/pingora-core/src/apps/http_app.rs +++ b/pingora-core/src/apps/http_app.rs @@ -98,8 +98,8 @@ where } } let persistent_settings = HttpPersistentSettings::for_session(&http); - match http.finish().await { - Ok(c) => c.map(|s| ReusedHttpStream::new(s, Some(persistent_settings))), + match http.finish_reuse().await { + Ok(c) => c.map(|s| ReusedHttpStream::from_reused_connection(s, persistent_settings)), Err(e) => { error!("HTTP server fails to finish the request: {e}"); None @@ -206,8 +206,8 @@ where ), } let persistent_settings = HttpPersistentSettings::for_session(&http); - match http.finish().await { - Ok(c) => c.map(|s| ReusedHttpStream::new(s, Some(persistent_settings))), + match http.finish_reuse().await { + Ok(c) => c.map(|s| ReusedHttpStream::from_reused_connection(s, persistent_settings)), Err(e) => { error!("HTTP server fails to finish the request: {e}"); None diff --git a/pingora-core/src/apps/mod.rs b/pingora-core/src/apps/mod.rs index 82989e5c..ae995a29 100644 --- a/pingora-core/src/apps/mod.rs +++ b/pingora-core/src/apps/mod.rs @@ -18,13 +18,14 @@ pub mod http_app; use crate::server::ShutdownWatch; use async_trait::async_trait; +use bytes::BytesMut; use log::{debug, error}; use std::any::Any; use std::future::poll_fn; use std::sync::Arc; use crate::protocols::http::v2::server; -use crate::protocols::http::ServerSession; +use crate::protocols::http::{ReusedHttpConnection, ServerSession}; use crate::protocols::Digest; use 
crate::protocols::Stream; use crate::protocols::ALPN; @@ -90,12 +91,26 @@ pub struct HttpServerOptions { /// user-defined context via [`set_user_context`](Self::set_user_context). The proxy layer /// populates this through `ProxyHttp::persist_connection_context` /// and delivers it to the next request through `ProxyHttp::on_connection_reuse`. +/// +/// Also carries pipelined-prefix bytes when the caller has opted into HTTP/1.1 +/// pipelining on the previous session. See +/// [`Self::set_pipelined_prefix`] and the +/// [`HttpSession::set_pipelining_enabled`](crate::protocols::http::v1::server::HttpSession::set_pipelining_enabled) +/// docs for the RFC 9112 §9.3.2 semantics. #[derive(Debug)] pub struct HttpPersistentSettings { keepalive_timeout: Option, keepalive_reuses_remaining: Option, /// User-defined context to carry to the next request on this connection. user_context: Option>, + /// Bytes read past the end of the previous request's body, to be parsed + /// as the next pipelined request on the reused connection. + pipelined_prefix: Option, + /// Whether HTTP/1.1 pipelining was enabled on the previous session; + /// propagates to the next session so the proxy-level opt-in sticks + /// across keepalive reuses without the adopter having to re-enable + /// it on every request. + pipelining_enabled: bool, } impl HttpPersistentSettings { @@ -104,6 +119,8 @@ impl HttpPersistentSettings { keepalive_timeout: session.get_keepalive(), keepalive_reuses_remaining: session.get_keepalive_reuses_remaining(), user_context: None, + pipelined_prefix: None, + pipelining_enabled: session.pipelining_enabled(), } } @@ -117,11 +134,21 @@ impl HttpPersistentSettings { self.user_context.take() } + /// Set pipelined-prefix bytes to be fed to the next session on this + /// connection. Called by the proxy layer when HTTP/1.1 pipelining is + /// enabled on the current session and overread bytes were present at + /// reuse time. 
+ pub fn set_pipelined_prefix(&mut self, prefix: BytesMut) { + self.pipelined_prefix = Some(prefix); + } + pub fn apply_to_session(self, session: &mut ServerSession) { let Self { keepalive_timeout, mut keepalive_reuses_remaining, user_context, + pipelined_prefix, + pipelining_enabled, } = self; // Reduce the number of times the connection for this session can be @@ -135,6 +162,15 @@ impl HttpPersistentSettings { // Carry user context into the session for the proxy layer to consume session.set_connection_user_context(user_context); + + // Replay pipelining opt-in so it stays on across keepalive reuses. + session.set_pipelining_enabled(pipelining_enabled); + + // Feed any pipelined prefix bytes to the new session's request parser + // so they are treated as the start of the next request. + if let Some(prefix) = pipelined_prefix { + session.set_pipelined_prefix(prefix); + } } } @@ -152,6 +188,19 @@ impl ReusedHttpStream { } } + /// Build a reusable HTTP stream from a finished session, preserving any + /// pipelined prefix bytes in the persistent settings for the next request. + pub fn from_reused_connection( + reused: ReusedHttpConnection, + mut persistent_settings: HttpPersistentSettings, + ) -> Self { + let (stream, pipelined_prefix) = reused.into_parts(); + if let Some(prefix) = pipelined_prefix { + persistent_settings.set_pipelined_prefix(prefix); + } + Self::new(stream, Some(persistent_settings)) + } + pub fn consume(self) -> (Stream, Option) { (self.stream, self.persistent_settings) } @@ -162,9 +211,11 @@ impl ReusedHttpStream { pub trait HttpServerApp { /// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established. /// - /// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable - /// connection back to the service. The caller needs to make sure that the connection is in a reusable state - /// i.e., no error or incomplete read or write headers or bodies. 
Otherwise a `None` should be returned. + /// After successful processing, [`ServerSession::finish_reuse()`] can be + /// called to return an optionally reusable connection back to the service. + /// The caller needs to make sure that the connection is in a reusable state + /// i.e., no error or incomplete read or write headers or bodies. Otherwise + /// a `None` should be returned. async fn process_new_http( self: &Arc, mut session: ServerSession, diff --git a/pingora-core/src/protocols/http/mod.rs b/pingora-core/src/protocols/http/mod.rs index f5bc729d..8ebeb51e 100644 --- a/pingora-core/src/protocols/http/mod.rs +++ b/pingora-core/src/protocols/http/mod.rs @@ -27,7 +27,7 @@ pub mod subrequest; pub mod v1; pub mod v2; -pub use server::Session as ServerSession; +pub use server::{ReusedHttpConnection, Session as ServerSession}; /// The Pingora server name string pub const SERVER_NAME: &[u8; 7] = b"Pingora"; diff --git a/pingora-core/src/protocols/http/server.rs b/pingora-core/src/protocols/http/server.rs index 438f3cb0..d95fbe44 100644 --- a/pingora-core/src/protocols/http/server.rs +++ b/pingora-core/src/protocols/http/server.rs @@ -22,7 +22,7 @@ use super::v2::server::HttpSession as SessionV2; use super::HttpTask; use crate::custom_session; use crate::protocols::{Digest, SocketAddr, Stream}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use http::HeaderValue; use http::{header::AsHeaderName, HeaderMap}; use pingora_error::{Error, Result}; @@ -30,6 +30,28 @@ use pingora_http::{RequestHeader, ResponseHeader}; use std::any::Any; use std::time::Duration; +/// A reusable HTTP/1.x connection and bytes already read for the next request. 
+#[derive(Debug)] +pub struct ReusedHttpConnection { + stream: Stream, + pipelined_prefix: Option, +} + +impl ReusedHttpConnection { + pub(crate) fn new(stream: Stream, pipelined_prefix: Option) -> Self { + Self { + stream, + pipelined_prefix, + } + } + + /// Split the reusable connection into its underlying stream and optional + /// bytes already read for the next pipelined request. + pub fn into_parts(self) -> (Stream, Option) { + (self.stream, self.pipelined_prefix) + } +} + /// HTTP server session object for both HTTP/1.x and HTTP/2 pub enum Session { H1(SessionV1), @@ -227,16 +249,35 @@ impl Session { } } - /// Finish the life of this request. - /// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None. + /// Finish the life of this request and return a reusable stream, if any. + /// + /// This is the compatibility path: it preserves the pre-pipelining + /// stream-only signature and discards any captured pipelined prefix bytes. + /// Callers that enable HTTP/1.1 pipelining with + /// [`Self::set_pipelining_enabled`] should use [`Self::finish_reuse`] + /// instead so the prefix is carried to the next request on the connection. + pub async fn finish(self) -> Result> { + self.finish_reuse().await.map(|opt| { + opt.map(|reused| { + let (stream, _pipelined_prefix) = reused.into_parts(); + stream + }) + }) + } + + /// Finish the life of this request and return pipelining-aware reuse + /// metadata, if the connection can be reused. + /// + /// For H1, if connection reuse is supported, a reusable connection will be returned, + /// otherwise None. /// For H2, always return None because H2 stream is not reusable. /// For subrequests, there is no true underlying stream to return. 
- pub async fn finish(self) -> Result> { + pub async fn finish_reuse(self) -> Result> { match self { Self::H1(mut s) => { // need to flush body due to buffering s.finish_body().await?; - s.reuse().await + s.reuse_pipelined().await } Self::H2(mut s) => { s.finish()?; @@ -830,6 +871,40 @@ impl Session { } } + /// Whether HTTP/1.1 request pipelining is enabled for this session. + /// + /// Always false for H2 / Subrequest / Custom (pipelining is an H/1.1-only + /// concept). For H1, see + /// [`HttpSession::set_pipelining_enabled`](crate::protocols::http::v1::server::HttpSession::set_pipelining_enabled). + pub fn pipelining_enabled(&self) -> bool { + match self { + Self::H1(s) => s.pipelining_enabled(), + _ => false, + } + } + + /// Enable or disable HTTP/1.1 request pipelining on this session. + /// + /// No-op for H2 / Subrequest / Custom. See + /// [`HttpSession::set_pipelining_enabled`](crate::protocols::http::v1::server::HttpSession::set_pipelining_enabled) + /// for semantics. + pub fn set_pipelining_enabled(&mut self, enabled: bool) { + if let Self::H1(s) = self { + s.set_pipelining_enabled(enabled); + } + } + + /// Set pipelined bytes to be parsed as the start of this session's request. + /// + /// No-op for non-H1 sessions. See + /// [`HttpSession::set_pipelined_prefix`](crate::protocols::http::v1::server::HttpSession::set_pipelined_prefix) + /// for the lifecycle. + pub fn set_pipelined_prefix(&mut self, prefix: BytesMut) { + if let Self::H1(s) = self { + s.set_pipelined_prefix(prefix); + } + } + /// Queue a downstream proxy task for cancel-safe writing. 
/// /// # Panics diff --git a/pingora-core/src/protocols/http/v1/body.rs b/pingora-core/src/protocols/http/v1/body.rs index 61872af6..d1c73cfb 100644 --- a/pingora-core/src/protocols/http/v1/body.rs +++ b/pingora-core/src/protocols/http/v1/body.rs @@ -254,6 +254,47 @@ impl BodyReader { self.get_body_overread().is_some_and(|b| !b.is_empty()) } + /// Take ownership of the overread bytes, leaving `None` in their place. + /// + /// Overread bytes are bytes that were read from the stream but are beyond the + /// end of the current request's body — i.e. they belong to a pipelined next + /// request on the same connection. Callers that support HTTP/1.1 pipelining + /// extract these bytes here and feed them to the next session's request + /// parser via [`HttpSession::set_pipelined_prefix`](super::server::HttpSession::set_pipelined_prefix). + pub fn take_body_overread(&mut self) -> Option { + self.body_buf_overread.take() + } + + /// Append bytes to the overread buffer from outside the body-parsing path. + /// + /// The body reader's overread buffer is normally populated by + /// [`Self::init_content_length`] (for zero-length bodies) and + /// [`Self::finish_body_buf`] (for sized bodies) when the stream read + /// that completed the current request also pulled in bytes from the + /// next pipelined request. That path covers the "both requests in one + /// read" shape. + /// + /// The "second request arrives in a separate read after the first + /// request's body is already done" shape is different: the body + /// reader never sees those bytes — they land on the idle-branch read + /// in [`super::server::HttpSession::read_body_or_idle`]. When + /// pipelining is enabled, that caller stashes the idle read here so + /// a single downstream overread surface covers both shapes and + /// [`Self::take_body_overread`] returns them uniformly. 
+ pub fn push_body_overread(&mut self, bytes: &[u8]) { + if bytes.is_empty() { + return; + } + match self.body_buf_overread.as_mut() { + Some(buf) => buf.extend_from_slice(bytes), + None => { + let mut buf = BytesMut::with_capacity(bytes.len()); + buf.extend_from_slice(bytes); + self.body_buf_overread = Some(buf); + } + } + } + pub fn body_done(&self) -> bool { matches!(self.body_state, PS::Complete(_) | PS::Done(_)) } diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs index 9144c6e5..b754e335 100644 --- a/pingora-core/src/protocols/http/v1/server.rs +++ b/pingora-core/src/protocols/http/v1/server.rs @@ -35,7 +35,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::body::{BodyReader, BodyWriter}; use super::common::*; use super::header::HeaderWriter; -use crate::protocols::http::{body_buffer::FixedBuffer, date, HttpTask}; +use crate::protocols::http::{body_buffer::FixedBuffer, date, HttpTask, ReusedHttpConnection}; use crate::protocols::{Digest, SocketAddr, Stream}; use crate::utils::{BufRef, KVRef}; @@ -136,6 +136,24 @@ pub struct HttpSession { /// Whether the cancel-safe proxy task API is enabled for this session. /// Defaults to false. Can be enabled via [`set_proxy_tasks_enabled`](Self::set_proxy_tasks_enabled). proxy_tasks_enabled: bool, + /// Whether HTTP/1.1 request pipelining is enabled for this session. + /// Defaults to false. Can be enabled via [`set_pipelining_enabled`](Self::set_pipelining_enabled). + /// See [`Self::set_pipelining_enabled`] for RFC 9112 §9.3.2 semantics. + pipelining_enabled: bool, + /// Pipelined bytes from the previous request on the same keep-alive connection, + /// to be parsed as the start of this session's request. Consumed on the first + /// call to [`Self::read_request`]. Set via [`Self::set_pipelined_prefix`] after + /// the previous session's [`BodyReader::take_body_overread`] yielded bytes. 
+ pipelined_prefix: Option, + /// Set once the idle-branch of [`Self::read_body_or_idle`] has read the + /// first bytes of a pipelined next request and pushed them onto the body + /// reader's overread surface. Further idle polls on the same request + /// return pending instead of re-reading the stream, so the body-pump + /// `tokio::select!` loop can exit via its other branches while the + /// stashed bytes travel through `reuse_pipelined()` + + /// [`super::super::HttpPersistentSettings`] into the next session. + /// Scoped narrowly so it cannot affect FIN / `abort_on_close` semantics. + pipelined_idle_bytes_stashed: bool, } impl HttpSession { @@ -181,6 +199,72 @@ impl HttpSession { half_closed: false, abort_on_close: true, proxy_tasks_enabled: false, + pipelining_enabled: false, + pipelined_prefix: None, + pipelined_idle_bytes_stashed: false, + } + } + + async fn read_request_buf( + &mut self, + buf: &mut BytesMut, + already_read: usize, + ) -> Result> { + let read_result = { + let read_event = self.underlying_stream.read_buf(buf); + match self.keepalive_timeout { + KeepaliveStatus::Timeout(d) => match timeout(d, read_event).await { + Ok(res) => res, + Err(e) => { + debug!("keepalive timeout {d:?} reached, {e}"); + return Ok(None); + } + }, + KeepaliveStatus::Infinite => { + // FIXME: this should only apply to reads between requests + read_event.await + } + KeepaliveStatus::Off => match self.read_timeout { + Some(t) => match timeout(t, read_event).await { + Ok(res) => res, + Err(e) => { + debug!("read timeout {t:?} reached, {e}"); + return Error::e_explain(ReadTimedout, format!("timeout: {t:?}")); + } + }, + None => read_event.await, + }, + } + }; + + match read_result { + Ok(n_read) => { + if n_read == 0 { + if already_read > 0 { + Error::e_explain( + ConnectionClosed, + format!( + "while reading request headers, bytes already read: {}", + already_read + ), + ) + } else { + /* common when client decides to close a keepalived session */ + debug!("Client 
prematurely closed connection with 0 byte sent"); + Ok(None) + } + } else { + Ok(Some(n_read)) + } + } + Err(e) => { + if already_read > 0 { + Error::e_because(ReadError, "while reading request headers", e) + } else { + /* nothing harmful since we have not ready any thing yet */ + Ok(None) + } + } } } @@ -193,6 +277,26 @@ impl HttpSession { self.buf.clear(); let mut buf = BytesMut::with_capacity(INIT_HEADER_BUF_SIZE); let mut already_read: usize = 0; + // If the caller (e.g. the proxy layer completing a pipelined request on + // a reused keep-alive connection) handed us bytes that were read past + // the end of the previous request's body, pre-fill our parse buffer so + // the header parser sees them as the start of this request. The loop + // below tries to parse first when we already have pipelined bytes — + // a pipelined prefix can contain a complete request header, in which + // case we must NOT issue another stream read (which would block). + let mut skip_next_read = false; + if let Some(prefix) = self + .pipelined_prefix + .take() + .filter(|prefix| !prefix.is_empty()) + { + if buf.capacity() < prefix.len() { + buf.reserve(prefix.len() - buf.capacity()); + } + buf.extend_from_slice(&prefix); + already_read = prefix.len(); + skip_next_read = true; + } loop { if already_read > MAX_HEADER_SIZE { /* NOTE: this check only blocks second read. 
The first large read is allowed @@ -204,61 +308,19 @@ impl HttpSession { ); } - let read_result = { - let read_event = self.underlying_stream.read_buf(&mut buf); - match self.keepalive_timeout { - KeepaliveStatus::Timeout(d) => match timeout(d, read_event).await { - Ok(res) => res, - Err(e) => { - debug!("keepalive timeout {d:?} reached, {e}"); - return Ok(None); - } - }, - KeepaliveStatus::Infinite => { - // FIXME: this should only apply to reads between requests - read_event.await - } - KeepaliveStatus::Off => match self.read_timeout { - Some(t) => match timeout(t, read_event).await { - Ok(res) => res, - Err(e) => { - debug!("read timeout {t:?} reached, {e}"); - return Error::e_explain(ReadTimedout, format!("timeout: {t:?}")); - } - }, - None => read_event.await, - }, - } - }; - let n = match read_result { - Ok(n_read) => { - if n_read == 0 { - if already_read > 0 { - return Error::e_explain( - ConnectionClosed, - format!( - "while reading request headers, bytes already read: {}", - already_read - ), - ); - } else { - /* common when client decides to close a keepalived session */ - debug!("Client prematurely closed connection with 0 byte sent"); - return Ok(None); - } - } - n_read - } - - Err(e) => { - if already_read > 0 { - return Error::e_because(ReadError, "while reading request headers", e); - } - /* nothing harmful since we have not ready any thing yet */ - return Ok(None); - } - }; - already_read += n; + // On the first iteration after a pipelined prefix was injected, + // attempt to parse what we already have before issuing a stream + // read. If the prefix contains a complete request header, a + // subsequent read_buf() would block for data that may never come + // (the client already pipelined everything it had to send for + // this request and is waiting for our response). + if skip_next_read { + skip_next_read = false; + } else if let Some(n) = self.read_request_buf(&mut buf, already_read).await? 
{ + already_read += n; + } else { + return Ok(None); + } // Use loop as GOTO to retry escaped request buffer, not a real loop loop { @@ -331,6 +393,7 @@ impl HttpSession { self.request_header = Some(request_header); self.body_reader.reinit(); + self.pipelined_idle_bytes_stashed = false; self.response_written = None; self.respect_keepalive(); @@ -885,6 +948,56 @@ impl HttpSession { self.proxy_tasks_enabled = enabled; } + /// Whether HTTP/1.1 request pipelining is enabled for this session. + pub fn pipelining_enabled(&self) -> bool { + self.pipelining_enabled + } + + /// Enable or disable HTTP/1.1 request pipelining on this session. + /// + /// When enabled, if the client pipelines requests on a single keep-alive + /// connection (sends request N+1 before reading response N), the proxy will + /// serve each request sequentially with responses in request order as + /// required by RFC 9112 §9.3.2. Each pipelined request still goes through + /// independent upstream selection; only the downstream connection is reused. + /// + /// When disabled (default), pipelined bytes received alongside request N + /// cause the session to be marked un-reusable: response N is still + /// delivered, the connection closes, and request N+1 is dropped. Clients + /// are expected to detect the close and retry on a fresh connection per + /// RFC 9112 §9.3.2. + /// + /// Sequential dispatch only: response N must be fully written before + /// request N+1 begins processing. No parallel pipelining. + pub fn set_pipelining_enabled(&mut self, enabled: bool) { + self.pipelining_enabled = enabled; + } + + /// Set pipelined bytes to be parsed as the start of this session's request. + /// + /// Called by the proxy layer when continuing a keep-alive connection whose + /// previous session yielded overread bytes. The prefix is consumed on the + /// first [`Self::read_request`] call; the parser treats the prefix + any + /// further stream reads as the next request's header + body bytes. 
+ pub fn set_pipelined_prefix(&mut self, prefix: BytesMut) { + debug_assert!( + self.pipelined_prefix.is_none(), + "pipelined prefix already set" + ); + self.pipelined_prefix = Some(prefix); + } + + /// Take ownership of bytes read past the end of this session's request + /// body. When non-empty, those bytes are the start of a pipelined + /// follow-up request on the same keep-alive connection and should be + /// fed to the next session via [`Self::set_pipelined_prefix`]. + /// + /// Returns `None` when no overread is present. After this call, the + /// session's body-reader no longer holds the bytes. + pub(crate) fn take_body_overread(&mut self) -> Option { + self.body_reader.take_body_overread() + } + async fn do_write_body_buf(&mut self) -> Result> { // Don't flush empty chunks, they are considered end of body for chunks if self.body_write_buf.is_empty() { @@ -1029,13 +1142,21 @@ impl HttpSession { /// This function will (async) block forever until the client closes the connection. pub async fn idle(&mut self) -> Result { - // NOTE: this implementation breaks http pipelining, ideally we need poll_error - // NOTE: buf cannot be empty, openssl-rs read() requires none empty buf. - let mut buf: [u8; 1] = [0; 1]; - self.underlying_stream - .read(&mut buf) + // OpenSSL read requires a non-empty buffer. Keep this probe at one byte + // so idle-style reads consume at most one byte before returning control. 
+ self.read_idle_probe("during HTTP idle state") + .await + .map(|(_, read)| read) + } + + async fn read_idle_probe(&mut self, context: &'static str) -> Result<([u8; 1], usize)> { + let mut probe = [0; 1]; + let read = self + .underlying_stream + .read(&mut probe) .await - .or_err(ReadError, "during HTTP idle state") + .or_err(ReadError, context)?; + Ok((probe, read)) } /// This function will return body bytes (same as [`Self::read_body_bytes()`]), but after @@ -1069,8 +1190,22 @@ impl HttpSession { } return std::future::pending().await; } + // When pipelining is enabled and an earlier idle read already + // stashed the next request's bytes as overread, any further + // poll of this function on the same request must not read + // the stream again (the proxy's body-pump `select!` loop + // will call back into here repeatedly until its other + // branches resolve the request). Go straight to pending. + // `abort_on_close` and the FIN handling above stay untouched + // — this branch is exclusive to the pipelining case where + // bytes (not FIN) arrived on the idle poll. + if self.pipelining_enabled && self.pipelined_idle_bytes_stashed { + return std::future::pending().await; + } // XXX: account for upgraded body reader change, if the read half split from the write half - let read = self.idle().await?; + let (probe, read) = self + .read_idle_probe("during HTTP body-or-idle state") + .await?; if read == 0 { self.half_closed = true; self.set_keepalive(None); @@ -1089,6 +1224,33 @@ impl HttpSession { // will fail. std::future::pending().await } + } else if self.pipelining_enabled { + // The read bytes are the start of a pipelined next + // request on this keep-alive connection (RFC 9112 + // §9.3.2). Stash them on the body reader's overread + // surface so the existing `take_body_overread` + + // `HttpPersistentSettings` extraction path picks them + // up at `reuse_pipelined()` time and feeds them to the next + // session via `set_pipelined_prefix`. 
+ // + // Returning pending (rather than `Ok(None)` or an + // error) signals the body-pump `tokio::select!` loop + // that the downstream has no more body work to do on + // this request — the loop exits naturally via the + // upstream-response-done / response-write-done + // branches, and `finish_reuse()` runs its standard + // pipelining extraction. Crucially, we do NOT touch + // `half_closed` or `abort_on_close`; those control TCP FIN + // handling and must still abort immediately when the client + // disconnects. + // Keep the stash and flag update adjacent and synchronous. + // Once the prefix byte is handed to the overread path, the + // flag prevents later idle polls for this request from + // reading the stream again. + self.body_reader.push_body_overread(&probe[..read]); + self.pipelined_idle_bytes_stashed = true; + debug!("pipelined request bytes stashed as overread ({read} bytes)"); + std::future::pending().await } else { Error::e_explain(ConnectError, "Sent data after end of body") } @@ -1237,32 +1399,51 @@ impl HttpSession { .map(|d| d.local_addr())? } - /// Consume `self`, if the connection can be reused, the underlying stream will be returned - /// to be fed to the next [`Self::new()`]. This drains any remaining request body if it hasn't - /// yet been read and the stream is reusable. + /// Consume `self` and return the underlying stream if the connection can be reused. + /// + /// This is the compatibility path: it preserves the pre-pipelining + /// stream-only signature and discards any captured pipelined prefix bytes. + /// Callers that enable HTTP/1.1 pipelining with + /// [`Self::set_pipelining_enabled`] should use [`Self::reuse_pipelined`] + /// instead so the prefix is carried to the next request on the connection. 
+ pub async fn reuse(self) -> Result> { + self.reuse_pipelined().await.map(|opt| { + opt.map(|reused| { + let (stream, _pipelined_prefix) = reused.into_parts(); + stream + }) + }) + } + + /// Consume `self`, if the connection can be reused, the underlying stream and any pipelined + /// prefix bytes will be returned to be fed to the next [`Self::new()`]. This drains any + /// remaining request body if it hasn't yet been read and the stream is reusable. /// /// The next session can just call [`Self::read_request()`]. /// /// If the connection cannot be reused, the underlying stream will be closed and `None` will be /// returned. If there was an error while draining any remaining request body that error will /// be returned. - pub async fn reuse(mut self) -> Result> { + pub async fn reuse_pipelined(mut self) -> Result> { if !self.will_keepalive() { debug!("HTTP shutdown connection"); self.shutdown().await; Ok(None) } else { self.drain_request_body().await?; - // XXX: currently pipelined requests are not properly read without - // pipelining support, and pingora 400s if pipelined requests are sent - // in the middle of another request. - // We will mark the connection as un-reusable so it may be closed, - // the pipelined request left unread, and the client can attempt to resend - if self.body_reader.has_bytes_overread() { + if self.body_reader.has_bytes_overread() && !self.pipelining_enabled { debug!("bytes overread on request, disallowing reuse"); Ok(None) } else { - Ok(Some(self.underlying_stream)) + let pipelined_prefix = self + .pipelining_enabled + .then(|| self.take_body_overread()) + .flatten() + .filter(|prefix| !prefix.is_empty()); + Ok(Some(ReusedHttpConnection::new( + self.underlying_stream, + pipelined_prefix, + ))) } } } @@ -3746,3 +3927,393 @@ mod test_abort_on_close { assert!(s.abort_on_close); } } + +#[cfg(test)] +mod test_pipelining { + //! Tests for HTTP/1.1 request pipelining support (RFC 9112 §9.3.2). + //! + //! 
Pipelining is an opt-in behavior: when enabled via + //! [`HttpSession::set_pipelining_enabled`], the session tolerates + //! overread bytes on reuse (they belong to the next request) and a new + //! session can have them fed in via [`HttpSession::set_pipelined_prefix`]. + //! + //! When disabled (default), overread bytes cause [`HttpSession::reuse`] + //! to return `Ok(None)` so the connection closes — the historical + //! behavior preserved for callers that do not opt in. + + use super::*; + use rstest::rstest; + use tokio_test::io::Builder; + + fn init_log() { + let _ = env_logger::builder().is_test(true).try_init(); + } + + /// Default state: pipelining is off. + #[tokio::test] + async fn pipelining_disabled_by_default() { + init_log(); + let mock_io = Builder::new().build(); + let s = HttpSession::new(Box::new(mock_io)); + assert!(!s.pipelining_enabled()); + } + + /// Toggling the pipelining flag is round-trippable. + #[tokio::test] + async fn set_pipelining_enabled_toggles() { + init_log(); + let mock_io = Builder::new().build(); + let mut s = HttpSession::new(Box::new(mock_io)); + assert!(!s.pipelining_enabled()); + s.set_pipelining_enabled(true); + assert!(s.pipelining_enabled()); + s.set_pipelining_enabled(false); + assert!(!s.pipelining_enabled()); + } + + /// When pipelining is disabled (default), overread bytes must cause + /// reuse to return `None`. Pipelining opt-in must not regress that + /// compatibility behavior. 
+ #[rstest] + #[case(true)] // pipelining explicitly off + #[case(false)] // pipelining flag never set + #[tokio::test] + async fn reuse_rejects_overread_when_pipelining_disabled(#[case] explicit_off: bool) { + init_log(); + let request = + b"GET / HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\npipelined_next"; + let mock_io = Builder::new().read(request).build(); + let mut s = HttpSession::new(Box::new(mock_io)); + if explicit_off { + s.set_pipelining_enabled(false); + } + s.read_request().await.unwrap(); + // Overread is captured when body reading initializes — poll the + // body to trigger the init_content_length path. + let _ = s.read_body_bytes().await.unwrap(); + assert!(s.body_reader.has_bytes_overread()); + let reused = s.reuse().await.unwrap(); + assert!( + reused.is_none(), + "reuse must return None without pipelining" + ); + } + + /// When pipelining is enabled and overread bytes are present, + /// reuse returns both the stream and the extracted prefix. + #[tokio::test] + async fn reuse_allows_overread_when_pipelining_enabled() { + init_log(); + let request = + b"GET / HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\npipelined_next"; + let mock_io = Builder::new().read(request).build(); + let mut s = HttpSession::new(Box::new(mock_io)); + s.set_pipelining_enabled(true); + s.read_request().await.unwrap(); + let _ = s.read_body_bytes().await.unwrap(); + assert!(s.body_reader.has_bytes_overread()); + + let reused = s + .reuse_pipelined() + .await + .unwrap() + .expect("connection reusable"); + let (_stream, prefix) = reused.into_parts(); + let prefix = prefix.expect("overread must be returned as pipelined prefix"); + assert_eq!(prefix.as_ref(), b"pipelined_next"); + } + + /// Same-read pipelining with no prior body poll still extracts the prefix. 
+ #[tokio::test] + async fn reuse_extracts_prefix_without_body_poll() { + init_log(); + let req1 = b"GET /one HTTP/1.1\r\nHost: pingora.org\r\n\r\n"; + let req2 = b"GET /two HTTP/1.1\r\nHost: pingora.org\r\n\r\n"; + let mut combined = Vec::with_capacity(req1.len() + req2.len()); + combined.extend_from_slice(req1); + combined.extend_from_slice(req2); + + let mock_io = Builder::new().read(&combined).build(); + let mut a = HttpSession::new(Box::new(mock_io)); + a.set_pipelining_enabled(true); + a.read_request().await.unwrap(); + assert_eq!(a.req_header().uri.path(), "/one"); + + let reused = a + .reuse_pipelined() + .await + .unwrap() + .expect("connection reusable"); + let (stream, prefix) = reused.into_parts(); + let prefix = prefix.expect("pipelined prefix must be extracted during reuse"); + assert_eq!(prefix.as_ref(), req2); + + let mut b = HttpSession::new(stream); + b.set_pipelining_enabled(true); + b.set_pipelined_prefix(prefix); + b.read_request() + .await + .unwrap() + .expect("pipelined request must parse"); + assert_eq!(b.req_header().uri.path(), "/two"); + } + + /// Content-Length: 0 has the same extraction requirement as absent length. 
+ #[tokio::test] + async fn reuse_extracts_content_length_zero_prefix_without_body_poll() { + init_log(); + let req1 = b"GET /one HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\n"; + let req2 = b"GET /two HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\n"; + let mut combined = Vec::with_capacity(req1.len() + req2.len()); + combined.extend_from_slice(req1); + combined.extend_from_slice(req2); + + let mock_io = Builder::new().read(&combined).build(); + let mut a = HttpSession::new(Box::new(mock_io)); + a.set_pipelining_enabled(true); + a.read_request().await.unwrap(); + assert_eq!(a.req_header().uri.path(), "/one"); + + let reused = a + .reuse_pipelined() + .await + .unwrap() + .expect("connection reusable"); + let (_stream, prefix) = reused.into_parts(); + let prefix = prefix.expect("pipelined prefix must be extracted during reuse"); + assert_eq!(prefix.as_ref(), req2); + } + + /// The new session parses the pipelined prefix as the start of a + /// request without issuing any stream read — the mock_io allows no + /// reads, so if read_request() tried to pull from the stream it would + /// panic. This is the essential pipelining property: a prefix that + /// already contains a complete request is parsed without waiting for + /// additional bytes. + #[tokio::test] + async fn read_request_consumes_complete_prefix_without_stream_read() { + init_log(); + let prefix = b"GET /two HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\n"; + // Mock IO that would panic on any read — ensures the parse is + // wholly satisfied by the pipelined prefix. 
+ let mock_io = Builder::new().build(); + let mut s = HttpSession::new(Box::new(mock_io)); + s.set_pipelined_prefix(BytesMut::from(&prefix[..])); + let n = s + .read_request() + .await + .unwrap() + .expect("request must parse from prefix alone"); + assert!(n > 0); + assert_eq!(s.req_header().uri.path(), "/two"); + } + + /// When the prefix is only the beginning of a request, read_request() + /// continues to read from the stream to complete the header. + #[tokio::test] + async fn read_request_falls_through_to_stream_for_partial_prefix() { + init_log(); + let prefix = b"GET /two HTTP/1.1\r\nHost: "; + let rest = b"pingora.org\r\nContent-Length: 0\r\n\r\n"; + let mock_io = Builder::new().read(rest).build(); + let mut s = HttpSession::new(Box::new(mock_io)); + s.set_pipelined_prefix(BytesMut::from(&prefix[..])); + let n = s + .read_request() + .await + .unwrap() + .expect("request must parse across prefix + stream"); + assert!(n > 0); + assert_eq!(s.req_header().uri.path(), "/two"); + } + + /// Body-pump path: request 2's bytes arrive in a SEPARATE read + /// after request 1 has been fully consumed. The proxy's body-pump + /// loop polls the downstream socket via + /// [`HttpSession::read_body_or_idle`]`(true)` while request 1's + /// response is still being written. The idle branch at + /// `read_body_or_idle` currently raises + /// `ConnectError("Sent data after end of body")` when the idle + /// read returns > 0 bytes — which is exactly the shape pipelining + /// traffic takes when requests span TCP segment boundaries. + /// + /// This covers the two-segment pipelining case: request 2's bytes + /// arrive during the proxy's idle poll, not during request 1's body + /// read. The reuse_pipelined() overread path (already covered by the tests + /// above) never fires because request 2's bytes were never in + /// `body_buf_overread` to begin with. + /// + /// When pipelining is enabled on the session, this branch must + /// NOT raise `ConnectError`. 
Instead, the byte(s) read by + /// `idle()` must be stashed so the reuse_pipelined() path can hand them + /// to the next session via the standard `take_body_overread` + /// extractor. `idle()` uses a 1-byte probe buffer, so the + /// overread surface will typically hold 1 byte per idle poll — + /// the remaining bytes of request 2 stay on the underlying + /// stream and are read by the next session's `read_request` + /// (which seeds itself with the pipelined prefix and continues + /// reading from the stream to complete the header). + #[tokio::test] + async fn idle_read_stashes_bytes_when_pipelining_enabled() { + init_log(); + let req1 = b"GET /one HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\n"; + // Only the first byte of req2 is queued — the idle-branch + // read in `read_body_or_idle` uses a 1-byte probe buffer, + // so that's all it will consume. The rest of req2 would + // live on the kernel socket buffer in real traffic and be + // drained by the next session. + let req2_first = b"G"; + + // No `.wait(...)` between the two reads — we want the + // second read to be immediately available once the first + // consumer polls. `tokio-test::io::Builder` delivers reads + // one poll at a time regardless, which is what models a + // TCP segment boundary for our purposes. + let mock_io = Builder::new().read(&req1[..]).read(&req2_first[..]).build(); + + let mut s = HttpSession::new(Box::new(mock_io)); + s.set_pipelining_enabled(true); + + // Consume request 1 fully. Body is zero-length so body_done + // is true; no overread is captured in body_buf_overread + // because req2's bytes were NOT in the same read as req1. + s.read_request().await.unwrap(); + assert_eq!(s.req_header().uri.path(), "/one"); + let _ = s.read_body_bytes().await.unwrap(); + assert!(s.is_body_done()); + assert!( + !s.body_reader.has_bytes_overread(), + "precondition: req2 must arrive in a separate read, not as overread on req1" + ); + + // This is the proxy's body-pump poll. 
Post-fix, the idle
+        // branch reads the byte, pushes it to the body reader's
+        // overread surface, and stays pending — signaling the
+        // body-pump `select!` loop that the downstream has no more
+        // body activity to wait on (the loop exits via its other
+        // branches when the upstream response completes).
+        //
+        // We assert the *causal* invariant, not a wall-clock one:
+        // poll the future repeatedly, yielding between polls to
+        // let the mock I/O stack drain, until either (a) it
+        // resolves (which is a failure — it MUST stay pending) or
+        // (b) the bounded poll budget below runs out, which is
+        // enough progress to know the idle read has completed.
+        // No extra bookkeeping channel is wired in this test; the
+        // progress signal is simply the fixed number of polls in
+        // the loop below, and the actual overread presence is
+        // asserted after the future is dropped.
+        //
+        // Scope the future in a plain block so its borrow on `s`
+        // ends when we exit the block — the body-reader check
+        // needs a fresh borrow.
+        {
+            let fut = s.read_body_or_idle(true);
+            tokio::pin!(fut);
+            // Drive the future forward a bounded number of times.
+            // Under the fix it will always stay Pending; a broken
+            // fix resolves Ready in the first few polls.
+            for _ in 0..10 {
+                match futures::poll!(fut.as_mut()) {
+                    std::task::Poll::Pending => {
+                        tokio::task::yield_now().await;
+                    }
+                    std::task::Poll::Ready(Err(e)) => panic!(
+                        "read_body_or_idle(true) must not raise an error when \
+                        pipelining is enabled and the idle read returns > 0 bytes \
+                        (those bytes are the start of pipelined request 2, not \
+                        illegal trailing body). Got error: {e:?}"
+                    ),
+                    std::task::Poll::Ready(Ok(body)) => panic!(
+                        "read_body_or_idle(true) must stay pending after stashing \
+                        pipelined bytes (the body-pump `select!` exits via its \
+                        other branches). Got body: {body:?}"
+                    ),
+                }
+            }
+            // Future still pending — exit the scope, which drops
+            // `fut` and releases the mutable borrow on `s`.
+ } + + // The byte must be extractable as overread, so the + // standard reuse_pipelined() + HttpPersistentSettings pipeline can + // hand it to the next session. + let overread = s + .take_body_overread() + .expect("pipelined request 2 byte must be retrievable as overread"); + assert_eq!( + overread.as_ref(), + req2_first, + "stashed bytes must be the idle-read probe byte from request 2" + ); + } + + /// Symmetric to the test above: pipelining OFF means the idle + /// branch still raises `ConnectError` as it did pre-patch. This + /// preserves upstream behavior for non-adopters. + #[tokio::test] + async fn idle_read_still_raises_when_pipelining_disabled() { + init_log(); + let req1 = b"GET /one HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\n"; + // Single byte of req2 — idle-branch read uses a 1-byte probe + // buffer, error path fires, mock is fully drained. + let req2_first = b"G"; + + let mock_io = Builder::new().read(&req1[..]).read(&req2_first[..]).build(); + + let mut s = HttpSession::new(Box::new(mock_io)); + // Leave pipelining at the default (off). + s.read_request().await.unwrap(); + let _ = s.read_body_bytes().await.unwrap(); + assert!(s.is_body_done()); + + let err = s + .read_body_or_idle(true) + .await + .expect_err("pipelining off: idle read > 0 must raise ConnectError"); + assert_eq!( + *err.etype(), + pingora_error::ErrorType::ConnectError, + "non-adopter callers must still see ConnectError on surplus idle bytes" + ); + } + + /// End-to-end: session A finishes with overread, bytes are extracted, + /// session B consumes them via set_pipelined_prefix and parses the + /// pipelined request without reading from the (empty) stream. + #[tokio::test] + async fn pipelined_request_chain_end_to_end() { + init_log(); + + // Session A: read request 1 with pipelined request 2 bytes appended. 
+ let req1 = b"GET /one HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\n"; + let req2 = b"GET /two HTTP/1.1\r\nHost: pingora.org\r\nContent-Length: 0\r\n\r\n"; + let mut combined = Vec::with_capacity(req1.len() + req2.len()); + combined.extend_from_slice(req1); + combined.extend_from_slice(req2); + + let mock_io_a = Builder::new().read(&combined).build(); + let mut a = HttpSession::new(Box::new(mock_io_a)); + a.set_pipelining_enabled(true); + a.read_request().await.unwrap(); + assert_eq!(a.req_header().uri.path(), "/one"); + // Poll the body to trigger init_content_length which captures + // the bytes past Content-Length: 0 as overread. + let _ = a.read_body_bytes().await.unwrap(); + assert!(a.body_reader.has_bytes_overread()); + + let overread = a.take_body_overread().expect("overread present"); + + // Session B: construct with an empty stream (pipelined prefix is + // everything we need), feed the overread, parse the next request. + let mock_io_b = Builder::new().build(); + let mut b = HttpSession::new(Box::new(mock_io_b)); + b.set_pipelining_enabled(true); + b.set_pipelined_prefix(overread); + b.read_request() + .await + .unwrap() + .expect("pipelined request must parse"); + assert_eq!(b.req_header().uri.path(), "/two"); + } +} diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 4ce9e5e5..23a64e46 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -442,11 +442,11 @@ where } session .downstream_session - .finish() + .finish_reuse() .await .ok() .flatten() - .map(|s| ReusedHttpStream::new(s, Some(persistent_settings))) + .map(|s| ReusedHttpStream::from_reused_connection(s, persistent_settings)) } else { None } @@ -917,11 +917,11 @@ where } return session .downstream_session - .finish() + .finish_reuse() .await .ok() .flatten() - .map(|s| ReusedHttpStream::new(s, Some(persistent_settings))); + .map(|s| ReusedHttpStream::from_reused_connection(s, persistent_settings)); } /* else continue */ } @@ -1106,11 +1106,11 @@ 
where } session .downstream_session - .finish() + .finish_reuse() .await .ok() .flatten() - .map(|s| ReusedHttpStream::new(s, Some(persistent_settings))) + .map(|s| ReusedHttpStream::from_reused_connection(s, persistent_settings)) } else { None } From ae26ae0236a609368ff47c3067b357f9b774556f Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Wed, 6 May 2026 12:56:30 -0700 Subject: [PATCH 2/2] Add pipelining proxy example Minimal ProxyHttp that opts in via set_pipelining_enabled(true) in early_request_filter. Matches the reproducer shape from #377: pipelined GETs on one connection now both return responses. --- pingora-proxy/examples/pipelining.rs | 70 ++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 pingora-proxy/examples/pipelining.rs diff --git a/pingora-proxy/examples/pipelining.rs b/pingora-proxy/examples/pipelining.rs new file mode 100644 index 00000000..1d596200 --- /dev/null +++ b/pingora-proxy/examples/pipelining.rs @@ -0,0 +1,70 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use async_trait::async_trait; + +use pingora_core::server::configuration::Opt; +use pingora_core::server::Server; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_proxy::{ProxyHttp, Session}; + +pub struct PipelinedGateway; + +#[async_trait] +impl ProxyHttp for PipelinedGateway { + type CTX = (); + fn new_ctx(&self) -> Self::CTX {} + + async fn early_request_filter( + &self, + session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result<()> { + // Opt in once per session; persists across keep-alive reuses. + session.set_pipelining_enabled(true); + Ok(()) + } + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result> { + let peer = HttpPeer::new(("httpbin.org", 80), false, "httpbin.org".into()); + Ok(Box::new(peer)) + } +} + +// RUST_LOG=INFO cargo run --example pipelining +// +// Two pipelined GETs on one connection (expect two `HTTP/1.1 200` lines): +// printf 'GET /get HTTP/1.1\r\nHost: httpbin.org\r\n\r\nGET /get HTTP/1.1\r\nHost: httpbin.org\r\n\r\n' \ +// | ncat --no-shutdown localhost 6191 \ +// | grep -oE 'HTTP/1.1 [0-9]{3}' + +fn main() { + env_logger::init(); + + let opt = Opt::parse_args(); + let mut my_server = Server::new(Some(opt)).unwrap(); + my_server.bootstrap(); + + let mut my_proxy = + pingora_proxy::http_proxy_service(&my_server.configuration, PipelinedGateway); + my_proxy.add_tcp("0.0.0.0:6191"); + my_server.add_service(my_proxy); + + my_server.run_forever(); +}