Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pingora-core/src/apps/http_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ where
}
}
let persistent_settings = HttpPersistentSettings::for_session(&http);
match http.finish().await {
Ok(c) => c.map(|s| ReusedHttpStream::new(s, Some(persistent_settings))),
match http.finish_reuse().await {
Ok(c) => c.map(|s| ReusedHttpStream::from_reused_connection(s, persistent_settings)),
Err(e) => {
error!("HTTP server fails to finish the request: {e}");
None
Expand Down Expand Up @@ -206,8 +206,8 @@ where
),
}
let persistent_settings = HttpPersistentSettings::for_session(&http);
match http.finish().await {
Ok(c) => c.map(|s| ReusedHttpStream::new(s, Some(persistent_settings))),
match http.finish_reuse().await {
Ok(c) => c.map(|s| ReusedHttpStream::from_reused_connection(s, persistent_settings)),
Err(e) => {
error!("HTTP server fails to finish the request: {e}");
None
Expand Down
59 changes: 55 additions & 4 deletions pingora-core/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ pub mod http_app;

use crate::server::ShutdownWatch;
use async_trait::async_trait;
use bytes::BytesMut;
use log::{debug, error};
use std::any::Any;
use std::future::poll_fn;
use std::sync::Arc;

use crate::protocols::http::v2::server;
use crate::protocols::http::ServerSession;
use crate::protocols::http::{ReusedHttpConnection, ServerSession};
use crate::protocols::Digest;
use crate::protocols::Stream;
use crate::protocols::ALPN;
Expand Down Expand Up @@ -90,12 +91,26 @@ pub struct HttpServerOptions {
/// user-defined context via [`set_user_context`](Self::set_user_context). The proxy layer
/// populates this through `ProxyHttp::persist_connection_context`
/// and delivers it to the next request through `ProxyHttp::on_connection_reuse`.
///
/// Also carries pipelined-prefix bytes when the caller has opted into HTTP/1.1
/// pipelining on the previous session. See
/// [`Self::set_pipelined_prefix`] and the
/// [`HttpSession::set_pipelining_enabled`](crate::protocols::http::v1::server::HttpSession::set_pipelining_enabled)
/// docs for the RFC 9112 §9.3.2 semantics.
#[derive(Debug)]
pub struct HttpPersistentSettings {
keepalive_timeout: Option<u64>,
keepalive_reuses_remaining: Option<u32>,
/// User-defined context to carry to the next request on this connection.
user_context: Option<Box<dyn Any + Send + Sync>>,
/// Bytes read past the end of the previous request's body, to be parsed
/// as the next pipelined request on the reused connection.
pipelined_prefix: Option<BytesMut>,
/// Whether HTTP/1.1 pipelining was enabled on the previous session;
/// propagates to the next session so the proxy-level opt-in sticks
/// across keepalive reuses without the adopter having to re-enable
/// it on every request.
pipelining_enabled: bool,
}

impl HttpPersistentSettings {
Expand All @@ -104,6 +119,8 @@ impl HttpPersistentSettings {
keepalive_timeout: session.get_keepalive(),
keepalive_reuses_remaining: session.get_keepalive_reuses_remaining(),
user_context: None,
pipelined_prefix: None,
pipelining_enabled: session.pipelining_enabled(),
}
}

Expand All @@ -117,11 +134,21 @@ impl HttpPersistentSettings {
self.user_context.take()
}

/// Set pipelined-prefix bytes to be fed to the next session on this
/// connection. Called by the proxy layer when HTTP/1.1 pipelining is
/// enabled on the current session and overread bytes were present at
/// reuse time.
pub fn set_pipelined_prefix(&mut self, prefix: BytesMut) {
self.pipelined_prefix = Some(prefix);
}

pub fn apply_to_session(self, session: &mut ServerSession) {
let Self {
keepalive_timeout,
mut keepalive_reuses_remaining,
user_context,
pipelined_prefix,
pipelining_enabled,
} = self;

// Reduce the number of times the connection for this session can be
Expand All @@ -135,6 +162,15 @@ impl HttpPersistentSettings {

// Carry user context into the session for the proxy layer to consume
session.set_connection_user_context(user_context);

// Replay pipelining opt-in so it stays on across keepalive reuses.
session.set_pipelining_enabled(pipelining_enabled);

// Feed any pipelined prefix bytes to the new session's request parser
// so they are treated as the start of the next request.
if let Some(prefix) = pipelined_prefix {
session.set_pipelined_prefix(prefix);
}
}
}

Expand All @@ -152,6 +188,19 @@ impl ReusedHttpStream {
}
}

/// Build a reusable HTTP stream from a finished session, preserving any
/// pipelined prefix bytes in the persistent settings for the next request.
pub fn from_reused_connection(
reused: ReusedHttpConnection,
mut persistent_settings: HttpPersistentSettings,
) -> Self {
let (stream, pipelined_prefix) = reused.into_parts();
if let Some(prefix) = pipelined_prefix {
persistent_settings.set_pipelined_prefix(prefix);
}
Self::new(stream, Some(persistent_settings))
}

pub fn consume(self) -> (Stream, Option<HttpPersistentSettings>) {
(self.stream, self.persistent_settings)
}
Expand All @@ -162,9 +211,11 @@ impl ReusedHttpStream {
pub trait HttpServerApp {
/// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
///
/// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable
/// connection back to the service. The caller needs to make sure that the connection is in a reusable state
/// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned.
/// After successful processing, [`ServerSession::finish_reuse()`] can be
/// called to return an optionally reusable connection back to the service.
/// The caller needs to make sure that the connection is in a reusable state
/// i.e., no error or incomplete read or write headers or bodies. Otherwise
/// a `None` should be returned.
async fn process_new_http(
self: &Arc<Self>,
mut session: ServerSession,
Expand Down
2 changes: 1 addition & 1 deletion pingora-core/src/protocols/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod subrequest;
pub mod v1;
pub mod v2;

pub use server::Session as ServerSession;
pub use server::{ReusedHttpConnection, Session as ServerSession};

/// The Pingora server name string
pub const SERVER_NAME: &[u8; 7] = b"Pingora";
Expand Down
85 changes: 80 additions & 5 deletions pingora-core/src/protocols/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,36 @@ use super::v2::server::HttpSession as SessionV2;
use super::HttpTask;
use crate::custom_session;
use crate::protocols::{Digest, SocketAddr, Stream};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use http::HeaderValue;
use http::{header::AsHeaderName, HeaderMap};
use pingora_error::{Error, Result};
use pingora_http::{RequestHeader, ResponseHeader};
use std::any::Any;
use std::time::Duration;

/// A reusable HTTP/1.x connection and bytes already read for the next request.
#[derive(Debug)]
pub struct ReusedHttpConnection {
stream: Stream,
pipelined_prefix: Option<BytesMut>,
}

impl ReusedHttpConnection {
pub(crate) fn new(stream: Stream, pipelined_prefix: Option<BytesMut>) -> Self {
Self {
stream,
pipelined_prefix,
}
}

/// Split the reusable connection into its underlying stream and optional
/// bytes already read for the next pipelined request.
pub fn into_parts(self) -> (Stream, Option<BytesMut>) {
(self.stream, self.pipelined_prefix)
}
}

/// HTTP server session object for both HTTP/1.x and HTTP/2
pub enum Session {
H1(SessionV1),
Expand Down Expand Up @@ -227,16 +249,35 @@ impl Session {
}
}

/// Finish the life of this request.
/// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None.
/// Finish the life of this request and return a reusable stream, if any.
///
/// This is the compatibility path: it preserves the pre-pipelining
/// stream-only signature and discards any captured pipelined prefix bytes.
/// Callers that enable HTTP/1.1 pipelining with
/// [`Self::set_pipelining_enabled`] should use [`Self::finish_reuse`]
/// instead so the prefix is carried to the next request on the connection.
pub async fn finish(self) -> Result<Option<Stream>> {
self.finish_reuse().await.map(|opt| {
opt.map(|reused| {
let (stream, _pipelined_prefix) = reused.into_parts();
stream
})
})
}

/// Finish the life of this request and return pipelining-aware reuse
/// metadata, if the connection can be reused.
///
/// For H1, if connection reuse is supported, a reusable connection will be returned,
/// otherwise None.
/// For H2, always return None because H2 stream is not reusable.
/// For subrequests, there is no true underlying stream to return.
pub async fn finish(self) -> Result<Option<Stream>> {
pub async fn finish_reuse(self) -> Result<Option<ReusedHttpConnection>> {
match self {
Self::H1(mut s) => {
// need to flush body due to buffering
s.finish_body().await?;
s.reuse().await
s.reuse_pipelined().await
}
Self::H2(mut s) => {
s.finish()?;
Expand Down Expand Up @@ -830,6 +871,40 @@ impl Session {
}
}

/// Whether HTTP/1.1 request pipelining is enabled for this session.
///
/// Always false for H2 / Subrequest / Custom (pipelining is an H/1.1-only
/// concept). For H1, see
/// [`HttpSession::set_pipelining_enabled`](crate::protocols::http::v1::server::HttpSession::set_pipelining_enabled).
pub fn pipelining_enabled(&self) -> bool {
match self {
Self::H1(s) => s.pipelining_enabled(),
_ => false,
}
}

/// Enable or disable HTTP/1.1 request pipelining on this session.
///
/// No-op for H2 / Subrequest / Custom. See
/// [`HttpSession::set_pipelining_enabled`](crate::protocols::http::v1::server::HttpSession::set_pipelining_enabled)
/// for semantics.
pub fn set_pipelining_enabled(&mut self, enabled: bool) {
if let Self::H1(s) = self {
s.set_pipelining_enabled(enabled);
}
}

/// Set pipelined bytes to be parsed as the start of this session's request.
///
/// No-op for non-H1 sessions. See
/// [`HttpSession::set_pipelined_prefix`](crate::protocols::http::v1::server::HttpSession::set_pipelined_prefix)
/// for the lifecycle.
pub fn set_pipelined_prefix(&mut self, prefix: BytesMut) {
if let Self::H1(s) = self {
s.set_pipelined_prefix(prefix);
}
}

/// Queue a downstream proxy task for cancel-safe writing.
///
/// # Panics
Expand Down
41 changes: 41 additions & 0 deletions pingora-core/src/protocols/http/v1/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,47 @@ impl BodyReader {
self.get_body_overread().is_some_and(|b| !b.is_empty())
}

/// Take ownership of the overread bytes, leaving `None` in their place.
///
/// Overread bytes are bytes that were read from the stream but are beyond the
/// end of the current request's body — i.e. they belong to a pipelined next
/// request on the same connection. Callers that support HTTP/1.1 pipelining
/// extract these bytes here and feed them to the next session's request
/// parser via [`HttpSession::set_pipelined_prefix`](super::server::HttpSession::set_pipelined_prefix).
pub fn take_body_overread(&mut self) -> Option<BytesMut> {
self.body_buf_overread.take()
}

/// Append bytes to the overread buffer from outside the body-parsing path.
///
/// The body reader's overread buffer is normally populated by
/// [`Self::init_content_length`] (for zero-length bodies) and
/// [`Self::finish_body_buf`] (for sized bodies) when the stream read
/// that completed the current request also pulled in bytes from the
/// next pipelined request. That path covers the "both requests in one
/// read" shape.
///
/// The "second request arrives in a separate read after the first
/// request's body is already done" shape is different: the body
/// reader never sees those bytes — they land on the idle-branch read
/// in [`super::server::HttpSession::read_body_or_idle`]. When
/// pipelining is enabled, that caller stashes the idle read here so
/// a single downstream overread surface covers both shapes and
/// [`Self::take_body_overread`] returns them uniformly.
pub fn push_body_overread(&mut self, bytes: &[u8]) {
if bytes.is_empty() {
return;
}
match self.body_buf_overread.as_mut() {
Some(buf) => buf.extend_from_slice(bytes),
None => {
let mut buf = BytesMut::with_capacity(bytes.len());
buf.extend_from_slice(bytes);
self.body_buf_overread = Some(buf);
}
}
}

pub fn body_done(&self) -> bool {
matches!(self.body_state, PS::Complete(_) | PS::Done(_))
}
Expand Down
Loading
Loading