Skip to content

Commit aca00b6

Browse files
Handle UpgradedBody in more filters
UpgradedBody was incorrectly no longer invoked for upstream response body filters or for downstream modules. Additionally allow supporting the rare case where UpgradedBody may be cached.
1 parent bcbd680 commit aca00b6

File tree

8 files changed

+142
-51
lines changed

8 files changed

+142
-51
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
a83d44c1042cc8071f1361441e606e50a22c68f4
1+
71f26703aeb326cc03ccf2d200a1784c915ffb49

pingora-proxy/src/lib.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ where
357357
.await?;
358358
None
359359
}
360-
HttpTask::Body(data, eos) => self
360+
HttpTask::Body(data, eos) | HttpTask::UpgradedBody(data, eos) => self
361361
.inner
362362
.upstream_response_body_filter(session, data, *eos, ctx)?,
363363
HttpTask::Trailer(Some(trailers)) => {
@@ -563,6 +563,7 @@ impl Session {
563563
}
564564

565565
pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
566+
let mut seen_upgraded = self.was_upgraded();
566567
for task in tasks.iter_mut() {
567568
match task {
568569
HttpTask::Header(resp, end) => {
@@ -574,6 +575,11 @@ impl Session {
574575
self.downstream_modules_ctx
575576
.response_body_filter(data, *end)?;
576577
}
578+
HttpTask::UpgradedBody(data, end) => {
579+
seen_upgraded = true;
580+
self.downstream_modules_ctx
581+
.response_body_filter(data, *end)?;
582+
}
577583
HttpTask::Trailer(trailers) => {
578584
if let Some(buf) = self
579585
.downstream_modules_ctx
@@ -584,6 +590,7 @@ impl Session {
584590
//
585591
// Note, this will not work if end of stream has already
586592
// been seen or we've written content-length bytes.
593+
// (Trailers should never come after upgraded body)
587594
*task = HttpTask::Body(Some(buf), true);
588595
}
589596
}
@@ -597,7 +604,11 @@ impl Session {
597604
// Note, this will not work if end of stream has already
598605
// been seen or we've written content-length bytes.
599606
if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
600-
*task = HttpTask::Body(Some(buf), true);
607+
if seen_upgraded {
608+
*task = HttpTask::UpgradedBody(Some(buf), true);
609+
} else {
610+
*task = HttpTask::Body(Some(buf), true);
611+
}
601612
}
602613
}
603614
_ => { /* Failed */ }

pingora-proxy/src/proxy_cache.rs

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -661,52 +661,49 @@ where
661661
}
662662
}
663663
}
664-
HttpTask::Body(data, end_stream) => match data {
665-
Some(d) => {
666-
if session.cache.enabled() {
667-
// TODO: do this async
668-
// fail if writing the body would exceed the max_file_size_bytes
669-
let body_size_allowed =
670-
session.cache.track_body_bytes_for_max_file_size(d.len());
671-
if !body_size_allowed {
672-
debug!("chunked response exceeded max cache size, remembering that it is uncacheable");
673-
session
674-
.cache
675-
.response_became_uncacheable(NoCacheReason::ResponseTooLarge);
676-
677-
return Error::e_explain(
678-
ERR_RESPONSE_TOO_LARGE,
679-
format!(
680-
"writing data of size {} bytes would exceed max file size of {} bytes",
681-
d.len(),
682-
session.cache.max_file_size_bytes().expect("max file size bytes must be set to exceed size")
683-
),
684-
);
685-
}
664+
HttpTask::Body(data, end_stream) | HttpTask::UpgradedBody(data, end_stream) => {
665+
// It is not normally advisable to cache upgraded responses
666+
// e.g. they are essentially close-delimited, so they are easily truncated
667+
// but the framework still allows for it
668+
match data {
669+
Some(d) => {
670+
if session.cache.enabled() {
671+
// TODO: do this async
672+
// fail if writing the body would exceed the max_file_size_bytes
673+
let body_size_allowed =
674+
session.cache.track_body_bytes_for_max_file_size(d.len());
675+
if !body_size_allowed {
676+
debug!("chunked response exceeded max cache size, remembering that it is uncacheable");
677+
session
678+
.cache
679+
.response_became_uncacheable(NoCacheReason::ResponseTooLarge);
680+
681+
return Error::e_explain(
682+
ERR_RESPONSE_TOO_LARGE,
683+
format!(
684+
"writing data of size {} bytes would exceed max file size of {} bytes",
685+
d.len(),
686+
session.cache.max_file_size_bytes().expect("max file size bytes must be set to exceed size")
687+
),
688+
);
689+
}
686690

687-
// this will panic if more data is sent after we see end_stream
688-
// but should be impossible in real world
689-
let miss_handler = session.cache.miss_handler().unwrap();
691+
// this will panic if more data is sent after we see end_stream
692+
// but should be impossible in real world
693+
let miss_handler = session.cache.miss_handler().unwrap();
690694

691-
miss_handler.write_body(d.clone(), *end_stream).await?;
692-
if *end_stream {
693-
session.cache.finish_miss_handler().await?;
695+
miss_handler.write_body(d.clone(), *end_stream).await?;
696+
if *end_stream {
697+
session.cache.finish_miss_handler().await?;
698+
}
694699
}
695700
}
696-
}
697-
None => {
698-
if session.cache.enabled() && *end_stream {
699-
session.cache.finish_miss_handler().await?;
701+
None => {
702+
if session.cache.enabled() && *end_stream {
703+
session.cache.finish_miss_handler().await?;
704+
}
700705
}
701706
}
702-
},
703-
HttpTask::UpgradedBody(..) => {
704-
// caching upgraded bodies isn't supported with and doesn't make sense with the HttpCache
705-
// (caller of cache http task will disable cache in the session)
706-
return Error::e_explain(
707-
InternalError,
708-
"Unexpected UpgradedBody task while caching",
709-
);
710707
}
711708
HttpTask::Trailer(_) => {} // h1 trailer is not supported yet
712709
HttpTask::Done => {
@@ -2285,7 +2282,16 @@ impl ServeFromCache {
22852282
&mut self,
22862283
cache: &mut HttpCache,
22872284
range: &mut RangeBodyFilter,
2285+
upgraded: bool,
22882286
) -> Result<HttpTask> {
2287+
fn body_task(data: Bytes, upgraded: bool) -> HttpTask {
2288+
if upgraded {
2289+
HttpTask::UpgradedBody(Some(data), false)
2290+
} else {
2291+
HttpTask::Body(Some(data), false)
2292+
}
2293+
}
2294+
22892295
if !cache.enabled() {
22902296
// Cache is disabled due to internal error
22912297
// TODO: if nothing is sent to eyeball yet, figure out a way to recovery by
@@ -2317,7 +2323,7 @@ impl ServeFromCache {
23172323
}
23182324
loop {
23192325
if let Some(b) = cache.hit_handler().read_body().await? {
2320-
return Ok(HttpTask::Body(Some(b), false)); // false for now
2326+
return Ok(body_task(b, upgraded));
23212327
}
23222328
// EOF from hit handler for body requested
23232329
// if multipart, then seek again
@@ -2336,7 +2342,7 @@ impl ServeFromCache {
23362342
// safety: caller of enable_miss() call it only if the async_body_reader exist
23372343
loop {
23382344
if let Some(b) = cache.miss_body_reader().unwrap().read_body().await? {
2339-
return Ok(HttpTask::Body(Some(b), false)); // false for now
2345+
return Ok(body_task(b, upgraded));
23402346
} else {
23412347
// EOF from hit handler for body requested
23422348
// if multipart, then seek again

pingora-proxy/src/proxy_custom.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ where
317317
// partial read support, this check will also be false if cache is disabled.
318318
let support_cache_partial_read =
319319
session.cache.support_streaming_partial_write() == Some(true);
320+
let upgraded = session.was_upgraded();
320321

321322
tokio::select! {
322323
body = session.downstream_session.read_body_or_idle(downstream_state.is_done()), if downstream_state.can_poll() => {
@@ -423,7 +424,7 @@ where
423424
}
424425
}
425426

426-
task = serve_from_cache.next_http_task(&mut session.cache, &mut range_body_filter),
427+
task = serve_from_cache.next_http_task(&mut session.cache, &mut range_body_filter, upgraded),
427428
if !response_state.cached_done() && !downstream_state.is_errored() && serve_from_cache.is_on() => {
428429
let task = self.custom_response_filter(session, task?, ctx,
429430
&mut serve_from_cache,

pingora-proxy/src/proxy_h1.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ where
370370
// partial read support, this check will also be false if cache is disabled.
371371
let support_cache_partial_read =
372372
session.cache.support_streaming_partial_write() == Some(true);
373+
let upgraded = session.was_upgraded();
373374

374375
tokio::select! {
375376
// only try to send to pipe if there is capacity to avoid deadlock
@@ -500,7 +501,7 @@ where
500501
}
501502
},
502503

503-
task = serve_from_cache.next_http_task(&mut session.cache, &mut range_body_filter),
504+
task = serve_from_cache.next_http_task(&mut session.cache, &mut range_body_filter, upgraded),
504505
if !response_state.cached_done() && !downstream_state.is_errored() && serve_from_cache.is_on() => {
505506

506507
let task = self.h1_response_filter(session, task?, ctx,
@@ -720,7 +721,13 @@ where
720721
Ok(HttpTask::Body(data, end))
721722
}
722723
HttpTask::UpgradedBody(mut data, end) => {
723-
// range / caching doesn't apply to upgraded body
724+
if track_max_cache_size {
725+
session
726+
.cache
727+
.track_body_bytes_for_max_file_size(data.as_ref().map_or(0, |d| d.len()));
728+
}
729+
730+
// range doesn't apply to upgraded body
724731
if let Some(duration) = self
725732
.inner
726733
.response_body_filter(session, &mut data, end, ctx)?

pingora-proxy/src/proxy_h2.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ where
340340
// partial read support, this check will also be false if cache is disabled.
341341
let support_cache_partial_read =
342342
session.cache.support_streaming_partial_write() == Some(true);
343+
let upgraded = session.was_upgraded();
343344

344345
// Similar logic in h1 need to reserve capacity first to avoid deadlock
345346
// But we don't need to do the same because the h2 client_body pipe is unbounded (never block)
@@ -447,7 +448,7 @@ where
447448
}
448449
}
449450

450-
task = serve_from_cache.next_http_task(&mut session.cache, &mut range_body_filter),
451+
task = serve_from_cache.next_http_task(&mut session.cache, &mut range_body_filter, upgraded),
451452
if !response_state.cached_done() && !downstream_state.is_errored() && serve_from_cache.is_on() => {
452453
let task = self.h2_response_filter(session, task?, ctx,
453454
&mut serve_from_cache,

pingora-proxy/tests/test_upstream.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,6 +1138,55 @@ mod test_cache {
11381138
assert_eq!(res.text().await.unwrap(), "");
11391139
}
11401140

1141+
#[tokio::test]
1142+
async fn test_cache_websocket_101() {
1143+
// Test the unlikely scenario in which users may want to cache WS
1144+
init();
1145+
1146+
// First request - should be a miss
1147+
let mut stream = TcpStream::connect("127.0.0.1:6148").await.unwrap();
1148+
let req = concat!(
1149+
"GET /unique/test_cache_websocket_101/upgrade HTTP/1.1\r\n",
1150+
"Host: 127.0.0.1\r\n",
1151+
"Upgrade: websocket\r\n",
1152+
"Connection: Upgrade\r\n",
1153+
"X-Cache-Websocket: 1\r\n",
1154+
"\r\n"
1155+
);
1156+
stream.write_all(req.as_bytes()).await.unwrap();
1157+
stream.flush().await.unwrap();
1158+
1159+
let expected_payload = b"hello\n";
1160+
let fut = read_response(&mut stream, expected_payload.len());
1161+
let (resp_header, resp_body) = timeout(Duration::from_secs(5), fut).await.unwrap();
1162+
1163+
assert_eq!(resp_header.status, 101);
1164+
assert_eq!(resp_header.headers["Upgrade"], "websocket");
1165+
assert_eq!(resp_header.headers["x-cache-status"], "miss");
1166+
assert_eq!(resp_body, expected_payload);
1167+
1168+
// Second request - should be a cache hit
1169+
let mut stream = TcpStream::connect("127.0.0.1:6148").await.unwrap();
1170+
let req = concat!(
1171+
"GET /unique/test_cache_websocket_101/upgrade HTTP/1.1\r\n",
1172+
"Host: 127.0.0.1\r\n",
1173+
"Upgrade: websocket\r\n",
1174+
"Connection: Upgrade\r\n",
1175+
"X-Cache-Websocket: 1\r\n",
1176+
"\r\n"
1177+
);
1178+
stream.write_all(req.as_bytes()).await.unwrap();
1179+
stream.flush().await.unwrap();
1180+
1181+
let fut = read_response(&mut stream, expected_payload.len());
1182+
let (resp_header, resp_body) = timeout(Duration::from_secs(5), fut).await.unwrap();
1183+
1184+
assert_eq!(resp_header.status, 101);
1185+
assert_eq!(resp_header.headers["Upgrade"], "websocket");
1186+
assert_eq!(resp_header.headers["x-cache-status"], "hit");
1187+
assert_eq!(resp_body, expected_payload);
1188+
}
1189+
11411190
#[tokio::test]
11421191
async fn test_1xx_caching() {
11431192
// 1xx shouldn't interfere with HTTP caching

pingora-proxy/tests/utils/server_utils.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use pingora_proxy::{FailToProxy, ProxyHttp, Session};
4747
use std::collections::{HashMap, HashSet};
4848
use std::sync::Arc;
4949
use std::thread;
50-
use std::time::Duration;
50+
use std::time::{Duration, SystemTime};
5151

5252
pub struct ExampleProxyHttps {}
5353

@@ -574,10 +574,26 @@ impl ProxyHttp for ExampleProxyCache {
574574

575575
fn response_cache_filter(
576576
&self,
577-
_session: &Session,
577+
session: &Session,
578578
resp: &ResponseHeader,
579579
_ctx: &mut Self::CTX,
580580
) -> Result<RespCacheable> {
581+
// Allow testing the unlikely case of caching a 101 response
582+
if resp.status == 101
583+
&& session
584+
.req_header()
585+
.headers
586+
.contains_key("x-cache-websocket")
587+
{
588+
return Ok(RespCacheable::Cacheable(CacheMeta::new(
589+
SystemTime::now() + Duration::from_secs(5),
590+
SystemTime::now(),
591+
0,
592+
0,
593+
resp.clone(),
594+
)));
595+
}
596+
581597
let cc = CacheControl::from_resp_headers(resp);
582598
Ok(resp_cacheable(
583599
cc.as_ref(),

0 commit comments

Comments
 (0)