diff --git a/crates/adapter/src/fastly/cache.rs b/crates/adapter/src/fastly/cache.rs index 70a92eb2..06cbf503 100644 --- a/crates/adapter/src/fastly/cache.rs +++ b/crates/adapter/src/fastly/cache.rs @@ -20,6 +20,8 @@ bitflags::bitflags! { pub struct CacheLookupOptionsMask: u32 { const _RESERVED = 1 << 0; const REQUEST_HEADERS = 1 << 1; + const SERVICE_ID = 1 << 2; + const ALWAYS_USE_REQUESTED_RANGE = 1 << 3; } } @@ -113,6 +115,15 @@ mod cache { Self::REQUEST_HEADERS, value.contains(CacheLookupOptionsMask::REQUEST_HEADERS), ); + flags.set( + Self::SERVICE_ID, + value.contains(CacheLookupOptionsMask::SERVICE_ID), + ); + flags.set( + Self::ALWAYS_USE_REQUESTED_RANGE, + value.contains(CacheLookupOptionsMask::ALWAYS_USE_REQUESTED_RANGE), + ); + flags } } @@ -121,6 +132,9 @@ mod cache { fn from(value: CacheLookupOptions) -> Self { Self { request_headers: value.request_headers, + // service_id is not supported in Viceroy. + // We ignore the value, but pass through the flag, so Viceroy can still raise an + // error when it's used-but-unsupported. } } } diff --git a/lib/compute-at-edge-abi/cache.witx b/lib/compute-at-edge-abi/cache.witx index fe9d4451..3243a81f 100644 --- a/lib/compute-at-edge-abi/cache.witx +++ b/lib/compute-at-edge-abi/cache.witx @@ -24,6 +24,7 @@ $reserved $request_headers $service_id + $always_use_requested_range ) ) @@ -43,6 +44,7 @@ $request_headers $replace_strategy $service_id + $always_use_requested_range ) ) diff --git a/lib/data/viceroy-component-adapter.wasm b/lib/data/viceroy-component-adapter.wasm index 5451ccbc..99ae5969 100755 Binary files a/lib/data/viceroy-component-adapter.wasm and b/lib/data/viceroy-component-adapter.wasm differ diff --git a/lib/proptest-regressions/body.txt b/lib/proptest-regressions/body.txt new file mode 100644 index 00000000..3e65f2af --- /dev/null +++ b/lib/proptest-regressions/body.txt @@ -0,0 +1,8 @@ +# Seeds for failure cases proptest has generated in the past. 
It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 0008d74790fb20a629ca4cc6f2c016c4a944c17d3059ee654ef4a48595b1fd0f # shrinks to body = Channel(Receiver { chan: Rx { inner: Chan { tx: Tx { block_tail: 0x760140044890, tail_position: 3 }, semaphore: Semaphore { semaphore: Semaphore { permits: 0 }, bound: 2 }, rx_waker: AtomicWaker, tx_count: 0, rx_fields: "..." } } }) +cc c1b60ad59f679fb3a4fc4bc96c50acedfc321467a9c0792f66321938054c0cad # shrinks to (body, chunk_lengths) = (b"\xe8\xfdiT3\x1dyr", [0]) diff --git a/lib/proptest-regressions/collecting_body.txt b/lib/proptest-regressions/collecting_body.txt new file mode 100644 index 00000000..5ae0dc86 --- /dev/null +++ b/lib/proptest-regressions/collecting_body.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc c36f255c0841ebe6cdb08ce10d8e5a6f12777d08a3aa4b76dd9c65c3f0dfdd27 # shrinks to (body, start) = ([b"\0"], 0) diff --git a/lib/src/cache.rs b/lib/src/cache.rs index 3dde8e5e..b2cfa4ef 100644 --- a/lib/src/cache.rs +++ b/lib/src/cache.rs @@ -16,7 +16,7 @@ use http::{HeaderMap, HeaderValue}; mod store; mod variance; -use store::{CacheData, CacheKeyObjects, ObjectMeta, Obligation}; +use store::{CacheData, CacheKeyObjects, GetBodyBuilder, ObjectMeta, Obligation}; pub use variance::VaryRule; #[derive(Debug, thiserror::Error)] @@ -135,15 +135,36 @@ pub struct CacheEntry { key: CacheKey, found: Option, go_get: Option, + + /// Respect the range in body() even when the body length is not yet known. 
+ /// + /// When a cached item is Found, the length of the cached item may or may not be known: + /// if no expected length was provided and the body is still streaming, the length is unknown. + /// + /// When always_use_requested_range is false, and the length is unknown, + /// body() returns the full body regardless of the requested range. + /// When always_use_requested_range is true, and the length is unknown, + /// body() blocks until the start of the range is available. + always_use_requested_range: bool, } impl CacheEntry { + /// Set the always_use_requested_range flag. + /// This applies to all subsequent lookups from this CacheEntry or future entries derived from it. + pub fn with_always_use_requested_range(self, always_use_requested_range: bool) -> Self { + Self { + always_use_requested_range, + ..self + } + } + /// Return a stub entry to hold in CacheBusy. pub fn stub(&self) -> CacheEntry { Self { key: self.key.clone(), found: None, go_get: None, + always_use_requested_range: false, } } @@ -172,6 +193,20 @@ impl CacheEntry { self.go_get.take().is_some() } + /// Access the body of the cached item, if available. + pub async fn body(&self, from: Option, to: Option) -> Result { + let found = self + .found + .as_ref() + .ok_or(crate::Error::CacheError(Error::Missing))?; + found + .get_body() + .with_range(from, to) + .with_always_use_requested_range(self.always_use_requested_range) + .build() + .await + } + /// Insert the provided body into the cache. /// /// Returns a CacheEntry where the new item is Found. @@ -186,14 +221,15 @@ impl CacheEntry { key: self.key.clone(), found: Some(found), go_get: None, + always_use_requested_range: self.always_use_requested_range, }) } /// Freshen the existing cache item according to the new write options, /// without changing the body. 
- pub fn update(&mut self, options: WriteOptions) -> Result<(), crate::Error> { + pub async fn update(&mut self, options: WriteOptions) -> Result<(), crate::Error> { let go_get = self.go_get.take().ok_or(Error::NotRevalidatable)?; - match go_get.update(options) { + match go_get.update(options).await { Ok(()) => Ok(()), Err((go_get, err)) => { // On failure, preserve the obligation. @@ -228,9 +264,8 @@ impl From> for Found { } impl Found { - /// Access the body of the cached object. - pub fn body(&self) -> Result { - self.data.as_ref().get_body() + fn get_body(&self) -> GetBodyBuilder { + self.data.as_ref().body() } /// Access the metadata of the cached object. @@ -283,6 +318,7 @@ impl Cache { key: key.clone(), found, go_get: None, + always_use_requested_range: false, } } @@ -301,11 +337,9 @@ impl Cache { .await; CacheEntry { key: key.clone(), - found: found.map(|data| Found { - data, - last_body_handle: None, - }), + found: found.map(|v| v.into()), go_get: obligation, + always_use_requested_range: false, } } @@ -578,7 +612,7 @@ mod tests { let nonempty = cache.lookup(&key, &HeaderMap::default()).await; let found = nonempty.found().expect("should have found inserted key"); - let got = found.body().unwrap().read_into_vec().await.unwrap(); + let got = found.get_body().build().await.unwrap().read_into_vec().await.unwrap(); assert_eq!(got, value); }); } @@ -720,6 +754,7 @@ mod tests { stale_while_revalidate: Duration::from_secs(10), ..WriteOptions::default() }) + .await .unwrap(); // After this, should get the new response: @@ -746,7 +781,7 @@ mod tests { stale_while_revalidate: Duration::from_secs(10), ..WriteOptions::default() }; - txn1.update(opts.clone()).unwrap_err(); + txn1.update(opts.clone()).await.unwrap_err(); // But we should still be able to insert. 
txn1.insert(opts.clone(), Body::empty()).unwrap(); diff --git a/lib/src/cache/store.rs b/lib/src/cache/store.rs index 20199baa..7ba30ba7 100644 --- a/lib/src/cache/store.rs +++ b/lib/src/cache/store.rs @@ -4,6 +4,7 @@ use crate::cache::{variance::VaryRule, Error}; use bytes::Bytes; use std::{ collections::{HashMap, VecDeque}, + future::Future, sync::{atomic::AtomicBool, Arc}, time::{Duration, Instant}, }; @@ -507,11 +508,14 @@ impl Obligation { } /// Fulfill the obligation by freshening the existing entry. - pub(super) fn update(mut self, options: WriteOptions) -> Result<(), (Self, crate::Error)> { + pub(super) async fn update( + mut self, + options: WriteOptions, + ) -> Result<(), (Self, crate::Error)> { let Some(present) = &self.present else { return Err((self, Error::NotRevalidatable.into())); }; - let body = match present.get_body() { + let body = match present.body().build().await { Ok(body) => body, Err(e) => return Err((self, e)), }; @@ -555,10 +559,112 @@ pub(crate) struct CacheData { body: CollectingBody, } +/// A holder for the get_body options. +pub struct GetBodyBuilder<'a> { + cache_data: &'a CacheData, + from: Option, + to: Option, + always_use_requested_range: bool, +} + +impl GetBodyBuilder<'_> { + /// Add range bounds to the body. + /// + /// If "from" is provided, "to" indicates an offset from the start of the cached item. + /// If "to" is provided but not "from", "to" indicates an offset from the end of the cached + /// item. + pub fn with_range(self, from: Option, to: Option) -> Self { + Self { from, to, ..self } + } + + pub fn with_always_use_requested_range(self, always_use_requested_range: bool) -> Self { + Self { + always_use_requested_range, + ..self + } + } +} + +impl<'a> GetBodyBuilder<'a> { + /// Access the body of this cached item. + /// + /// In some cases (streaming), the Future may not become ready until the first byte of output is available. 
+ pub fn build(self) -> impl Future> + use<'a> { + async move { + // Early "return whole body" cases: + // "ignore requested range when length is unknown", the old default: + let ignore_requested_range = + !self.always_use_requested_range && self.cache_data.length().is_none(); + // No requested range provided: + let no_range_provided = self.from.is_none() && self.to.is_none(); + // Known length and invalid range: + let valid_range = match (self.cache_data.length(), self.from, self.to) { + (None, _, _) => true, + (Some(length), None, Some(to)) if !(1..=length).contains(&to) => false, + (Some(length), Some(from), _) if !(0..length).contains(&from) => false, + (Some(length), Some(from), Some(to)) if !(from..length).contains(&to) => false, + _ => true, + }; + + // In each of these cases, we return the body immediately, + // without waiting for any body to exist. + if ignore_requested_range || no_range_provided || !valid_range { + return self.cache_data.body.read(); + } + + // At least one of (start, end) is provided. + + let (start, end) = if let (None, Some(end)) = (self.from, self.to) { + // We need to convert from "from the end" to "from the start". + // To do that, we need a known or expected length. + if self.cache_data.length().is_none() { + // We don't have an expected length; we have to wait for the end of input. + self.cache_data.body.known_length().await?; + } + + let length = self + .cache_data + .length() + .expect("unknown length after waiting"); + if end > length { + // Asked for more bytes than are available. + // In the case of an invalid range, Compute returns the entire body + // (as in HTTP). + return self.cache_data.body.read(); + } + // Convert to a (start, ...) sequence: + (Some(length - end), None) + } else { + (self.from, self.to) + }; + + let start = start.unwrap_or(0); + + // If the length is not known up-front, + // wait for the first byte to exist before returning a body. + // Yes, this only applies when the length is unknown. 
+ if self.cache_data.length().is_none() { + self.cache_data.body.wait_length(start + 1).await?; + } + + // Convert from inclusive bounds (GetBodyBuilder) to exclusive (read_range), + // and provide the body. + self.cache_data + .body + .read_range(start, end.map(|end| end + 1)) + } + } +} + impl CacheData { /// Get a Body to read the cached object with. - pub(crate) fn get_body(&self) -> Result { - self.body.read() + pub(crate) fn body(&self) -> GetBodyBuilder { + GetBodyBuilder { + cache_data: self, + from: None, + to: None, + always_use_requested_range: false, + } } /// Access to object's metadata @@ -566,7 +672,7 @@ impl CacheData { &self.meta } - /// Return the length of this object, if known. + /// Return the length of this object, if the final or expected length is known. pub fn length(&self) -> Option { self.body.length().or_else(|| self.meta.length) } @@ -667,7 +773,7 @@ mod tests { assert!(found2.is_none()); assert!(obligation2.is_some()); - // Anotehr transaction on the same headers should pick up the same result: + // Another transaction on the same headers should pick up the same result: let busy1 = objects.transaction_get(&h1, true); let busy2 = objects.transaction_get(&h2, true); obligation2.unwrap().insert( @@ -687,13 +793,27 @@ mod tests { make_body("object 1"), ); if let (Some(found), None) = busy1.await { - let s = found.get_body().unwrap().read_into_string().await.unwrap(); + let s = found + .body() + .build() + .await + .unwrap() + .read_into_string() + .await + .unwrap(); assert_eq!(&s, "object 1"); } else { panic!("expected to block on object 1") } if let (Some(found), None) = busy2.await { - let s = found.get_body().unwrap().read_into_string().await.unwrap(); + let s = found + .body() + .build() + .await + .unwrap() + .read_into_string() + .await + .unwrap(); assert_eq!(&s, "object 2"); } else { panic!("expected to block on object 2") @@ -788,7 +908,7 @@ mod tests { None, ); - let body = e.get_body().expect("can read completed body"); + let 
body = e.body().build().await.expect("can read completed body"); let s = body .read_into_string() .await diff --git a/lib/src/collecting_body.rs b/lib/src/collecting_body.rs index d9c86915..dd27fbcc 100644 --- a/lib/src/collecting_body.rs +++ b/lib/src/collecting_body.rs @@ -50,11 +50,61 @@ impl CollectingBody { } } + /// Wait until the length is known (or the writer hangs up without finishing). + pub async fn known_length(&self) -> Result { + let mut recv = self.inner.clone(); + let state = recv + .wait_for(|state| !matches!(state, CollectingBodyInner::Streaming(_))) + .await + .expect("CollectingBody terminated, but not in state Complete or Error"); + match &*state { + CollectingBodyInner::Streaming(_) => unreachable!("wait_for un-matched this"), + CollectingBodyInner::Complete { body, .. } => { + Ok(body.iter().map(|v| v.len() as u64).sum()) + } + CollectingBodyInner::Error(error) => { + tracing::warn!( + "could not determine length of cache body; write error: {}", + error + ); + Err(Error::UnfinishedStreamingBody) + } + } + } + + /// Wait until at least this many bytes have been written, or the body is complete. + /// Returns Ok() if the body has at least `want` bytes, Err() otherwise. + pub async fn wait_length(&self, want: u64) -> Result<(), error::Error> { + let mut recv = self.inner.clone(); + let state = recv + .wait_for(|state| match state { + CollectingBodyInner::Streaming(items) => { + let length: u64 = items.iter().map(|v| v.len() as u64).sum(); + length >= want + } + _ => true, + }) + .await + .expect("CollectingBody terminated, but not in state Complete or Error"); + + match &*state { + CollectingBodyInner::Error(_) => Err(Error::UnfinishedStreamingBody), + CollectingBodyInner::Complete { body, .. 
} => { + let length: u64 = body.iter().map(|v| v.len() as u64).sum(); + if length >= want { + Ok(()) + } else { + Err(Error::UnfinishedStreamingBody) + } + } + CollectingBodyInner::Streaming(_) => Ok(()), + } + } + /// Create a new CollectingBody that stores & streams from the provided Body. /// /// Writes to the StreamingBody are collected, and propagated to all readers of this /// CollectingBody. - // TODO: Expected length? pub fn new(from: Body, length: Option) -> CollectingBody { let (tx, rx) = watch::channel(CollectingBodyInner::default()); let body = CollectingBody { inner: rx }; @@ -140,10 +190,17 @@ impl CollectingBody { /// Get a new read handle to this body. pub fn read(&self) -> Result { + self.read_range(0, None) + } + + /// Get a handle to read the requested range from this body. + /// Start is inclusive, end is exclusive. + pub fn read_range(&self, start: u64, end: Option) -> Result { let mut upstream = self.inner.clone(); let (mut tx, rx) = StreamingBody::new(); tokio::task::spawn(async move { let mut next_chunk = 0; + let mut cursor = 0u64; // The receiver tracks the "current" value, and assumes that the value at the receiver // is "seen" to begin with. // So we have a do-while loop, with the "changed" condition at the bottom. @@ -169,18 +226,59 @@ impl CollectingBody { }; // If send_chunks is nonempty, it contains data for us to forward: + next_chunk += send_chunks.len(); for chunk in send_chunks { + let chunk_start = cursor; + let chunk_end = cursor + chunk.len() as u64; + cursor = chunk_end; + + if end.is_some_and(|end| chunk_start >= end) { + // We have sent all the bytes we need to; don't process more chunks on this + // pass. + // But we don't immediately return, as there are still (in the future) + // trailers to process. + break; + } + + // We need to send either the whole chunk, or a portion of it. 
+ let slice_start = std::cmp::max(start, chunk_start); + let slice_end = std::cmp::min(end.unwrap_or(u64::MAX), chunk_end); + if slice_end <= slice_start { + // Empty slice, skip this chunk. + continue; + } + + let chunk = if slice_start == chunk_start && slice_end == chunk_end { + // Proceed without copy + chunk + } else { + // Copy out only the bytes of interest + let range = slice_start.saturating_sub(chunk_start) as usize + ..(slice_end.saturating_sub(chunk_start)) as usize; + Bytes::copy_from_slice(&chunk[range]) + }; + if tx.send_chunk(chunk).await.is_err() { // Reader hung up; we don't care any more. return; } - next_chunk += 1; } // And we may have gotten the trailers, which are the "body is done" signal: if let Some(trailers) = trailers { for (k, v) in trailers.iter() { tx.append_trailer(k.clone(), v.clone()); } + + // Did the body terminate before our "end" offset? + if let Some(end) = end { + if cursor < end { + // Reached the end-of-input without getting the offset we wanted. + // This should be an "unfinished streaming body" error; + // so, return without "finish"ing. + return; + } + } + // We don't wait for the channel to be closed; // if the object stays in the cache, the object will be around ~forever. // Trailers sent -> we're done. @@ -189,8 +287,6 @@ impl CollectingBody { } // Now that we've processed the current state, wait for a change. - // - if upstream.changed().await.is_err() { return; // If there's an error, the Channel is closed. 
@@ -247,6 +343,7 @@ impl CollectingBodyInner { mod tests { use std::sync::Arc; + use bytes::{Bytes, BytesMut}; use http::{HeaderName, HeaderValue}; use tokio::{sync::oneshot, task::JoinSet}; @@ -256,6 +353,7 @@ mod tests { streaming_body::StreamingBody, Error, }; + use proptest::prelude::*; #[tokio::test] async fn stream_and_collect() { @@ -455,7 +553,7 @@ mod tests { } #[tokio::test] - async fn error_on_underfill() { + async fn error_on_expected_length_underfill() { let (mut tx, rx) = StreamingBody::new(); let collect = CollectingBody::new(Body::from(Chunk::from(rx)), Some(10)); @@ -467,6 +565,21 @@ mod tests { assert!(matches!(err, Error::UnfinishedStreamingBody)); } + #[tokio::test] + async fn error_on_range_underfill() { + let (mut tx, rx) = StreamingBody::new(); + + let collect = CollectingBody::new(Body::from(Chunk::from(rx)), None); + + let reader = collect.read_range(0, Some(10)).unwrap(); + + tx.send_chunk(b"hello".as_slice()).await.unwrap(); + tx.finish().unwrap(); + + let err = reader.read_into_vec().await.unwrap_err(); + assert!(matches!(err, Error::UnfinishedStreamingBody)); + } + #[tokio::test] async fn error_on_overfill() { let (mut tx, rx) = StreamingBody::new(); @@ -479,4 +592,148 @@ mod tests { let err = collect.read().unwrap().read_into_vec().await.unwrap_err(); assert!(matches!(err, Error::UnfinishedStreamingBody)); } + + /// Proptest strategy: get a nonempty set of Bytes. + fn some_bytes() -> impl Strategy { + proptest::collection::vec(any::(), 1..16).prop_map(|v| v.into()) + } + + /// Proptest strategy: a nonempty set of chunks (Byte blobs). + fn some_chunks() -> impl Strategy> { + proptest::collection::vec(some_bytes(), 1..16) + } + + /// Utility function for converting from other errors to a TestCaseError, with an annotation. 
+ fn fail_test_case( + note: &str, + ) -> impl Fn(E) -> TestCaseError + use<'_, E> { + move |err: E| TestCaseError::fail(format!("{note}: {}", err)) + } + + async fn test_start_range(body_chunks: Vec, start: u64) -> Result<(), TestCaseError> { + let full_body: Bytes = body_chunks + .iter() + .cloned() + .fold(BytesMut::new(), |mut acc, b| { + acc.extend(b); + acc + }) + .into(); + if start as usize >= full_body.len() { + return Err(TestCaseError::Reject("invalid start".into())); + } + + let (mut tx, rx) = StreamingBody::new(); + let cb = CollectingBody::new(rx.into(), None); + + let mut js = JoinSet::new(); + js.spawn(async move { + for chunk in body_chunks { + tx.send_chunk(chunk) + .await + .map_err(fail_test_case("error sending chunk"))?; + } + tx.finish() + .map_err(fail_test_case("error finishing write"))?; + Ok(()) + }); + js.spawn(async move { + let body = cb + .read_range(start, None) + .map_err(fail_test_case("error getting body"))?; + let got = body + .read_into_vec() + .await + .map_err(fail_test_case("error reading body"))?; + let want = &full_body[(start as usize)..]; + + prop_assert_eq!(&got, want, "mismatched body data"); + + Ok(()) + }); + + let _: Vec<_> = js.join_all().await.into_iter().collect::>()?; + Ok(()) + } + + async fn test_start_end_range( + body_chunks: Vec, + start: u64, + end: u64, + ) -> Result<(), TestCaseError> { + let full_body: Bytes = body_chunks + .iter() + .cloned() + .fold(BytesMut::new(), |mut acc, b| { + acc.extend(b); + acc + }) + .into(); + if start as usize >= full_body.len() { + return Err(TestCaseError::Reject("invalid start".into())); + } + + let (mut tx, rx) = StreamingBody::new(); + let cb = CollectingBody::new(rx.into(), None); + + let mut js = JoinSet::new(); + js.spawn(async move { + for chunk in body_chunks { + tx.send_chunk(chunk) + .await + .map_err(fail_test_case("error sending chunk"))?; + } + tx.finish() + .map_err(fail_test_case("error finishing write"))?; + Ok(()) + }); + js.spawn(async move { + let body = 
cb + .read_range(start, Some(end)) + .map_err(fail_test_case("error getting body"))?; + let got = body + .read_into_vec() + .await + .map_err(fail_test_case("error reading body"))?; + let want = &full_body[(start as usize)..(end as usize)]; + + prop_assert_eq!(&got, want, "mismatched body data"); + + Ok(()) + }); + + let _: Vec<_> = js.join_all().await.into_iter().collect::>()?; + Ok(()) + } + + proptest! { + #[test] + fn read_body_from_start( + (body, start) in some_chunks().prop_flat_map(|chunks| { + let len = chunks.iter().map(|v| v.len() as u64).sum(); + (Just(chunks), 0..len) + }), + ) { + let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); + rt.block_on(test_start_range(body, start))?; + } + + } + + proptest! { + #[test] + fn read_body_start_end( + (body, start, end) in some_chunks().prop_flat_map(|chunks| { + let len = chunks.iter().map(|v| v.len() as u64).sum(); + (Just(chunks), 0..len, Just(len)) + }).prop_flat_map(|(chunks, start, len)| { + // Inclusive of start is fine: + let end = start..len; + (Just(chunks), Just(start), end) + })) { + let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); + rt.block_on(test_start_end_range(body, start, end))?; + } + + } } diff --git a/lib/src/component/cache.rs b/lib/src/component/cache.rs index f3bdeca5..db086264 100644 --- a/lib/src/component/cache.rs +++ b/lib/src/component/cache.rs @@ -119,11 +119,16 @@ fn load_write_options( }) } +struct LookupOptions { + headers: HeaderMap, + always_use_requested_range: bool, +} + fn load_lookup_options( session: &Session, options_mask: api::LookupOptionsMask, options: api::LookupOptions, -) -> Result { +) -> Result { let headers = if options_mask.contains(api::LookupOptionsMask::REQUEST_HEADERS) { let handle = options.request_headers; let parts = session.request_parts(handle.into())?; @@ -133,10 +138,17 @@ fn load_lookup_options( }; let options_mask = options_mask & !api::LookupOptionsMask::REQUEST_HEADERS; + let always_use_requested_range 
= + options_mask.contains(api::LookupOptionsMask::ALWAYS_USE_REQUESTED_RANGE); + let options_mask = options_mask & !api::LookupOptionsMask::ALWAYS_USE_REQUESTED_RANGE; + if options_mask != api::LookupOptionsMask::empty() { return Err(Error::NotAvailable("unknown cache lookup option")); } - Ok(headers) + Ok(LookupOptions { + headers, + always_use_requested_range, + }) } #[async_trait::async_trait] @@ -147,14 +159,20 @@ impl api::Host for ComponentCtx { options_mask: api::LookupOptionsMask, options: api::LookupOptions, ) -> Result { - let headers = load_lookup_options(&self.session, options_mask, options)?; + let LookupOptions { + headers, + always_use_requested_range, + } = load_lookup_options(&self.session, options_mask, options)?; let key: CacheKey = get_key(key)?; let cache = Arc::clone(self.session.cache()); - let task = PeekableTask::spawn(Box::pin( - async move { Ok(cache.lookup(&key, &headers).await) }, - )) + let task = PeekableTask::spawn(Box::pin(async move { + Ok(cache + .lookup(&key, &headers) + .await + .with_always_use_requested_range(always_use_requested_range)) + })) .await; let task = PendingCacheTask::new(task); let handle: CacheHandle = self.session.insert_cache_op(task).into(); @@ -298,33 +316,44 @@ impl api::Host for ComponentCtx { async fn get_body( &mut self, handle: api::Handle, - options_mask: api::GetBodyOptionsMask, - _options: api::GetBodyOptions, + mut options_mask: api::GetBodyOptionsMask, + options: api::GetBodyOptions, ) -> Result { + let from = if options_mask.contains(api::GetBodyOptionsMask::FROM) { + Some(options.from) + } else { + None + }; + options_mask &= !api::GetBodyOptionsMask::FROM; + let to = if options_mask.contains(api::GetBodyOptionsMask::TO) { + Some(options.to) + } else { + None + }; + options_mask &= !api::GetBodyOptionsMask::TO; + if options_mask != api::GetBodyOptionsMask::empty() { return Err(Error::NotAvailable("unknown cache get_body option").into()); } // We wind up re-borrowing `found` and `self.session` 
several times here, to avoid - // borrowing the both of them at once. Ultimately it is possible that inserting a body - // would change the address of Found, by re-shuffling the AsyncItems table; once again, - // borrowck wins the day. + // borrowing the both of them at once. + // (It possible that inserting a body would change the address of Found, by re-shuffling + // the AsyncItems table; we have to live by borrowck's rules.) // // We have an exclusive borrow &mut self.session for the lifetime of this call, // so even though we're re-borrowing/repeating lookups, we know we won't run into TOCTOU. - let found = self - .session - .cache_entry(handle.into()) - .await? - .found() - .ok_or_else(|| Error::CacheError(cache::Error::Missing))?; + let entry = self.session.cache_entry(handle.into()).await?; // Preemptively (optimistically) start a read. Don't worry, the Drop impl for Body will // clean up the copying task. // We have to do this to allow `found`'s lifetime to end before self.session.body, which // has to re-borrow self.self.session. - let body = found.body()?; + let body = entry.body(from, to).await?; + let found = entry + .found() + .ok_or(Error::CacheError(crate::cache::Error::Missing))?; if let Some(prev_handle) = found.last_body_handle { // Check if they're still reading the previous handle. 
@@ -364,13 +393,19 @@ impl api::Host for ComponentCtx { options_mask: api::LookupOptionsMask, options: api::LookupOptions, ) -> Result { - let headers = load_lookup_options(&self.session, options_mask, options)?; + let LookupOptions { + headers, + always_use_requested_range, + } = load_lookup_options(&self.session, options_mask, options)?; let key: CacheKey = get_key(key)?; let cache = Arc::clone(self.session.cache()); // Look up once, joining the transaction only if obligated: - let e = cache.transaction_lookup(&key, &headers, false).await; + let e = cache + .transaction_lookup(&key, &headers, false) + .await + .with_always_use_requested_range(always_use_requested_range); let ready = e.found().is_some() || e.go_get().is_some(); // If we already got _something_, we can provide an already-complete PeekableTask. // Otherwise we need to spawn it and let it block in the background. @@ -378,7 +413,10 @@ impl api::Host for ComponentCtx { PeekableTask::complete(e) } else { PeekableTask::spawn(Box::pin(async move { - Ok(cache.transaction_lookup(&key, &headers, true).await) + Ok(cache + .transaction_lookup(&key, &headers, true) + .await + .with_always_use_requested_range(always_use_requested_range)) })) .await }; @@ -465,7 +503,7 @@ impl api::Host for ComponentCtx { // The path here is: // InvalidCacheHandle -> FastlyStatus::BADF -> (ABI boundary) -> // CacheError::InvalidOperation - entry.update(options)?; + entry.update(options).await?; Ok(()) } diff --git a/lib/src/wiggle_abi/cache.rs b/lib/src/wiggle_abi/cache.rs index fe4bcd73..1e0a83b4 100644 --- a/lib/src/wiggle_abi/cache.rs +++ b/lib/src/wiggle_abi/cache.rs @@ -134,12 +134,17 @@ fn load_write_options( }) } +struct LookupOptions { + headers: HeaderMap, + always_use_requested_range: bool, +} + fn load_lookup_options( session: &Session, memory: &wiggle::GuestMemory<'_>, mut options_mask: types::CacheLookupOptionsMask, options: wiggle::GuestPtr, -) -> Result { +) -> Result { let options = memory.read(options)?; let 
headers = if options_mask.contains(types::CacheLookupOptionsMask::REQUEST_HEADERS) { let handle = options.request_headers; @@ -158,11 +163,18 @@ fn load_lookup_options( }); } + let always_use_requested_range = + options_mask.contains(types::CacheLookupOptionsMask::ALWAYS_USE_REQUESTED_RANGE); + options_mask &= !types::CacheLookupOptionsMask::ALWAYS_USE_REQUESTED_RANGE; + if !options_mask.is_empty() { return Err(Error::NotAvailable("unknown cache lookup option")); } - Ok(headers) + Ok(LookupOptions { + headers, + always_use_requested_range, + }) } #[allow(unused_variables)] @@ -175,13 +187,19 @@ impl FastlyCache for Session { options_mask: types::CacheLookupOptionsMask, options: wiggle::GuestPtr, ) -> Result { - let headers = load_lookup_options(self, memory, options_mask, options)?; + let LookupOptions { + headers, + always_use_requested_range, + } = load_lookup_options(&self, memory, options_mask, options)?; let key = load_cache_key(memory, cache_key)?; let cache = Arc::clone(self.cache()); - let task = PeekableTask::spawn(Box::pin( - async move { Ok(cache.lookup(&key, &headers).await) }, - )) + let task = PeekableTask::spawn(Box::pin(async move { + Ok(cache + .lookup(&key, &headers) + .await + .with_always_use_requested_range(always_use_requested_range)) + })) .await; let task = PendingCacheTask::new(task); let handle = self.insert_cache_op(task); @@ -330,12 +348,18 @@ impl FastlyCache for Session { options_mask: types::CacheLookupOptionsMask, options: wiggle::GuestPtr, ) -> Result { - let headers = load_lookup_options(self, memory, options_mask, options)?; + let LookupOptions { + headers, + always_use_requested_range, + } = load_lookup_options(&self, memory, options_mask, options)?; let key = load_cache_key(memory, cache_key)?; let cache = Arc::clone(self.cache()); // Look up once, joining the transaction only if obligated: - let e = cache.transaction_lookup(&key, &headers, false).await; + let e = cache + .transaction_lookup(&key, &headers, false) + .await + 
.with_always_use_requested_range(always_use_requested_range); let ready = e.found().is_some() || e.go_get().is_some(); // If we already got _something_, we can provide an already-complete PeekableTask. // Otherwise we need to spawn it and let it block in the background. @@ -343,7 +367,10 @@ impl FastlyCache for Session { PeekableTask::complete(e) } else { PeekableTask::spawn(Box::pin(async move { - Ok(cache.transaction_lookup(&key, &headers, true).await) + Ok(cache + .transaction_lookup(&key, &headers, true) + .await + .with_always_use_requested_range(always_use_requested_range)) })) .await }; @@ -429,7 +456,7 @@ impl FastlyCache for Session { // The path here is: // InvalidCacheHandle -> FastlyStatus::BADF -> (ABI boundary) -> // CacheError::InvalidOperation - entry.update(options)?; + entry.update(options).await?; Ok(()) } @@ -529,31 +556,45 @@ impl FastlyCache for Session { &mut self, memory: &mut wiggle::GuestMemory<'_>, handle: types::CacheHandle, - options_mask: types::CacheGetBodyOptionsMask, + mut options_mask: types::CacheGetBodyOptionsMask, options: &types::CacheGetBodyOptions, ) -> Result { + let from = if options_mask.contains(types::CacheGetBodyOptionsMask::FROM) { + Some(options.from) + } else { + None + }; + options_mask &= !types::CacheGetBodyOptionsMask::FROM; + let to = if options_mask.contains(types::CacheGetBodyOptionsMask::TO) { + Some(options.to) + } else { + None + }; + options_mask &= !types::CacheGetBodyOptionsMask::TO; + if !options_mask.is_empty() { return Err(Error::NotAvailable("unknown cache get_body option").into()); } // We wind up re-borrowing `found` and `self.session` several times here, to avoid - // borrowing the both of them at once. Ultimately it is possible that inserting a body - // would change the address of Found, by re-shuffling the AsyncItems table; once again, - // borrowck wins the day. + // borrowing the both of them at once. 
+        // (It is possible that inserting a body would change the address of Found, by re-shuffling
+        // the AsyncItems table; we have to live by borrowck's rules.)
         //
         // We have an exclusive borrow &mut self.session for the lifetime of this call,
         // so even though we're re-borrowing/repeating lookups, we know we won't run into TOCTOU.
-        let found = self
-            .cache_entry(handle.into())
-            .await?
-            .found()
-            .ok_or(Error::CacheError(crate::cache::Error::Missing))?;
+        let entry = self.cache_entry(handle.into()).await?;
+
        // Preemptively (optimistically) start a read. Don't worry, the Drop impl for Body will
        // clean up the copying task.
        // We have to do this to allow `found`'s lifetime to end before self.session.body, which
        // has to re-borrow self.self.session.
-        let body = found.body()?;
+        let body = entry.body(from, to).await?;
+
+        let found = entry
+            .found()
+            .ok_or(Error::CacheError(crate::cache::Error::Missing))?;

         if let Some(prev_handle) = found.last_body_handle {
             // Check if they're still reading the previous handle.
diff --git a/lib/wit/deps/fastly/compute.wit b/lib/wit/deps/fastly/compute.wit
index 3331d798..b59026e5 100644
--- a/lib/wit/deps/fastly/compute.wit
+++ b/lib/wit/deps/fastly/compute.wit
@@ -1036,6 +1036,8 @@ interface cache {

     flags lookup-options-mask {
         request-headers,
+        service-id,
+        always-use-requested-range,
     }

     /// Extensible options for cache lookup operations; currently used for both `lookup` and `transaction_lookup`.
diff --git a/test-fixtures/Cargo.lock b/test-fixtures/Cargo.lock index 64a75c7d..df9246e4 100644 --- a/test-fixtures/Cargo.lock +++ b/test-fixtures/Cargo.lock @@ -145,9 +145,9 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "fastly" -version = "0.11.2" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52427cf825d145acbb8c30198df4df0d9ac34b3b727d006a42c648393bb5f73b" +checksum = "9506877e1713a00a6a676191c29cc449aa60129da123d81245b203a4f6734dd3" dependencies = [ "anyhow", "bytes", @@ -162,6 +162,7 @@ dependencies = [ "mime", "serde", "serde_json", + "serde_repr", "serde_urlencoded", "sha2 0.9.9", "smallvec", @@ -172,9 +173,9 @@ dependencies = [ [[package]] name = "fastly-macros" -version = "0.11.2" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62dbd089072fd0c81a60cc280fa791d47f1d638bef920f4249c90bc8f1c17ea9" +checksum = "3331e33178c193355093de80c370b3d65ed7a61631a38ad32bf84bedec6ca9e8" dependencies = [ "proc-macro2", "quote", @@ -183,9 +184,9 @@ dependencies = [ [[package]] name = "fastly-shared" -version = "0.11.2" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7472f43d72061834542e2f5dc11d3d2918fe7d06248c31decd893a614408d7c" +checksum = "80cbb4ef1d9c7f4321179814dd51b39f2a91f0993a7303396bb34581ae4f5daa" dependencies = [ "bitflags 1.3.2", "http", @@ -193,12 +194,14 @@ dependencies = [ [[package]] name = "fastly-sys" -version = "0.11.2" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b602c2dbe901ddf49e2c1a2fda9e335a15cda8d3bf442183f048bc4de25810e" +checksum = "583231411fcb7ba818ca7126b2e97038efc04d29950902b3cd11b33741303d80" dependencies = [ "bitflags 1.3.2", "fastly-shared", + "wasi", + "wit-bindgen-rt 0.42.1", ] [[package]] @@ -550,6 +553,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = 
"0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -766,7 +780,7 @@ version = "0.14.2+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" dependencies = [ - "wit-bindgen-rt", + "wit-bindgen-rt 0.39.0", ] [[package]] @@ -778,6 +792,15 @@ dependencies = [ "bitflags 2.9.0", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051105bab12bc78e161f8dfb3596e772dd6a01ebf9c4840988e00347e744966a" +dependencies = [ + "bitflags 2.9.0", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/test-fixtures/Cargo.toml b/test-fixtures/Cargo.toml index b38fec3c..582fa596 100644 --- a/test-fixtures/Cargo.toml +++ b/test-fixtures/Cargo.toml @@ -11,9 +11,9 @@ publish = false anyhow = "1.0.86" base64 = "0.21.2" bitflags = "1.3.2" -fastly = "0.11.1" -fastly-shared = "0.11.1" -fastly-sys = "0.11.1" +fastly = "0.11.5" +fastly-shared = "0.11.5" +fastly-sys = "0.11.5" hex-literal = "0.4.1" bytes = "1.0.0" http = "1.1.0" diff --git a/test-fixtures/src/bin/cache.rs b/test-fixtures/src/bin/cache.rs index 356bca9e..b3790137 100644 --- a/test-fixtures/src/bin/cache.rs +++ b/test-fixtures/src/bin/cache.rs @@ -3,6 +3,7 @@ use bytes::Bytes; use fastly::cache::core::*; use fastly::http::{HeaderName, HeaderValue}; +use fastly::Body; use std::io::{Read, Write}; use std::time::Duration; use uuid::Uuid; @@ -37,6 +38,15 @@ fn main() { run_test!(test_length_from_body); run_test!(test_inconsistent_body_length); + run_test!(test_nonconcurrent_range); + run_test!(test_concurrent_range); + run_test!(test_concurrent_range_fixed); + run_test!(test_concurrent_range_known_length_fixed); + 
run_test!(test_transaction_range_fixed); + + run_test!(test_known_length_nonblocking); + run_test!(test_known_length_invalid_range); + run_test!(test_unknown_length_invalid_range); run_test!(test_user_metadata); run_test!(test_service_id); @@ -53,8 +63,7 @@ fn main() { run_test!(test_collapse_across_vary); run_test!(test_stream_back); - - run_test!(test_range_request_unsupported); + run_test!(test_stream_back_fixed); eprintln!("Completed all tests for version {service}") } @@ -91,7 +100,10 @@ fn ready_and_pending( /// /// This is an ugly hack. Sorry. fn poll_known_length(key: &CacheKey) -> Found { - loop { + const SLEEP: Duration = Duration::from_millis(101); + const TIMEOUT: Duration = Duration::from_secs(10); + let iters = TIMEOUT.as_millis() / SLEEP.as_millis(); + for _ in 0..iters { if let Some(v) = lookup(key.clone()) .execute() .expect("lookup should not generate error") @@ -100,8 +112,12 @@ fn poll_known_length(key: &CacheKey) -> Found { return v; } } - std::thread::sleep(Duration::from_millis(101)); + std::thread::sleep(SLEEP); } + panic!( + "did not arrive at known-length after {} seconds", + TIMEOUT.as_secs() + ); } fn test_non_concurrent() { @@ -601,6 +617,296 @@ fn test_inconsistent_body_length() { } } +fn test_nonconcurrent_range() { + // When we do a full body, then a range request, we get only the ranged bits. 
+ let key = new_key(); + + { + let fetch = lookup(key.clone()) + .execute() + .expect("failed initial lookup"); + assert!(fetch.is_none()); + } + + let body = "hello beautiful world".as_bytes(); + { + let mut writer = insert(key.clone(), Duration::from_secs(117)) + .execute() + .unwrap(); + writer.write_all(body).unwrap(); + writer.finish().unwrap(); + } + { + let got = poll_known_length(&key); + let got = got + .to_stream_from_range(Some(6), Some(14)) + .unwrap() + .into_bytes(); + assert_eq!(&got, b"beautiful"); + } + + { + let got = lookup(key.clone()).execute().unwrap().unwrap(); + let got = got + .to_stream_from_range(Some(6), None) + .unwrap() + .into_bytes(); + assert_eq!(&got, b"beautiful world"); + } + + // to_stream_from_range(None, Some(x)) gets the _last_ (x) bytes + { + let got = lookup(key.clone()).execute().unwrap().unwrap(); + let got = got + .to_stream_from_range(None, Some(4)) + .unwrap() + .into_bytes(); + assert_eq!(std::str::from_utf8(&got).unwrap(), "orld"); + } +} + +fn test_concurrent_range() { + // When we stream a body, we get different results based on whether we provided a known length. + let key_unknown_length = new_key(); + let key_known_length = new_key(); + + let body = "hello beautiful world".as_bytes(); + let mut writer_unknown_length = insert(key_unknown_length.clone(), Duration::from_secs(118)) + .execute() + .unwrap(); + let mut writer_known_length = insert(key_known_length.clone(), Duration::from_secs(119)) + .known_length(body.len() as u64) + .execute() + .unwrap(); + + let read_unknown_length = lookup(key_unknown_length).execute().unwrap().unwrap(); + let read_known_length = lookup(key_known_length).execute().unwrap().unwrap(); + + // These capture the state when no body has been streamed at all, so the behavior should be + // deterministic. 
+ let body_unknown_length = read_unknown_length + .to_stream_from_range(Some(6), Some(14)) + .unwrap(); + let body_known_length = read_known_length + .to_stream_from_range(Some(6), Some(14)) + .unwrap(); + + writer_unknown_length.write(body).unwrap(); + writer_unknown_length.finish().unwrap(); + writer_known_length.write(body).unwrap(); + writer_known_length.finish().unwrap(); + + let body_unknown_length = body_unknown_length.into_bytes(); + let body_known_length = body_known_length.into_bytes(); + + assert_eq!(&body_unknown_length, body); + assert_eq!(&body_known_length, b"beautiful"); +} + +fn test_concurrent_range_fixed() { + // Test the "always use requested range" behavior. + let key_unknown_length = new_key(); + + let body = "hello beautiful world"; + let mut writer_unknown_length = insert(key_unknown_length.clone(), Duration::from_secs(120)) + .execute() + .unwrap(); + + // The to_stream_from_range() call below will block until the first byte of the range is available. + // Make it available. + writer_unknown_length.write(&body.as_bytes()[0..7]).unwrap(); + writer_unknown_length.flush().unwrap(); + + // Then ask for a streaming body, with just part of the range. + let read_unknown_length = lookup(key_unknown_length) + .always_use_requested_range() + .execute() + .unwrap() + .unwrap(); + let body_unknown_length = read_unknown_length + .to_stream_from_range(Some(6), Some(14)) + .unwrap(); + + // Finish writing: + writer_unknown_length.write(&body.as_bytes()[7..]).unwrap(); + writer_unknown_length.finish().unwrap(); + + let body_unknown_length = body_unknown_length.into_string(); + + assert_eq!(&body_unknown_length, &body[6..=14]); +} + +fn test_concurrent_range_known_length_fixed() { + // When we stream a body, we get different results based on whether we provided a known length. 
+ let key_known_length = new_key(); + + let body = "hello beautiful world".as_bytes(); + let mut writer_known_length = insert(key_known_length.clone(), Duration::from_secs(121)) + .known_length(body.len() as u64) + .execute() + .unwrap(); + + let read_known_length = lookup(key_known_length) + .always_use_requested_range() + .execute() + .unwrap() + .unwrap(); + + let body_known_length = read_known_length + .to_stream_from_range(Some(6), Some(14)) + .unwrap(); + + writer_known_length.write(body).unwrap(); + writer_known_length.finish().unwrap(); + + let body_known_length = body_known_length.into_bytes(); + + assert_eq!(&body_known_length, b"beautiful"); +} + +fn test_transaction_range_fixed() { + let key = new_key(); + + let body = "hello beautiful world"; + let tx1 = Transaction::lookup(key.clone()) + .always_use_requested_range() + .execute() + .unwrap(); + let mut writer = tx1.insert(Duration::from_secs(122)).execute().unwrap(); + writer.write(&body.as_bytes()[..7]).unwrap(); + writer.flush().unwrap(); + + // Next transaction should be ready immediately, because the write has started. 
+ let tx2 = Transaction::lookup(key.clone()) + .always_use_requested_range() + .execute_async() + .unwrap() + .wait() + .unwrap(); + let found = tx2.found().expect("write has started, should be ready"); + let body2 = found.to_stream_from_range(Some(6), Some(14)).unwrap(); + + // Finish the write: + writer.write(&body.as_bytes()[7..]).unwrap(); + writer.finish().unwrap(); + + let got = body2.into_string(); + assert_eq!(&got, &body[6..=14]); +} + +fn test_known_length_nonblocking() { + let key = new_key(); + + let source = "hello beautiful world".as_bytes(); + let mut writer = insert(key.clone(), Duration::from_secs(10)) + .known_length(source.len() as u64) + .execute() + .unwrap(); + + // Try to read, even though we don't have these bytes yet: + let read = lookup(key.clone()) + .always_use_requested_range() + .execute() + .unwrap() + .unwrap(); + let body = read.to_stream_from_range(Some(6), Some(14)).unwrap(); + + // Finish the write, so we can convert into bytes: + writer.write(source).unwrap(); + writer.finish().unwrap(); + + let got = body.into_bytes(); + assert_eq!(&got, b"beautiful"); +} + +fn test_known_length_invalid_range() { + let key = new_key(); + + let source = "hello beautiful world".as_bytes(); + let mut writer = insert(key.clone(), Duration::from_secs(10)) + .known_length(source.len() as u64) + .execute() + .unwrap(); + writer.write(source).unwrap(); + writer.flush().unwrap(); + + // Perform a lookup and get a body for the given range. + // We have to do a separate lookup for each because we can only read the body from each Found + // once. + let get_body = |from: Option, to: Option| -> Result { + let read = lookup(key.clone()) + .always_use_requested_range() + .execute() + .unwrap() + .unwrap(); + read.to_stream_from_range(from, to) + }; + + let after_end = source.len() as u64; + + // Known-invalid: the indices are backwards. 
+ let _got = get_body(Some(6), Some(4)).unwrap_err(); + + let mut bodies = Vec::new(); + // Each of these is invalid in some way; so, they should always return the full body, + // because the length of the cached item is known. + bodies.push(get_body(Some(after_end), None).unwrap()); + bodies.push(get_body(Some(4), Some(after_end)).unwrap()); + bodies.push(get_body(None, Some(after_end)).unwrap()); + bodies.push(get_body(None, Some(0)).unwrap()); + + // Finish the write, so we can convert into bodies: + writer.finish().unwrap(); + + for body in bodies { + let got = body.into_bytes(); + assert_eq!(&got, source); + } +} + +fn test_unknown_length_invalid_range() { + let key = new_key(); + + let body = "hello beautiful world".as_bytes(); + let mut writer = insert(key.clone(), Duration::from_secs(10)) + .execute() + .unwrap(); + writer.write(body).unwrap(); + writer.flush().unwrap(); + + { + let read = lookup(key.clone()) + .always_use_requested_range() + .execute() + .unwrap() + .unwrap(); + + let after_end = (body.len() + 1) as u64; + + let got = read.to_stream_from_range(Some(6), Some(4)).unwrap_err(); + assert!(matches!(got, CacheError::Other(_))); + + // TODO: This will block until "finish", so we can't do anything with it here. + //let got = read + // .to_stream_from_range(Some(after_end), None) + // .unwrap() + // .into_bytes(); + //assert_eq!(got.len(), body.len()); + + // We also only know about this after "finish", so the body should be ready immediately: + let mut got = read.to_stream_from_range(Some(4), Some(after_end)).unwrap(); + let mut available = [0u8]; + // We should be able to read one byte... 
+ got.read_exact(&mut available).unwrap(); + // Finish the write to force an error: + writer.finish().unwrap(); + + // Can't read past the end: + let mut v = Vec::new(); + got.read_to_end(&mut v).unwrap_err(); + } +} + fn test_stale_while_revalidate() { let key = new_key(); { @@ -633,12 +939,6 @@ fn test_stale_while_revalidate() { ); } - // One of these should get the obligation: - eprintln!("1 must_insert_or_update: {}", txn1.must_insert_or_update()); - eprintln!("2 must_insert_or_update: {}", txn2.must_insert_or_update()); - eprintln!("1 must_insert: {}", txn1.must_insert()); - eprintln!("2 must_insert: {}", txn2.must_insert()); - // Update without modifying the body: txn1.update(Duration::from_secs(100)) .user_metadata(Bytes::from_static(b"version 2")) @@ -920,25 +1220,37 @@ fn test_stream_back() { assert_eq!(&got, &body); } -fn test_range_request_unsupported() { +fn test_stream_back_fixed() { + // Test the "always use requested range" behavior. let key = new_key(); - let body = "abc123def".as_bytes(); - { - let mut writer = insert(key.clone(), Duration::from_secs(10)) - .known_length(body.len() as u64) - .execute() - .unwrap(); - writer.write_all(body).unwrap(); - writer.finish().unwrap(); - } + let body = "hello beautiful world"; - let fetch = lookup(key.clone()).execute().unwrap(); - let Some(got) = fetch else { - panic!("did not fetch from cache") - }; - let got = got.to_stream_from_range(Some(3), Some(5)); - if !got.is_err() { - panic!("range requests are not yet supported in Viceroy"); - } + let tx = Transaction::lookup(key.clone()) + .always_use_requested_range() + .execute() + .unwrap(); + assert!(tx.found().is_none()); + assert!(tx.must_insert_or_update()); + + let (mut writer, found) = tx + .insert(Duration::from_secs(124)) + .execute_and_stream_back() + .unwrap(); + + // The to_stream_from_range() call below will block until the first byte of the range is available. + // Make it available. 
+ writer.write(&body.as_bytes()[0..7]).unwrap(); + writer.flush().unwrap(); + + // Then ask for a streaming body, with just part of the range. + let got = found.to_stream_from_range(Some(6), Some(14)).unwrap(); + + // Finish writing: + writer.write(&body.as_bytes()[7..]).unwrap(); + writer.finish().unwrap(); + + let got = got.into_string(); + + assert_eq!(&got, &body[6..=14]); }