104 changes: 96 additions & 8 deletions pingora-proxy/src/proxy_cache.rs
@@ -160,11 +160,23 @@ where
                 // cache miss
                 if session.cache.is_cache_locked() {
                     // Another request is filling the cache; try waiting til that's done and retry.
-                    let lock_status = session.cache.cache_lock_wait().await;
-                    if self.handle_lock_status(session, ctx, lock_status) {
-                        continue;
+                    // Race the cache lock wait against client disconnect detection
+                    let lock_status =
+                        Self::cache_lock_wait_or_disconnect(session, ctx).await;
+                    if let Some(status) = lock_status {
+                        if self.handle_lock_status(session, ctx, status) {
+                            continue;
+                        } else {
+                            break None;
+                        }
                     } else {
-                        break None;
+                        // Client disconnected while waiting for cache lock
+                        debug!(
+                            "Client disconnected during cache lock wait, {}",
+                            self.inner.request_summary(session, ctx)
+                        );
+                        // Return with downstream not reusable
+                        break Some((false, None));
                     }
                 } else {
                     self.inner.cache_miss(session, ctx);
@@ -194,11 +206,22 @@ where
                     let will_serve_stale = session.cache.can_serve_stale_updating()
                         && self.inner.should_serve_stale(session, ctx, None);
                     if !will_serve_stale {
-                        let lock_status = session.cache.cache_lock_wait().await;
-                        if self.handle_lock_status(session, ctx, lock_status) {
-                            continue;
+                        // Race the cache lock wait against client disconnect detection
+                        let lock_status =
+                            Self::cache_lock_wait_or_disconnect(session, ctx).await;
+                        if let Some(status) = lock_status {
+                            if self.handle_lock_status(session, ctx, status) {
+                                continue;
+                            } else {
+                                break None;
+                            }
                         } else {
-                            break None;
+                            // Client disconnected while waiting for cache lock
+                            debug!(
+                                "Client disconnected during cache lock wait (stale), {}",
+                                self.inner.request_summary(session, ctx)
+                            );
+                            break Some((false, None));
                         }
                     }
                     // else continue to serve stale
@@ -944,6 +967,71 @@ where
             LockStatus::Waiting => panic!("impossible LockStatus::Waiting"),
         }
     }
+
+    /// Wait for a cache lock while also detecting client disconnection.
+    ///
+    /// This function races the cache lock wait against monitoring for client disconnect.
+    /// If the client disconnects while waiting for the cache lock, this function returns
+    /// `None` to signal that the request should be aborted.
+    ///
+    /// Returns:
+    /// - `Some(LockStatus)` if the cache lock completes (writer released the lock)
+    /// - `None` if the client disconnected while waiting
+    async fn cache_lock_wait_or_disconnect(
+        session: &mut Session,
+        _ctx: &SV::CTX,
+    ) -> Option<LockStatus>
+    where
+        SV: ProxyHttp + Send + Sync + 'static,
+        SV::CTX: Send + Sync,
+    {
+        // Check if we should try to detect client disconnect.
+        // For subrequests (background cache fills), there's no real downstream client,
+        // so we skip disconnect detection.
+        if session.subrequest_ctx.is_some() {
+            // This is a subrequest - no downstream client to monitor
+            return Some(session.cache.cache_lock_wait().await);
+        }
+
+        // Determine if we can safely wait for idle (client disconnect).
+        // We can only call read_body_or_idle(true) if:
+        // 1. The request body is done/empty (common for GET requests)
+        // 2. We haven't started streaming the body yet
+        let body_done = session.as_mut().is_body_done();
+
+        if !body_done {
+            // The request body is still being sent - we can't safely monitor for
+            // disconnect without potentially interfering with body reading.
+            // Fall back to the regular cache lock wait.
+            return Some(session.cache.cache_lock_wait().await);
+        }
+
+        // Race the cache lock wait against client disconnect detection.
+        tokio::select! {
+            biased;
+
+            // Prefer completing the cache lock wait if both are ready
+            lock_status = session.cache.cache_lock_wait() => {
+                Some(lock_status)
+            }
+
+            // Monitor for client disconnect
+            disconnect_result = session.downstream_session.read_body_or_idle(true) => {
+                // read_body_or_idle(true) returns an error when the client disconnects
+                match disconnect_result {
+                    Ok(_) => {
+                        // Shouldn't happen once the body is done, but treat any
+                        // unexpected read as a disconnect as well
+                        None
+                    }
+                    Err(_) => {
+                        // Client disconnected
+                        None
+                    }
+                }
+            }
+        }
+    }
 }
 
 fn cache_hit_header(cache: &HttpCache) -> Box<ResponseHeader> {
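Note: the core of this change is the biased `tokio::select!` race above. A minimal standalone sketch of the same pattern, with hypothetical `wait_for_lock` and `client_closed` futures standing in for Pingora's `cache_lock_wait()` and `read_body_or_idle(true)` (not part of this PR):

    use std::time::Duration;
    use tokio::time::sleep;

    // Hypothetical stand-in for session.cache.cache_lock_wait():
    // resolves once the writer releases the cache lock.
    async fn wait_for_lock() -> &'static str {
        sleep(Duration::from_secs(2)).await;
        "lock released"
    }

    // Hypothetical stand-in for read_body_or_idle(true) returning an error:
    // resolves when the downstream client goes away.
    async fn client_closed() {
        sleep(Duration::from_millis(100)).await;
    }

    #[tokio::main]
    async fn main() {
        tokio::select! {
            // `biased` polls arms top to bottom instead of randomly, so a lock
            // release that is ready at the same moment as a disconnect still wins.
            biased;
            status = wait_for_lock() => println!("proceed: {status}"),
            _ = client_closed() => println!("abort: client disconnected"),
        }
    }

Here the sketch prints "abort: client disconnected" after ~100ms, mirroring how a waiting reader now bails out as soon as its client goes away instead of sleeping through the full lock timeout.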
262 changes: 262 additions & 0 deletions pingora-proxy/tests/test_upstream.rs
@@ -3118,4 +3118,266 @@ mod test_cache {
assert_eq!(headers["x-cache-status"], "hit");
assert_eq!(res.text().await.unwrap(), "hello world");
}

/// Test that cache lock readers exit quickly when their client disconnects.
///
/// This test demonstrates a bug where readers waiting on a cache lock continue
/// waiting for the full lock timeout even when their downstream client has disconnected.
///
/// The scenario:
/// 1. Writer request holds the cache lock while fetching from a slow upstream (5s delay)
/// 2. Multiple reader requests start waiting on the cache lock
/// 3. Reader clients disconnect after a short time (100ms)
/// 4. BUG: Readers continue waiting for the full lock timeout (2s) instead of exiting
///
/// Expected behavior (after fix): Readers should detect client disconnect and exit quickly
/// Current behavior (bug): Readers wait for full lock timeout regardless of client state
#[tokio::test]
async fn test_cache_lock_reader_client_disconnect() {
use std::time::Instant;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::time::sleep;

init();

// Use a unique URL to avoid interference from other tests
let url_path = "/sleep/test_cache_lock_reader_client_disconnect.txt";
let proxy_addr = "127.0.0.1:6148";
let full_url = format!("http://{}{}", proxy_addr, url_path);

let test_start = Instant::now();

// Start the writer request - this will hold the cache lock while waiting for upstream
// The upstream will take 5 seconds to respond, which is longer than the 2s lock timeout
let writer_handle = tokio::spawn({
let full_url = full_url.clone();
async move {
let res = reqwest::Client::new()
.get(&full_url)
.header("x-lock", "true")
.header("x-set-sleep", "5") // 5 second upstream delay
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
res.text().await.unwrap()
}
});

// Give the writer time to acquire the lock
sleep(Duration::from_millis(100)).await;

// Start multiple reader requests that will wait on the cache lock
// Each reader will have their TCP connection dropped after 100ms
let reader_count = 3;
let mut reader_handles = Vec::new();

for i in 0..reader_count {
let url_path = url_path.to_string();
let proxy_addr = proxy_addr.to_string();

reader_handles.push(tokio::spawn(async move {
let reader_start = Instant::now();

// Connect with raw TCP so we can drop the connection
let mut stream = TcpStream::connect(&proxy_addr).await.unwrap();

// Send HTTP/1.1 request
let request = format!(
"GET {} HTTP/1.1\r\n\
Host: 127.0.0.1:6148\r\n\
x-lock: true\r\n\
x-set-sleep: 0\r\n\
Connection: close\r\n\
\r\n",
url_path
);
stream.write_all(request.as_bytes()).await.unwrap();

// Wait a short time (reader should be waiting on cache lock by now)
sleep(Duration::from_millis(100)).await;

// Drop the connection - simulating client disconnect
drop(stream);

let elapsed = reader_start.elapsed();
println!("Reader {} disconnected after {:?}", i, elapsed);
elapsed
}));
}

// Wait for all reader handles to complete
// Note: The readers complete when we drop their connection (from client side),
// but the server-side tasks may still be running
for handle in reader_handles {
let _elapsed = handle.await.unwrap();
}

let readers_done_time = test_start.elapsed();
println!("All readers dropped after {:?}", readers_done_time);

// The key assertion: if the bug is fixed, the server should have released
// resources for the disconnected readers quickly. We can't directly observe
// server-side behavior from here, but we can test indirectly by:
// 1. Checking that the writer can complete normally
// 2. Measuring total test time

// Wait for writer to complete
let _body = writer_handle.await.unwrap();

let total_time = test_start.elapsed();
println!("Total test time: {:?}", total_time);

// The test passes if it completes. The real verification is in
// test_cache_lock_reader_disconnect_timing below, which measures
// server-side behavior.
}

/// Test that server-side handlers exit quickly when clients disconnect during cache lock wait.
///
/// This is the key regression test for the cache lock client disconnect fix.
///
/// Setup:
/// - Cache lock age timeout: 2 seconds (configured in server_utils.rs)
/// - Writer: holds lock while upstream responds slowly (0.3s)
/// - Readers: wait on lock, then disconnect after 100ms
///
/// Without fix (BUG):
/// - Server handlers wait for full 2-second age timeout
/// - Server tries to write to disconnected clients -> errors after 2s
/// - We observe "cache unlocked AgeTimeout" in logs
///
/// With fix:
/// - Server handlers detect disconnect immediately via tokio::select!
/// - Server exits cache lock wait within ~100ms
/// - We observe "Client disconnected during cache lock wait" in logs
///
/// The test measures time from reader disconnect to server-side completion
/// by observing when the server stops processing the request.
#[tokio::test]
async fn test_cache_lock_reader_disconnect_timing() {
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{sleep, timeout};

init();

// Use a unique URL for this test
let url_path = "/sleep/test_cache_lock_reader_disconnect_timing.txt";
let proxy_addr = "127.0.0.1:6148";
let full_url = format!("http://{}{}", proxy_addr, url_path);

let test_start = Instant::now();

// Counter to track when server finishes processing disconnected readers
// We detect this by when the TCP connection is fully closed from server side
let server_close_count = Arc::new(AtomicU32::new(0));

// Start writer with upstream delay LONGER than the 2s lock timeout
// This ensures readers will hit the lock timeout (without fix) or
// detect disconnect (with fix) before the writer completes
let writer_handle = tokio::spawn({
let full_url = full_url.clone();
async move {
let res = reqwest::Client::new()
.get(&full_url)
.header("x-lock", "true")
.header("x-set-sleep", "5") // 5 seconds - longer than 2s lock timeout
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
res.text().await.unwrap()
}
});

// Give writer time to acquire lock
sleep(Duration::from_millis(100)).await;

// Start readers that will disconnect early, but keep connection open
// to observe when server closes it
let mut reader_handles = Vec::new();
for i in 0..3 {
let url_path = url_path.to_string();
let proxy_addr = proxy_addr.to_string();
let close_count = server_close_count.clone();

reader_handles.push(tokio::spawn(async move {
let mut stream = TcpStream::connect(&proxy_addr).await.unwrap();
let request = format!(
"GET {} HTTP/1.1\r\n\
Host: 127.0.0.1:6148\r\n\
x-lock: true\r\n\
Connection: close\r\n\
\r\n",
url_path
);
stream.write_all(request.as_bytes()).await.unwrap();

// Wait for request to be received and reader to start waiting on lock
sleep(Duration::from_millis(100)).await;

// Shutdown write side to signal disconnect to server
// But keep read side open to observe when server closes
let _ = stream.shutdown().await;

let disconnect_time = Instant::now();
println!("Reader {} sent shutdown at {:?}", i, test_start.elapsed());

// Wait for server to close the connection (or timeout)
// With fix: Server detects shutdown and closes quickly
// Without fix: Server waits for lock timeout, then closes
let mut buf = [0u8; 1024];
let read_result = timeout(Duration::from_secs(5), stream.read(&mut buf)).await;

let server_close_time = disconnect_time.elapsed();
close_count.fetch_add(1, Ordering::SeqCst);

println!(
"Reader {} server closed after {:?} (result: {:?})",
i, server_close_time, read_result
);

server_close_time
}));
}

// Wait for all readers to observe server close
let mut close_times = Vec::new();
for handle in reader_handles {
let close_time = handle.await.unwrap();
close_times.push(close_time);
}

// Get the maximum close time (slowest reader)
let max_close_time = close_times.iter().max().unwrap();
println!(
"Max server close time: {:?}, total test time: {:?}",
max_close_time,
test_start.elapsed()
);

// KEY ASSERTION: With the fix, server should close connections quickly
// (within ~500ms of client shutdown).
// Without fix, server waits for 2-second lock timeout.
//
// We use 1 second as the threshold - well above the fixed behavior (~100ms)
// but well below the buggy behavior (~2000ms)
assert!(
max_close_time.as_millis() < 1000,
"Server took too long to close connection: {:?}. \
Expected <1s with fix, but got >1s suggesting server waited for lock timeout. \
This indicates the cache lock disconnect detection fix may not be working.",
max_close_time
);

// Wait for writer to complete
let _body = writer_handle.await.unwrap();

println!("Test completed successfully in {:?}", test_start.elapsed());
}
}
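Assuming the crate layout shown in this diff (the tests live in pingora-proxy/tests/test_upstream.rs), the new regression test can be run on its own with a standard cargo filter:

    cargo test -p pingora-proxy --test test_upstream test_cache_lock_reader_disconnect_timing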