Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e2089546a5962c0f65c081211d604dadd9330195
3a69c94066ae8bab489c56856c576bacfd341d6b
9 changes: 6 additions & 3 deletions pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,18 @@ windows-sys = { version = "0.59.0", features = ["Win32_Networking_WinSock"] }
h2 = { workspace = true, features = ["unstable"] }
tokio-stream = { version = "0.1", features = ["full"] }
env_logger = "0.11"
reqwest = { version = "0.11", features = [
reqwest = { version = "0.12", features = [
"rustls-tls",
"http2",
], default-features = false }
hyper = "0.14"
hyper = { version = "1", features = ["client", "http1", "http2"] }
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2"] }
http-body-util = "0.1"
rstest = "0.23.0"
rustls = "0.23"

[target.'cfg(unix)'.dev-dependencies]
hyperlocal = "0.8"
hyperlocal = "0.9"
jemallocator = "0.5"

[features]
Expand Down
42 changes: 37 additions & 5 deletions pingora-core/src/connectors/http/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use bytes::Bytes;
use h2::client::SendRequest;
use log::debug;
use parking_lot::{Mutex, RwLock};
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use pingora_error::{Error, ErrorType::*, OkOrErr, OrErr, Result};
use pingora_pool::{ConnectionMeta, ConnectionPool, PoolNode};
use std::collections::HashMap;
use std::io::ErrorKind;
Expand Down Expand Up @@ -350,10 +350,10 @@ impl Connector {
settings.stream_window_size = peer_options.and_then(|o| o.h2_stream_window_size);
settings.connection_window_size = peer_options.and_then(|o| o.h2_connection_window_size);
let conn = handshake(stream, settings).await?;
let h2_stream = conn
.spawn_stream()
.await?
.expect("newly created connections should have at least one free stream");
let h2_stream = conn.spawn_stream().await?.or_err(
H2Error,
"newly created connection has no free streams (server may have sent GOAWAY)",
)?;
if conn.more_streams_allowed() {
self.in_use_pool.insert(peer.reuse_hash(), conn);
}
Expand Down Expand Up @@ -997,4 +997,36 @@ mod tests {
stream.read_response_header().await.unwrap();
assert_eq!(stream.response_header().unwrap().status, 200);
}

/// `spawn_stream()` must return `Ok(None)` when the server sends
/// GOAWAY(NO_ERROR) before any streams are opened.
#[tokio::test]
async fn test_spawn_stream_goaway_no_error_returns_none() {
let (client_io, server_io) = tokio::io::duplex(65536);
let (send_req, connection) = h2::client::handshake(client_io).await.unwrap();
let (closed_tx, closed_rx) = watch::channel(false);
let ping_timeout = Arc::new(AtomicBool::new(false));
let conn = ConnectionRef::new(send_req, closed_rx, ping_timeout, 0, 10, Digest::default());

let conn_handle = tokio::spawn(async move {
let _ = connection.await;
let _ = closed_tx.send(true);
});

let mut server_conn = h2::server::handshake(server_io).await.unwrap();
server_conn.graceful_shutdown();
let _ = server_conn.accept().await;
drop(server_conn);

conn_handle.await.unwrap();

let result = conn.spawn_stream().await;
assert!(
result.is_ok(),
"expected Ok(None), got Err: {:?}",
result.as_ref().err()
);
assert!(result.unwrap().is_none());
assert!(conn.is_shutting_down());
}
}
2 changes: 1 addition & 1 deletion pingora-core/src/connectors/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ mod tests {
let move_flag = Arc::clone(&flag);

peer.options.upstream_tcp_sock_tweak_hook = Some(Arc::new(move |_| {
move_flag.fetch_xor(true, Ordering::SeqCst);
move_flag.fetch_not(Ordering::SeqCst);
Ok(())
}));

Expand Down
3 changes: 3 additions & 0 deletions pingora-core/src/connectors/tls/rustls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ impl TlsConnector {
where
Self: Sized,
{
// rustls 0.23+ requires an explicit CryptoProvider.
pingora_rustls::install_default_crypto_provider();

// NOTE: Rustls only supports TLS 1.2 & 1.3

// TODO: currently using Rustls defaults
Expand Down
3 changes: 3 additions & 0 deletions pingora-core/src/listeners/tls/rustls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ impl TlsSettings {
///
/// Todo: Return a result instead of panicking XD
pub fn build(self) -> Acceptor {
// rustls 0.23+ requires an explicit CryptoProvider.
pingora_rustls::install_default_crypto_provider();

let Ok(Some((certs, key))) = load_certs_and_key_files(&self.cert_path, &self.key_path)
else {
panic!(
Expand Down
71 changes: 68 additions & 3 deletions pingora-core/src/protocols/http/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,18 @@ impl ResponseCompressionCtx {
Action::Compress(algorithm) => {
let idx = algorithm.index();
let compressor = match algorithm {
Algorithm::Dcz => dictionary.as_ref().and_then(|d| {
algorithm.maybe_compressor_with_dictionary(levels[idx], d)
}),
Algorithm::Dcz => {
// RFC 9842: dictionary-compressed responses vary on
// Available-Dictionary so caches don't serve this variant
// to clients with a different or missing dictionary.
let enc = dictionary.as_ref().and_then(|d| {
algorithm.maybe_compressor_with_dictionary(levels[idx], d)
});
if enc.is_some() {
add_vary_header(resp, &AVAILABLE_DICTIONARY);
}
enc
}
_ => algorithm.compressor(levels[idx]),
};
(compressor, preserve_etag[idx])
Expand Down Expand Up @@ -780,6 +789,13 @@ fn compressible(resp: &ResponseHeader) -> bool {
}
}

/// Header name for the Available-Dictionary request header ([RFC 9842]).
/// TODO: Replace with http::header when available.
///
/// [RFC 9842]: https://datatracker.ietf.org/doc/html/rfc9842
static AVAILABLE_DICTIONARY: http::HeaderName =
http::HeaderName::from_static("available-dictionary");

// add Vary header with the specified value or extend an existing Vary header value
fn add_vary_header(resp: &mut ResponseHeader, value: &http::header::HeaderName) {
use http::header::{HeaderValue, VARY};
Expand Down Expand Up @@ -1055,6 +1071,11 @@ mod tests_dictionary_compression {
resp.headers.get("content-encoding").unwrap().as_bytes(),
b"dcz"
);
// RFC 9842: DCZ responses must vary on Available-Dictionary.
assert!(resp.headers.get_all("vary").iter().any(|v| v
.as_bytes()
.split(|b| *b == b',')
.any(|t| t.trim_ascii().eq_ignore_ascii_case(b"available-dictionary"))));

let input = Bytes::from_static(b"The quick brown fox jumps over the lazy dog again.");
let compressed = ctx.response_body_filter(Some(&input), true).unwrap();
Expand All @@ -1080,6 +1101,11 @@ mod tests_dictionary_compression {

// no dictionary set, no compression applied
assert!(resp.headers.get("content-encoding").is_none());
// No compression → no Vary: available-dictionary.
assert!(!resp.headers.get_all("vary").iter().any(|v| v
.as_bytes()
.split(|b| *b == b',')
.any(|t| t.trim_ascii().eq_ignore_ascii_case(b"available-dictionary"))));
}

#[test]
Expand All @@ -1099,6 +1125,11 @@ mod tests_dictionary_compression {

// dcz first but no dictionary, no automatic fallback
assert!(resp.headers.get("content-encoding").is_none());
// No compression → no Vary: available-dictionary.
assert!(!resp.headers.get_all("vary").iter().any(|v| v
.as_bytes()
.split(|b| *b == b',')
.any(|t| t.trim_ascii().eq_ignore_ascii_case(b"available-dictionary"))));
}

#[test]
Expand Down Expand Up @@ -1152,6 +1183,11 @@ mod tests_dictionary_compression {
resp.headers.get("transfer-encoding").unwrap().as_bytes(),
b"chunked"
);
// RFC 9842: DCZ responses must vary on Available-Dictionary.
assert!(resp.headers.get_all("vary").iter().any(|v| v
.as_bytes()
.split(|b| *b == b',')
.any(|t| t.trim_ascii().eq_ignore_ascii_case(b"available-dictionary"))));

let chunk1 = Bytes::from_static(b"First chunk. ");
let output1 = ctx.response_body_filter(Some(&chunk1), false);
Expand All @@ -1166,4 +1202,33 @@ mod tests_dictionary_compression {
assert_eq!(total_in, chunk1.len() + chunk2.len());
assert!(total_out > 0);
}

#[test]
fn regular_compression_no_available_dictionary_vary() {
// Gzip compression should produce Vary: Accept-Encoding but NOT
// Vary: available-dictionary.
let mut ctx = ResponseCompressionCtx::new(3, false, false);

let mut req = RequestHeader::build("GET", b"/page.html", None).unwrap();
req.insert_header("accept-encoding", "gzip").unwrap();
ctx.request_filter(&req);

let mut resp = ResponseHeader::build(200, None).unwrap();
resp.insert_header("content-type", "text/html").unwrap();
resp.insert_header("content-length", "1000").unwrap();
ctx.response_header_filter(&mut resp, false);

assert_eq!(
resp.headers.get("content-encoding").unwrap().as_bytes(),
b"gzip"
);
assert!(resp.headers.get_all("vary").iter().any(|v| v
.as_bytes()
.split(|b| *b == b',')
.any(|t| t.trim_ascii().eq_ignore_ascii_case(b"accept-encoding"))));
assert!(!resp.headers.get_all("vary").iter().any(|v| v
.as_bytes()
.split(|b| *b == b',')
.any(|t| t.trim_ascii().eq_ignore_ascii_case(b"available-dictionary"))));
}
}
Loading
Loading