diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs
index c026d739..fc07ddfe 100644
--- a/pingora-proxy/src/proxy_cache.rs
+++ b/pingora-proxy/src/proxy_cache.rs
@@ -160,11 +160,23 @@ where
                     // cache miss
                     if session.cache.is_cache_locked() {
                         // Another request is filling the cache; try waiting til that's done and retry.
-                        let lock_status = session.cache.cache_lock_wait().await;
-                        if self.handle_lock_status(session, ctx, lock_status) {
-                            continue;
+                        // Race the cache lock wait against client disconnect detection
+                        let lock_status =
+                            Self::cache_lock_wait_or_disconnect(session, ctx).await;
+                        if let Some(status) = lock_status {
+                            if self.handle_lock_status(session, ctx, status) {
+                                continue;
+                            } else {
+                                break None;
+                            }
                         } else {
-                            break None;
+                            // Client disconnected while waiting for cache lock
+                            debug!(
+                                "Client disconnected during cache lock wait, {}",
+                                self.inner.request_summary(session, ctx)
+                            );
+                            // Return with downstream not reusable
+                            break Some((false, None));
                         }
                     } else {
                         self.inner.cache_miss(session, ctx);
@@ -194,11 +206,22 @@ where
                         let will_serve_stale = session.cache.can_serve_stale_updating()
                             && self.inner.should_serve_stale(session, ctx, None);
                         if !will_serve_stale {
-                            let lock_status = session.cache.cache_lock_wait().await;
-                            if self.handle_lock_status(session, ctx, lock_status) {
-                                continue;
+                            // Race the cache lock wait against client disconnect detection
+                            let lock_status =
+                                Self::cache_lock_wait_or_disconnect(session, ctx).await;
+                            if let Some(status) = lock_status {
+                                if self.handle_lock_status(session, ctx, status) {
+                                    continue;
+                                } else {
+                                    break None;
+                                }
                             } else {
-                                break None;
+                                // Client disconnected while waiting for cache lock
+                                debug!(
+                                    "Client disconnected during cache lock wait (stale), {}",
+                                    self.inner.request_summary(session, ctx)
+                                );
+                                break Some((false, None));
                             }
                         }
                         // else continue to serve stale
@@ -944,6 +967,71 @@ where
             LockStatus::Waiting => panic!("impossible LockStatus::Waiting"),
         }
     }
+
+    /// Wait for a cache lock while also detecting client disconnection.
+    ///
+    /// This function races the cache lock wait against monitoring for client disconnect.
+    /// If the client disconnects while waiting for the cache lock, this function returns
+    /// `None` to signal that the request should be aborted.
+    ///
+    /// Returns:
+    /// - `Some(LockStatus)` if the cache lock completes (writer released the lock)
+    /// - `None` if the client disconnected while waiting
+    async fn cache_lock_wait_or_disconnect(
+        session: &mut Session,
+        _ctx: &SV::CTX,
+    ) -> Option<LockStatus>
+    where
+        SV: ProxyHttp + Send + Sync + 'static,
+        SV::CTX: Send + Sync,
+    {
+        // Check if we should try to detect client disconnect.
+        // For subrequests (background cache fills), there's no real downstream client,
+        // so we skip disconnect detection.
+        if session.subrequest_ctx.is_some() {
+            // This is a subrequest - no downstream client to monitor
+            return Some(session.cache.cache_lock_wait().await);
+        }
+
+        // Determine if we can safely wait for idle (client disconnect).
+        // We can only call read_body_or_idle(true) if:
+        // 1. The request body is done/empty (common for GET requests)
+        // 2. We haven't started streaming the body yet
+        let body_done = session.as_mut().is_body_done();
+
+        if !body_done {
+            // Request body is still being sent - we can't safely monitor for disconnect
+            // without potentially interfering with body reading.
+            // Fall back to the regular cache lock wait.
+            return Some(session.cache.cache_lock_wait().await);
+        }
+
+        // Race cache lock wait against client disconnect detection.
+        tokio::select! {
+            biased;
+
+            // Prefer completing the cache lock wait if both are ready
+            lock_status = session.cache.cache_lock_wait() => {
+                Some(lock_status)
+            }
+
+            // Monitor for client disconnect
+            disconnect_result = session.downstream_session.read_body_or_idle(true) => {
+                // read_body_or_idle(true) returns an error when client disconnects
+                match disconnect_result {
+                    Ok(_) => {
+                        // This shouldn't happen with true flag when body is done,
+                        // but treat it as disconnect
+                        None
+                    }
+                    Err(_) => {
+                        // Client disconnected
+                        None
+                    }
+                }
+            }
+        }
+    }
 }
 
 fn cache_hit_header(cache: &HttpCache) -> Box<ResponseHeader> {
diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs
index 5125d30a..1b72835f 100644
--- a/pingora-proxy/tests/test_upstream.rs
+++ b/pingora-proxy/tests/test_upstream.rs
@@ -3118,4 +3118,266 @@ mod test_cache {
         assert_eq!(headers["x-cache-status"], "hit");
         assert_eq!(res.text().await.unwrap(), "hello world");
     }
+
+    /// Test that cache lock readers exit quickly when their client disconnects.
+    ///
+    /// This test demonstrates a bug where readers waiting on a cache lock continue
+    /// waiting for the full lock timeout even when their downstream client has disconnected.
+    ///
+    /// The scenario:
+    /// 1. Writer request holds the cache lock while fetching from a slow upstream (5s delay)
+    /// 2. Multiple reader requests start waiting on the cache lock
+    /// 3. Reader clients disconnect after a short time (100ms)
+    /// 4. BUG: Readers continue waiting for the full lock timeout (2s) instead of exiting
+    ///
+    /// Expected behavior (after fix): Readers should detect client disconnect and exit quickly
+    /// Current behavior (bug): Readers wait for full lock timeout regardless of client state
+    #[tokio::test]
+    async fn test_cache_lock_reader_client_disconnect() {
+        use std::time::Instant;
+        use tokio::io::AsyncWriteExt;
+        use tokio::net::TcpStream;
+        use tokio::time::sleep;
+
+        init();
+
+        // Use a unique URL to avoid interference from other tests
+        let url_path = "/sleep/test_cache_lock_reader_client_disconnect.txt";
+        let proxy_addr = "127.0.0.1:6148";
+        let full_url = format!("http://{}{}", proxy_addr, url_path);
+
+        let test_start = Instant::now();
+
+        // Start the writer request - this will hold the cache lock while waiting for upstream
+        // The upstream will take 5 seconds to respond, which is longer than the 2s lock timeout
+        let writer_handle = tokio::spawn({
+            let full_url = full_url.clone();
+            async move {
+                let res = reqwest::Client::new()
+                    .get(&full_url)
+                    .header("x-lock", "true")
+                    .header("x-set-sleep", "5") // 5 second upstream delay
+                    .send()
+                    .await
+                    .unwrap();
+                assert_eq!(res.status(), 200);
+                res.text().await.unwrap()
+            }
+        });
+
+        // Give the writer time to acquire the lock
+        sleep(Duration::from_millis(100)).await;
+
+        // Start multiple reader requests that will wait on the cache lock
+        // Each reader will have their TCP connection dropped after 100ms
+        let reader_count = 3;
+        let mut reader_handles = Vec::new();
+
+        for i in 0..reader_count {
+            let url_path = url_path.to_string();
+            let proxy_addr = proxy_addr.to_string();
+
+            reader_handles.push(tokio::spawn(async move {
+                let reader_start = Instant::now();
+
+                // Connect with raw TCP so we can drop the connection
+                let mut stream = TcpStream::connect(&proxy_addr).await.unwrap();
+
+                // Send HTTP/1.1 request
+                let request = format!(
+                    "GET {} HTTP/1.1\r\n\
+                     Host: 127.0.0.1:6148\r\n\
+                     x-lock: true\r\n\
+                     x-set-sleep: 0\r\n\
+                     Connection: close\r\n\
+                     \r\n",
+                    url_path
+                );
+                stream.write_all(request.as_bytes()).await.unwrap();
+
+                // Wait a short time (reader should be waiting on cache lock by now)
+                sleep(Duration::from_millis(100)).await;
+
+                // Drop the connection - simulating client disconnect
+                drop(stream);
+
+                let elapsed = reader_start.elapsed();
+                println!("Reader {} disconnected after {:?}", i, elapsed);
+                elapsed
+            }));
+        }
+
+        // Wait for all reader handles to complete
+        // Note: The readers complete when we drop their connection (from client side),
+        // but the server-side tasks may still be running
+        for handle in reader_handles {
+            let _elapsed = handle.await.unwrap();
+        }
+
+        let readers_done_time = test_start.elapsed();
+        println!("All readers dropped after {:?}", readers_done_time);
+
+        // The key assertion: if the bug is fixed, the server should have released
+        // resources for the disconnected readers quickly. We can't directly observe
+        // server-side behavior from here, but we can test indirectly by:
+        // 1. Checking that the writer can complete normally
+        // 2. Measuring total test time
+
+        // Wait for writer to complete
+        let _body = writer_handle.await.unwrap();
+
+        let total_time = test_start.elapsed();
+        println!("Total test time: {:?}", total_time);
+
+        // The test passes if it completes. The real verification is in the
+        // test_cache_lock_reader_disconnect_timing test below which measures
+        // server-side behavior.
+    }
+
+    /// Test that server-side handlers exit quickly when clients disconnect during cache lock wait.
+    ///
+    /// This is the key regression test for the cache lock client disconnect fix.
+    ///
+    /// Setup:
+    /// - Cache lock age timeout: 2 seconds (configured in server_utils.rs)
+    /// - Writer: holds lock while upstream responds slowly (5s, longer than the lock timeout)
+    /// - Readers: wait on lock, then disconnect after 100ms
+    ///
+    /// Without fix (BUG):
+    /// - Server handlers wait for full 2-second age timeout
+    /// - Server tries to write to disconnected clients -> errors after 2s
+    /// - We observe "cache unlocked AgeTimeout" in logs
+    ///
+    /// With fix:
+    /// - Server handlers detect disconnect immediately via tokio::select!
+    /// - Server exits cache lock wait within ~100ms
+    /// - We observe "Client disconnected during cache lock wait" in logs
+    ///
+    /// The test measures time from reader disconnect to server-side completion
+    /// by observing when the server closes the reader's connection.
+    #[tokio::test]
+    async fn test_cache_lock_reader_disconnect_timing() {
+        use std::sync::atomic::{AtomicU32, Ordering};
+        use std::sync::Arc;
+        use std::time::Instant;
+        use tokio::io::{AsyncReadExt, AsyncWriteExt};
+        use tokio::net::TcpStream;
+        use tokio::time::{sleep, timeout};
+
+        init();
+
+        // Use a unique URL for this test
+        let url_path = "/sleep/test_cache_lock_reader_disconnect_timing.txt";
+        let proxy_addr = "127.0.0.1:6148";
+        let full_url = format!("http://{}{}", proxy_addr, url_path);
+
+        let test_start = Instant::now();
+
+        // Counter to track when server finishes processing disconnected readers
+        // We detect this by when the TCP connection is fully closed from server side
+        let server_close_count = Arc::new(AtomicU32::new(0));
+
+        // Start writer with upstream delay LONGER than the 2s lock timeout
+        // This ensures readers will hit the lock timeout (without fix) or
+        // detect disconnect (with fix) before the writer completes
+        let writer_handle = tokio::spawn({
+            let full_url = full_url.clone();
+            async move {
+                let res = reqwest::Client::new()
+                    .get(&full_url)
+                    .header("x-lock", "true")
+                    .header("x-set-sleep", "5") // 5 seconds - longer than 2s lock timeout
+                    .send()
+                    .await
+                    .unwrap();
+                assert_eq!(res.status(), 200);
+                res.text().await.unwrap()
+            }
+        });
+
+        // Give writer time to acquire lock
+        sleep(Duration::from_millis(100)).await;
+
+        // Start readers that will disconnect early, but keep connection open
+        // to observe when server closes it
+        let mut reader_handles = Vec::new();
+        for i in 0..3 {
+            let url_path = url_path.to_string();
+            let proxy_addr = proxy_addr.to_string();
+            let close_count = server_close_count.clone();
+
+            reader_handles.push(tokio::spawn(async move {
+                let mut stream = TcpStream::connect(&proxy_addr).await.unwrap();
+                let request = format!(
+                    "GET {} HTTP/1.1\r\n\
+                     Host: 127.0.0.1:6148\r\n\
+                     x-lock: true\r\n\
+                     Connection: close\r\n\
+                     \r\n",
+                    url_path
+                );
+                stream.write_all(request.as_bytes()).await.unwrap();
+
+                // Wait for request to be received and reader to start waiting on lock
+                sleep(Duration::from_millis(100)).await;
+
+                // Shutdown write side to signal disconnect to server
+                // But keep read side open to observe when server closes
+                let _ = stream.shutdown().await;
+
+                let disconnect_time = Instant::now();
+                println!("Reader {} sent shutdown at {:?}", i, test_start.elapsed());
+
+                // Wait for server to close the connection (or timeout)
+                // With fix: Server detects shutdown and closes quickly
+                // Without fix: Server waits for lock timeout, then closes
+                let mut buf = [0u8; 1024];
+                let read_result = timeout(Duration::from_secs(5), stream.read(&mut buf)).await;
+
+                let server_close_time = disconnect_time.elapsed();
+                close_count.fetch_add(1, Ordering::SeqCst);
+
+                println!(
+                    "Reader {} server closed after {:?} (result: {:?})",
+                    i, server_close_time, read_result
+                );
+
+                server_close_time
+            }));
+        }
+
+        // Wait for all readers to observe server close
+        let mut close_times = Vec::new();
+        for handle in reader_handles {
+            let close_time = handle.await.unwrap();
+            close_times.push(close_time);
+        }
+
+        // Get the maximum close time (slowest reader)
+        let max_close_time = close_times.iter().max().unwrap();
+        println!(
+            "Max server close time: {:?}, total test time: {:?}",
+            max_close_time,
+            test_start.elapsed()
+        );
+
+        // KEY ASSERTION: With the fix, server should close connections quickly
+        // (within ~500ms of client shutdown).
+        //
+        // We use 1 second as the threshold - well above the post-fix behavior (~100ms)
+        // but well below the buggy behavior (~2000ms).
+        assert!(
+            max_close_time.as_millis() < 1000,
+            "Server took too long to close connection: {:?}. \
+            Expected <1s with fix, but got >1s suggesting server waited for lock timeout. \
+            This indicates the cache lock disconnect detection fix may not be working.",
+            max_close_time
+        );
+
+        // Wait for writer to complete
+        let _body = writer_handle.await.unwrap();
+
+        println!("Test completed successfully in {:?}", test_start.elapsed());
+    }
 }
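Note on the pattern (illustrative sketch, not part of the patch): the core of cache_lock_wait_or_disconnect() is a biased tokio::select! that races the lock wait against a future that only resolves once the downstream client goes away. The self-contained sketch below shows that shape with stand-ins: fake_cache_lock_wait() plays the role of session.cache.cache_lock_wait(), and a oneshot channel plays the role of read_body_or_idle(true) observing EOF on the downstream socket. The names fake_cache_lock_wait and wait_or_disconnect are hypothetical and do not exist in pingora.

use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};

#[derive(Debug)]
enum LockStatus {
    Done,
}

// Stand-in for session.cache.cache_lock_wait(): in this sketch the lock is
// "released" after 2 seconds.
async fn fake_cache_lock_wait() -> LockStatus {
    sleep(Duration::from_secs(2)).await;
    LockStatus::Done
}

// Race the lock wait against a disconnect signal. Returns Some(status) when
// the lock wait finishes first and None when the client disconnects first.
// `biased;` makes select! poll the lock wait first when both are ready.
async fn wait_or_disconnect(disconnect: oneshot::Receiver<()>) -> Option<LockStatus> {
    tokio::select! {
        biased;
        status = fake_cache_lock_wait() => Some(status),
        _ = disconnect => None,
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    // Simulate a client that disconnects 100ms into a 2s lock wait.
    tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        let _ = tx.send(());
    });
    // The disconnect wins the race, so the caller can abort instead of
    // waiting out the full lock timeout.
    assert!(wait_or_disconnect(rx).await.is_none());
    println!("disconnect detected before the lock wait finished");
}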