Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions pingora-core/src/listeners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,50 @@ pub trait TlsAccept {

pub type TlsAcceptCallbacks = Box<dyn TlsAccept + Send + Sync>;

/// Callback for processing raw bytes before TLS handshake.
///
/// This trait allows applications to read and process data from the raw TCP stream
/// before the TLS handshake occurs. This is useful for protocols like HAProxy's
/// PROXY protocol, which sends client address information before TLS.
///
/// # Example
///
/// ```rust,ignore
/// use pingora_core::listeners::PreTlsProcess;
/// use pingora_core::protocols::l4::stream::Stream as L4Stream;
/// use async_trait::async_trait;
///
/// struct ProxyProtocolHandler;
///
/// #[async_trait]
/// impl PreTlsProcess for ProxyProtocolHandler {
/// async fn process(&self, stream: &mut L4Stream) -> pingora_error::Result<()> {
/// // Read PROXY protocol header, update socket digest, etc.
/// Ok(())
/// }
/// }
/// ```
#[async_trait]
pub trait PreTlsProcess: Send + Sync {
/// Process the raw stream before TLS handshake.
///
/// The implementation can read bytes from the stream (e.g., PROXY protocol header)
/// and update the stream's socket digest with parsed information such as the
/// real client address.
///
/// If this method returns an error, the connection will be dropped.
async fn process(&self, stream: &mut L4Stream) -> Result<()>;
}

/// Type alias for a boxed pre-TLS processor.
pub type PreTlsCallback = Arc<dyn PreTlsProcess>;

struct TransportStackBuilder {
l4: ServerAddress,
tls: Option<TlsSettings>,
#[cfg(feature = "connection_filter")]
connection_filter: Option<Arc<dyn ConnectionFilter>>,
pre_tls_callback: Option<PreTlsCallback>,
}

impl TransportStackBuilder {
Expand All @@ -148,6 +187,7 @@ impl TransportStackBuilder {
Ok(TransportStack {
l4,
tls: self.tls.take().map(|tls| Arc::new(tls.build())),
pre_tls_callback: self.pre_tls_callback.clone(),
})
}
}
Expand All @@ -156,6 +196,7 @@ impl TransportStackBuilder {
pub(crate) struct TransportStack {
l4: ListenerEndpoint,
tls: Option<Arc<Acceptor>>,
pre_tls_callback: Option<PreTlsCallback>,
}

impl TransportStack {
Expand All @@ -168,6 +209,7 @@ impl TransportStack {
Ok(UninitializedStream {
l4: stream,
tls: self.tls.clone(),
pre_tls_callback: self.pre_tls_callback.clone(),
})
}

Expand All @@ -179,11 +221,18 @@ impl TransportStack {
pub(crate) struct UninitializedStream {
l4: L4Stream,
tls: Option<Arc<Acceptor>>,
pre_tls_callback: Option<PreTlsCallback>,
}

impl UninitializedStream {
pub async fn handshake(mut self) -> Result<Stream> {
self.l4.set_buffer();

// Process pre-TLS data if a callback is configured (e.g., PROXY protocol)
if let Some(ref callback) = self.pre_tls_callback {
callback.process(&mut self.l4).await?;
}

if let Some(tls) = self.tls {
let tls_stream = tls.tls_handshake(self.l4).await?;
Ok(Box::new(tls_stream))
Expand All @@ -205,6 +254,7 @@ pub struct Listeners {
stacks: Vec<TransportStackBuilder>,
#[cfg(feature = "connection_filter")]
connection_filter: Option<Arc<dyn ConnectionFilter>>,
pre_tls_callback: Option<PreTlsCallback>,
}

impl Listeners {
Expand All @@ -214,6 +264,7 @@ impl Listeners {
stacks: vec![],
#[cfg(feature = "connection_filter")]
connection_filter: None,
pre_tls_callback: None,
}
}
/// Create a new [`Listeners`] with a TCP server endpoint from the given string.
Expand Down Expand Up @@ -294,13 +345,43 @@ impl Listeners {
}
}

/// Set a pre-TLS callback for all endpoints in this listener collection.
///
/// The callback will be invoked after TCP accept but before the TLS handshake,
/// allowing the application to read and process data such as PROXY protocol
/// headers that arrive before TLS.
///
/// # Example
///
/// ```rust,ignore
/// use pingora_core::listeners::{Listeners, PreTlsProcess};
/// use std::sync::Arc;
///
/// let callback = Arc::new(MyProxyProtocolHandler::new());
/// let mut listeners = Listeners::new();
/// listeners.set_pre_tls_callback(callback);
/// listeners.add_tls("0.0.0.0:443", "cert.pem", "key.pem")?;
/// ```
pub fn set_pre_tls_callback(&mut self, callback: PreTlsCallback) {
log::debug!("Setting pre-TLS callback on Listeners");

// Store the callback for future endpoints
self.pre_tls_callback = Some(callback.clone());

// Apply to existing stacks
for stack in &mut self.stacks {
stack.pre_tls_callback = Some(callback.clone());
}
}

/// Add the given [`ServerAddress`] to `self` with the given [`TlsSettings`] if provided
pub fn add_endpoint(&mut self, l4: ServerAddress, tls: Option<TlsSettings>) {
self.stacks.push(TransportStackBuilder {
l4,
tls,
#[cfg(feature = "connection_filter")]
connection_filter: self.connection_filter.clone(),
pre_tls_callback: self.pre_tls_callback.clone(),
})
}

Expand Down
13 changes: 13 additions & 0 deletions pingora-core/src/protocols/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,19 @@ impl Session {
}
}

/// Give up the H2 stream with a custom reason.
///
/// For H2, this sends a `RST_STREAM` frame with the specified reason.
/// For H1, subrequests, and custom sessions, this is a no-op since they don't support
/// stream reset reasons.
///
/// See [`super::v2::server::HttpSession::shutdown_with_reason`] for available reasons.
pub fn shutdown_with_reason(&mut self, reason: h2::Reason) {
if let Self::H2(s) = self {
s.shutdown_with_reason(reason);
}
}

pub fn to_h1_raw(&self) -> Bytes {
match self {
Self::H1(s) => s.get_headers_raw_bytes(),
Expand Down
17 changes: 15 additions & 2 deletions pingora-core/src/protocols/http/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,23 @@ impl HttpSession {

/// Give up the stream abruptly.
///
/// This will send a `INTERNAL_ERROR` stream error to the client
/// This will send an `INTERNAL_ERROR` stream error to the client.
pub fn shutdown(&mut self) {
self.shutdown_with_reason(h2::Reason::INTERNAL_ERROR);
}

/// Give up the stream abruptly with a custom reason.
///
/// This will send a `RST_STREAM` frame with the given reason to the client.
///
/// Useful reasons include:
/// - [`h2::Reason::HTTP_1_1_REQUIRED`] - Signal to the client that HTTP/1.1 should be used
/// instead. Per RFC 7540 §9.1.2, clients should retry the request over HTTP/1.1.
/// - [`h2::Reason::CANCEL`] - Indicate the stream is no longer needed.
/// - [`h2::Reason::REFUSED_STREAM`] - Indicate the stream was refused before processing.
pub fn shutdown_with_reason(&mut self, reason: h2::Reason) {
if !self.ended {
self.send_response.send_reset(h2::Reason::INTERNAL_ERROR);
self.send_response.send_reset(reason);
}
}

Expand Down
7 changes: 5 additions & 2 deletions pingora-core/src/protocols/l4/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,11 @@ impl Stream {
Ok(())
}

/// Put Some data back to the head of the stream to be read again
pub(crate) fn rewind(&mut self, data: &[u8]) {
/// Put some data back to the head of the stream to be read again.
///
/// This is useful when you've read data to detect a protocol (e.g., PROXY protocol)
/// but the data wasn't what you expected, so you need to "unread" it.
pub fn rewind(&mut self, data: &[u8]) {
if !data.is_empty() {
self.rewind_read_buf.push(data.to_vec());
}
Expand Down