diff --git a/http-cache-reqwest/Cargo.toml b/http-cache-reqwest/Cargo.toml index 3fbfd96..08b078c 100644 --- a/http-cache-reqwest/Cargo.toml +++ b/http-cache-reqwest/Cargo.toml @@ -20,7 +20,7 @@ anyhow = "1.0.95" async-trait = "0.1.85" http = "1.2.0" http-cache-semantics = "2.1.0" -reqwest = { version = "0.12.12", default-features = false } +reqwest = { version = "0.12.12", default-features = false, features = ["stream"] } reqwest-middleware = "0.4.0" serde = { version = "1.0.217", features = ["derive"] } url = { version = "2.5.4", features = ["serde"] } diff --git a/http-cache-reqwest/src/lib.rs b/http-cache-reqwest/src/lib.rs index 5e8ccb2..dccd66a 100644 --- a/http-cache-reqwest/src/lib.rs +++ b/http-cache-reqwest/src/lib.rs @@ -73,8 +73,8 @@ use reqwest_middleware::{Error, Next}; use url::Url; pub use http_cache::{ - CacheManager, CacheMode, CacheOptions, HttpCache, HttpCacheOptions, - HttpResponse, + Body, CacheManager, CacheMode, CacheOptions, HttpCache, HttpCacheOptions, + HttpResponse, Parts as HttpParts, }; #[cfg(feature = "manager-cacache")] @@ -169,29 +169,25 @@ impl Middleware for ReqwestMiddleware<'_> { let url = res.url().clone(); let status = res.status().into(); let version = res.version(); - let body: Vec = match res.bytes().await { - Ok(b) => b, - Err(e) => return Err(Box::new(e)), - } - .to_vec(); - Ok(HttpResponse { - body, - headers, - status, - url, - version: version.try_into()?, - }) + let parts = + HttpParts { headers, status, url, version: version.try_into()? 
}; + Ok(HttpResponse::from_parts( + parts, + Body::wrap_stream(res.bytes_stream()), + )) } } // Converts an [`HttpResponse`] to a reqwest [`Response`] fn convert_response(response: HttpResponse) -> anyhow::Result { + let (parts, body) = response.into_parts(); + let body = reqwest::Body::wrap_stream(body.into_data_stream()); let mut ret_res = http::Response::builder() - .status(response.status) - .url(response.url) - .version(response.version.into()) - .body(response.body)?; - for header in response.headers { + .status(parts.status) + .url(parts.url) + .version(parts.version.into()) + .body(body)?; + for header in parts.headers { ret_res.headers_mut().insert( HeaderName::from_str(header.0.clone().as_str())?, HeaderValue::from_str(header.1.clone().as_str())?, diff --git a/http-cache/Cargo.toml b/http-cache/Cargo.toml index 7e0aff9..db446c9 100644 --- a/http-cache/Cargo.toml +++ b/http-cache/Cargo.toml @@ -18,25 +18,33 @@ rust-version = "1.71.1" [dependencies] async-trait = "0.1.85" bincode = { version = "1.3.3", optional = true } +bytes = "1.10.1" cacache = { version = "13.1.0", default-features = false, features = ["mmap"], optional = true } +futures = "0.3.31" +futures-util = "0.3.31" http = "1.2.0" +http-body = "1.0.1" +http-body-util = "0.1.3" http-cache-semantics = "2.1.0" http-types = { version = "2.12.0", default-features = false, optional = true } httpdate = "1.0.3" moka = { version = "0.12.10", features = ["future"], optional = true } serde = { version = "1.0.217", features = ["derive"] } +tokio = { version = "1", default-features = false, features = ["io-util"], optional = true } +tokio-util = { version = "0.7.14", features = ["io"], optional = true } url = { version = "2.5.4", features = ["serde"] } [dev-dependencies] async-attributes = "1.1.2" async-std = { version = "1.13.0" } http-cache-semantics = "2.1.0" +tempfile = "3.19.1" tokio = { version = "1.43.0", features = [ "macros", "rt", "rt-multi-thread" ] } [features] default = ["manager-cacache", 
"cacache-async-std"] manager-cacache = ["cacache", "bincode"] -cacache-tokio = ["cacache/tokio-runtime"] +cacache-tokio = ["cacache/tokio-runtime", "tokio", "tokio-util"] cacache-async-std = ["cacache/async-std"] manager-moka = ["moka", "bincode"] with-http-types = ["http-types"] diff --git a/http-cache/src/lib.rs b/http-cache/src/lib.rs index fcb17cf..125175b 100644 --- a/http-cache/src/lib.rs +++ b/http-cache/src/lib.rs @@ -40,7 +40,11 @@ use std::{ time::SystemTime, }; +use bytes::{BufMut, Bytes}; +use futures::StreamExt; use http::{header::CACHE_CONTROL, request, response, StatusCode}; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyDataStream, BodyExt, Full}; use http_cache_semantics::{AfterResponse, BeforeRequest, CachePolicy}; use serde::{Deserialize, Serialize}; use url::Url; @@ -117,10 +121,99 @@ impl fmt::Display for HttpVersion { } /// A basic generic type that represents an HTTP response -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug)] pub struct HttpResponse { /// HTTP response body - pub body: Vec, + body: Body, + /// HTTP response parts + parts: Parts, +} + +/// HTTP response body. +#[derive(Debug)] +pub struct Body { + inner: BodyInner, +} + +#[derive(Debug)] +enum BodyInner { + Full(Bytes), + Streaming(BoxBody), +} + +impl Body { + /// wrap stream + pub fn wrap_stream(stream: S) -> Body + where + S: futures::stream::TryStream + Send + Sync + 'static, + S::Error: Into>, + Bytes: From, + { + use futures_util::TryStreamExt; + use http_body::Frame; + use http_body_util::StreamBody; + + let body = BoxBody::new(StreamBody::new( + stream.map_ok(|d| Frame::data(Bytes::from(d))).map_err(Into::into), + )); + Body { inner: BodyInner::Streaming(body) } + } + + /// Get body bytes if body is full. + pub fn as_bytes(&self) -> Option<&[u8]> { + match &self.inner { + BodyInner::Full(bytes) => Some(bytes), + BodyInner::Streaming(_) => None, + } + } + + /// Get all bytes of the response, collecting data stream if some. 
+ pub async fn bytes(self) -> Result { + Ok(match self.inner { + BodyInner::Full(bytes) => bytes, + BodyInner::Streaming(boxed_body) => boxed_body + .into_data_stream() + .collect::>>() + .await + .into_iter() + .collect::>>()? + .into_iter() + .fold(bytes::BytesMut::new(), |mut acc, chunk| { + acc.put(chunk); + acc + }) + .freeze(), + }) + } + + /// Into data stream + pub fn into_data_stream(self) -> BodyDataStream> { + match self.inner { + BodyInner::Full(data) => { + Full::new(data).map_err(Into::into).boxed().into_data_stream() + } + BodyInner::Streaming(boxed_body) => boxed_body.into_data_stream(), + } + } +} + +impl From> for Body { + fn from(value: Vec) -> Self { + Self { inner: BodyInner::Full(value.into()) } + } +} + +impl From for Body { + fn from(value: Bytes) -> Self { + Self { inner: BodyInner::Full(value) } + } +} + +/// HTTP response parts consists of status, version, response URL and headers. +/// +/// Serializable alternative to [`http::response::Parts`]. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Parts { /// HTTP response headers pub headers: HashMap, /// HTTP response status code @@ -132,13 +225,23 @@ pub struct HttpResponse { } impl HttpResponse { + /// Consumes the response returning the head and body parts. + pub fn into_parts(self) -> (Parts, Body) { + (self.parts, self.body) + } + + /// Creates a new Response with the given head and body. 
+ pub fn from_parts(parts: Parts, body: Body) -> Self { + Self { body, parts } + } + /// Returns `http::response::Parts` pub fn parts(&self) -> Result { let mut converted = - response::Builder::new().status(self.status).body(())?; + response::Builder::new().status(self.parts.status).body(())?; { let headers = converted.headers_mut(); - for header in &self.headers { + for header in &self.parts.headers { headers.insert( http::header::HeaderName::from_str(header.0.as_str())?, http::HeaderValue::from_str(header.1.as_str())?, @@ -151,7 +254,7 @@ impl HttpResponse { /// Returns the status code of the warning header if present #[must_use] pub fn warning_code(&self) -> Option { - self.headers.get("warning").and_then(|hdr| { + self.parts.headers.get("warning").and_then(|hdr| { hdr.as_str().chars().take(3).collect::().parse().ok() }) } @@ -167,7 +270,7 @@ impl HttpResponse { // warn-text = quoted-string // warn-date = <"> HTTP-date <"> // (https://tools.ietf.org/html/rfc2616#section-14.46) - self.headers.insert( + self.parts.headers.insert( "warning".to_string(), format!( "{} {} {:?} \"{}\"", @@ -181,13 +284,13 @@ impl HttpResponse { /// Removes a warning header from a response pub fn remove_warning(&mut self) { - self.headers.remove("warning"); + self.parts.headers.remove("warning"); } /// Update the headers from `http::response::Parts` pub fn update_headers(&mut self, parts: &response::Parts) -> Result<()> { for header in parts.headers.iter() { - self.headers.insert( + self.parts.headers.insert( header.0.as_str().to_string(), header.1.to_str()?.to_string(), ); @@ -198,23 +301,27 @@ impl HttpResponse { /// Checks if the Cache-Control header contains the must-revalidate directive #[must_use] pub fn must_revalidate(&self) -> bool { - self.headers.get(CACHE_CONTROL.as_str()).is_some_and(|val| { + self.parts.headers.get(CACHE_CONTROL.as_str()).is_some_and(|val| { val.as_str().to_lowercase().contains("must-revalidate") }) } /// Adds the custom `x-cache` header to the response 
pub fn cache_status(&mut self, hit_or_miss: HitOrMiss) { - self.headers.insert(XCACHE.to_string(), hit_or_miss.to_string()); + self.parts.headers.insert(XCACHE.to_string(), hit_or_miss.to_string()); } /// Adds the custom `x-cache-lookup` header to the response pub fn cache_lookup_status(&mut self, hit_or_miss: HitOrMiss) { - self.headers.insert(XCACHELOOKUP.to_string(), hit_or_miss.to_string()); + self.parts + .headers + .insert(XCACHELOOKUP.to_string(), hit_or_miss.to_string()); } } /// A trait providing methods for storing, reading, and removing cache records. +/// +/// Implementations persist both the [`HttpResponse`] and its associated [`CachePolicy`]. #[async_trait::async_trait] pub trait CacheManager: Send + Sync + 'static { /// Attempts to pull a cached response and related policy from cache. @@ -555,7 +662,7 @@ impl HttpCache { // the rest of the network for a period of time. // (https://tools.ietf.org/html/rfc2616#section-14.46) res.add_warning( - &res.url.clone(), + &res.parts.url.clone(), 112, "Disconnected operation", ); @@ -571,11 +678,13 @@ CacheMode::OnlyIfCached => { // ENOTCACHED let mut res = HttpResponse { - body: b"GatewayTimeout".to_vec(), - headers: HashMap::default(), - status: 504, - url: middleware.url()?, - version: HttpVersion::Http11, + body: b"GatewayTimeout".to_vec().into(), + parts: Parts { + headers: HashMap::default(), + status: 504, + url: middleware.url()?, + version: HttpVersion::Http11, + }, }; if self.options.cache_status_headers { res.cache_status(HitOrMiss::MISS); @@ -615,9 +724,9 @@ let mode = self.cache_mode(middleware)?; let mut is_cacheable = is_get_head && mode != CacheMode::NoStore - && res.status == 200 + && res.parts.status == 200 && policy.is_storable(); - if mode == CacheMode::IgnoreRules && res.status == 200 { + if mode == CacheMode::IgnoreRules && res.parts.status == 200 { is_cacheable = true; } if is_cacheable { @@ -670,7 +779,7 @@ let req_url = 
middleware.url()?; match middleware.remote_fetch().await { Ok(mut cond_res) => { - let status = StatusCode::from_u16(cond_res.status)?; + let status = StatusCode::from_u16(cond_res.parts.status)?; if status.is_server_error() && cached_res.must_revalidate() { // 111 Revalidation failed // MUST be included if a cache returns a stale response @@ -686,7 +795,7 @@ impl HttpCache { cached_res.cache_status(HitOrMiss::HIT); } Ok(cached_res) - } else if cond_res.status == 304 { + } else if cond_res.parts.status == 304 { let after_res = policy.after_response( &middleware.parts()?, &cond_res.parts()?, @@ -713,7 +822,7 @@ impl HttpCache { ) .await?; Ok(res) - } else if cond_res.status == 200 { + } else if cond_res.parts.status == 200 { let policy = match self.options.cache_options { Some(options) => middleware .policy_with_options(&cond_res, options)?, diff --git a/http-cache/src/managers/cacache.rs b/http-cache/src/managers/cacache.rs index ea2416d..b542e1f 100644 --- a/http-cache/src/managers/cacache.rs +++ b/http-cache/src/managers/cacache.rs @@ -1,10 +1,21 @@ use std::path::PathBuf; +use std::result::Result as StdResult; -use crate::{CacheManager, HttpResponse, Result}; +use crate::{Body, CacheManager, HttpResponse, Parts, Result}; +use bytes::Bytes; +use cacache::{Reader, Writer}; +use futures::{Stream, StreamExt}; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, StreamBody}; use http_cache_semantics::CachePolicy; use serde::{Deserialize, Serialize}; +#[cfg(feature = "cacache-async-std")] +use futures::{AsyncReadExt, AsyncWriteExt}; +#[cfg(feature = "cacache-tokio")] +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + /// Implements [`CacheManager`] with [`cacache`](https://github.com/zkat/cacache-rs) as the backend. 
#[cfg_attr(docsrs, doc(cfg(feature = "manager-cacache")))] #[derive(Debug, Clone)] @@ -19,10 +30,21 @@ impl Default for CACacheManager { } } +// Cache binary value layout: +// [u32 - size of the Store][Store][response body bytes] +// bincode works with pre-defined slice of bytes, so we need this u32 in front. + +#[derive(Debug, Deserialize, Serialize)] +enum BodyKind { + Full, + Streaming, +} + #[derive(Debug, Deserialize, Serialize)] struct Store { - response: HttpResponse, + parts: Parts, policy: CachePolicy, + body_kind: BodyKind, } #[allow(dead_code)] @@ -34,19 +56,123 @@ impl CACacheManager { } } +#[cfg(feature = "cacache-async-std")] +mod cacache_stream { + use super::*; + use futures::AsyncRead; + use std::pin::Pin; + use std::task::Context; + use std::task::Poll; + + // Minimal `Stream` implementation over a cacache `Reader` for the async-std runtime. + + const BUFSIZE: usize = 4096; + + pub struct CACacheReaderStream { + reader: Reader, + buf: [u8; BUFSIZE], + } + + impl CACacheReaderStream { + pub fn new(reader: Reader) -> Self { + Self { reader, buf: [0; BUFSIZE] } + } + } + + impl Stream for CACacheReaderStream { + type Item = StdResult, std::io::Error>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let Self { reader, buf } = &mut *self; + let reader = Pin::new(reader); + match reader.poll_read(cx, buf) { + Poll::Ready(Ok(0)) => Poll::Ready(None), + Poll::Ready(Ok(n)) => { + let bytes = Bytes::from(buf[..n].to_vec()); + let frame = http_body::Frame::data(bytes); + Poll::Ready(Some(Ok(frame))) + } + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), + Poll::Pending => Poll::Pending, + } + } + } +} + +#[cfg(feature = "cacache-tokio")] +mod cacache_stream { + use super::*; + use tokio_util::io::ReaderStream; + + pub struct CACacheReaderStream { + inner: ReaderStream, + } + + impl CACacheReaderStream { + pub fn new(reader: Reader) -> Self { + Self { inner: ReaderStream::new(reader) } + } + } + + impl Stream for CACacheReaderStream 
{ + type Item = StdResult, std::io::Error>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + as Stream>::poll_next( + std::pin::Pin::new(&mut self.inner), + cx, + ) + .map(|opt| opt.map(|res| res.map(http_body::Frame::data))) + } + } +} + +use cacache_stream::CACacheReaderStream; + #[async_trait::async_trait] impl CacheManager for CACacheManager { async fn get( &self, cache_key: &str, ) -> Result> { - let store: Store = match cacache::read(&self.path, cache_key).await { - Ok(d) => bincode::deserialize(&d)?, - Err(_e) => { - return Ok(None); + let mut reader = match Reader::open(&self.path, cache_key).await { + Ok(reader) => reader, + Err(err) => match err { + cacache::Error::EntryNotFound(..) => return Ok(None), + _ => return Err(err.into()), + }, + }; + + // Reading "head" part length + let mut buf = [0u8; 4]; + reader.read_exact(&mut buf).await?; + let store_len = u32::from_le_bytes(buf); + + // Reading "head" part + let mut buf = vec![0; store_len as usize]; + reader.read_exact(buf.as_mut_slice()).await?; + let store: Store = bincode::deserialize(&buf)?; + + let body = match store.body_kind { + BodyKind::Full => { + let mut body = Vec::new(); + reader.read_to_end(&mut body).await?; + Body { inner: crate::BodyInner::Full(body.into()) } } + BodyKind::Streaming => Body { + inner: crate::BodyInner::Streaming(BoxBody::new( + StreamBody::new(CACacheReaderStream::new(reader)) + .map_err(Into::into), + )), + }, }; - Ok(Some((store.response, store.policy))) + Ok(Some((HttpResponse::from_parts(store.parts, body), store.policy))) } async fn put( @@ -55,9 +181,42 @@ impl CacheManager for CACacheManager { response: HttpResponse, policy: CachePolicy, ) -> Result { - let data = Store { response: response.clone(), policy }; + let mut writer = Writer::create(&self.path, &cache_key).await?; + let (parts, body) = response.into_parts(); + let body_kind = match &body.inner { + crate::BodyInner::Full(_) => 
BodyKind::Full, + crate::BodyInner::Streaming(_) => BodyKind::Streaming, + }; + let data = Store { parts, policy, body_kind }; let bytes = bincode::serialize(&data)?; - cacache::write(&self.path, cache_key, bytes).await?; + let store_len = (bytes.len() as u32).to_le_bytes(); + + // Writing "head" part length + writer.write_all(&store_len).await?; + + // Writing "head" part + writer.write_all(&bytes).await?; + + // Writing body itself + match body.inner { + crate::BodyInner::Full(data) => { + writer.write_all(&data).await?; + } + crate::BodyInner::Streaming(box_body) => { + let mut stream = box_body.into_data_stream(); + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result?; + writer.write_all(&chunk).await?; + } + } + } + writer.commit().await?; + + // Safety: at this point we successfully created this cache entry, + // so it is safe to unwrap here (cacache::Error::EntryNotFound should be impossible). + // FIXME: does it make sense to return error here instead of unwrapping? If yes, then which error? 
+ let (response, _) = self.get(&cache_key).await?.unwrap(); + Ok(response) } diff --git a/http-cache/src/test.rs b/http-cache/src/test.rs index af9a2c1..d330df6 100644 --- a/http-cache/src/test.rs +++ b/http-cache/src/test.rs @@ -1,6 +1,6 @@ use crate::{ error, CacheMode, HitOrMiss, HttpCacheOptions, HttpResponse, HttpVersion, - Result, + Parts, Result, }; use http::{header::CACHE_CONTROL, StatusCode}; use http_cache_semantics::CacheOptions; @@ -66,13 +66,15 @@ fn test_errors() -> Result<()> { fn response_methods_work() -> Result<()> { let url = Url::from_str("http://example.com")?; let mut res = HttpResponse { - body: TEST_BODY.to_vec(), - headers: HashMap::default(), - status: 200, - url: url.clone(), - version: HttpVersion::Http11, + body: TEST_BODY.to_vec().into(), + parts: Parts { + headers: HashMap::default(), + status: 200, + url: url.clone(), + version: HttpVersion::Http11, + }, }; - assert_eq!(format!("{:?}", res.clone()), "HttpResponse { body: [116, 101, 115, 116], headers: {}, status: 200, url: Url { scheme: \"http\", cannot_be_a_base: false, username: \"\", password: None, host: Some(Domain(\"example.com\")), port: None, path: \"/\", query: None, fragment: None }, version: Http11 }"); + assert_eq!(format!("{:?}", res), "HttpResponse { body: Body { inner: Full(b\"test\") }, parts: Parts { headers: {}, status: 200, url: Url { scheme: \"http\", cannot_be_a_base: false, username: \"\", password: None, host: Some(Domain(\"example.com\")), port: None, path: \"/\", query: None, fragment: None }, version: Http11 } }"); res.add_warning(&url, 112, "Test Warning"); let code = res.warning_code(); assert!(code.is_some()); @@ -89,7 +91,7 @@ fn response_methods_work() -> Result<()> { res.update_headers(&parts)?; assert!(res.must_revalidate()); assert_eq!(res.parts()?.headers, cloned_headers); - res.headers.remove(CACHE_CONTROL.as_str()); + res.parts.headers.remove(CACHE_CONTROL.as_str()); assert!(!res.must_revalidate()); Ok(()) } @@ -176,7 +178,7 @@ mod 
with_http_types { mod with_cacache { use super::*; - use crate::{CACacheManager, CacheManager}; + use crate::{Body, CACacheManager, CacheManager, Parts}; use http_cache_semantics::CachePolicy; @@ -193,8 +195,7 @@ mod with_cacache { &format!("{:?}", manager), "CACacheManager { path: \"./http-cacache-test\" }" ); - let http_res = HttpResponse { - body: TEST_BODY.to_vec(), + let parts = Parts { headers: Default::default(), status: 200, url: url.clone(), @@ -205,26 +206,72 @@ mod with_cacache { http::Response::builder().status(200).body(TEST_BODY.to_vec())?; let policy = CachePolicy::new(&req, &res); manager - .put(format!("{}:{}", GET, &url), http_res.clone(), policy.clone()) + .put( + format!("{}:{}", GET, &url), + HttpResponse::from_parts( + parts.clone(), + TEST_BODY.to_vec().into(), + ), + policy.clone(), + ) .await?; let data = manager.get(&format!("{}:{}", GET, &url)).await?; assert!(data.is_some()); - assert_eq!(data.unwrap().0.body, TEST_BODY); + assert_eq!(data.unwrap().0.body.as_bytes().unwrap(), TEST_BODY); let clone = manager.clone(); let clonedata = clone.get(&format!("{}:{}", GET, &url)).await?; assert!(clonedata.is_some()); - assert_eq!(clonedata.unwrap().0.body, TEST_BODY); + assert_eq!(clonedata.unwrap().0.body.as_bytes().unwrap(), TEST_BODY); manager.delete(&format!("{}:{}", GET, &url)).await?; let data = manager.get(&format!("{}:{}", GET, &url)).await?; assert!(data.is_none()); - manager.put(format!("{}:{}", GET, &url), http_res, policy).await?; + manager + .put( + format!("{}:{}", GET, &url), + HttpResponse::from_parts(parts, TEST_BODY.to_vec().into()), + policy, + ) + .await?; manager.clear().await?; let data = manager.get(&format!("{}:{}", GET, &url)).await?; assert!(data.is_none()); std::fs::remove_dir_all("./http-cacache-test")?; Ok(()) } + + #[async_test] + async fn cacache_streaming() -> Result<()> { + let test_body_input: Vec> = + vec![Ok(&TEST_BODY[..2]), Ok(&TEST_BODY[2..])]; + let test_body = futures_util::stream::iter(test_body_input); + 
+ let url = Url::parse("http://example.com")?; + let tmp = tempfile::tempdir().unwrap(); + let manager = CACacheManager { path: tmp.path().to_path_buf() }; + let parts = Parts { + headers: Default::default(), + status: 200, + url: url.clone(), + version: HttpVersion::Http11, + }; + let req = http::Request::get("http://example.com").body(())?; + let res = http::Response::builder().status(200).body(())?; + let policy = CachePolicy::new(&req, &res); + + manager + .put( + format!("{}:{}", GET, &url), + HttpResponse::from_parts(parts, Body::wrap_stream(test_body)), + policy.clone(), + ) + .await?; + let data = manager.get(&format!("{}:{}", GET, &url)).await?; + assert!(data.is_some()); + let result = data.unwrap().0.body.bytes().await?; + assert_eq!(result, TEST_BODY); + Ok(()) + } } #[cfg(feature = "manager-moka")]