From c9902e50896be467b22230ccce3f1cbe1d16fa20 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 6 Nov 2024 20:27:26 +0200 Subject: [PATCH 1/2] ipfs: optimize retries --- core/src/polling_monitor/ipfs_service.rs | 19 +- graph/src/components/link_resolver/ipfs.rs | 54 ++- graph/src/ipfs/client.rs | 186 +++++++++ graph/src/ipfs/content_path.rs | 2 +- graph/src/ipfs/error.rs | 72 +++- graph/src/ipfs/gateway_client.rs | 392 +++++-------------- graph/src/ipfs/mod.rs | 66 +--- graph/src/ipfs/pool.rs | 292 +++++++-------- graph/src/ipfs/retry_policy.rs | 55 ++- graph/src/ipfs/rpc_client.rs | 416 +++++---------------- graph/src/ipfs/server_address.rs | 2 +- runtime/test/src/common.rs | 6 +- tests/src/fixture/mod.rs | 14 +- 13 files changed, 678 insertions(+), 898 deletions(-) create mode 100644 graph/src/ipfs/client.rs diff --git a/core/src/polling_monitor/ipfs_service.rs b/core/src/polling_monitor/ipfs_service.rs index 24a9a6b9c6a..7e147b90b4b 100644 --- a/core/src/polling_monitor/ipfs_service.rs +++ b/core/src/polling_monitor/ipfs_service.rs @@ -1,12 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use anyhow::{anyhow, Error}; +use anyhow::anyhow; +use anyhow::Error; use bytes::Bytes; use graph::futures03::future::BoxFuture; use graph::ipfs::ContentPath; use graph::ipfs::IpfsClient; -use graph::ipfs::IpfsError; +use graph::ipfs::RetryPolicy; use graph::{derive::CheapClone, prelude::CheapClone}; use tower::{buffer::Buffer, ServiceBuilder, ServiceExt}; @@ -50,12 +51,17 @@ impl IpfsServiceInner { let res = self .client - .cat(&path, self.max_file_size, Some(self.timeout)) + .cat( + &path, + self.max_file_size, + Some(self.timeout), + RetryPolicy::None, + ) .await; match res { Ok(file_bytes) => Ok(Some(file_bytes)), - Err(IpfsError::RequestFailed(err)) if err.is_timeout() => { + Err(err) if err.is_timeout() => { // Timeouts in IPFS mean that the content is not available, so we return `None`. 
Ok(None) } @@ -114,10 +120,9 @@ mod test { let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard()) - .unwrap() - .into_boxed(); + .unwrap(); - let svc = ipfs_service(client.into(), 100000, Duration::from_secs(30), 10); + let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10); let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap(); let content = svc.oneshot(path).await.unwrap().unwrap(); diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 5064ab4b030..9ecf4ff02e3 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -21,6 +21,7 @@ use crate::futures01::Async; use crate::futures01::Poll; use crate::ipfs::ContentPath; use crate::ipfs::IpfsClient; +use crate::ipfs::RetryPolicy; use crate::prelude::{LinkResolver as LinkResolverTrait, *}; #[derive(Clone, CheapClone, Derivative)] @@ -36,6 +37,9 @@ pub struct IpfsResolver { max_file_size: usize, max_map_file_size: usize, max_cache_file_size: usize, + + /// When set to `true`, it means infinite retries, ignoring the timeout setting. + retry: bool, } impl IpfsResolver { @@ -51,6 +55,7 @@ impl IpfsResolver { max_file_size: env.max_ipfs_file_bytes, max_map_file_size: env.max_ipfs_map_file_size, max_cache_file_size: env.max_ipfs_cache_file_size, + retry: false, } } } @@ -64,8 +69,9 @@ impl LinkResolverTrait for IpfsResolver { } fn with_retries(&self) -> Box { - // IPFS clients have internal retries enabled by default. 
- Box::new(self.cheap_clone()) + let mut s = self.cheap_clone(); + s.retry = true; + Box::new(s) } async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error> { @@ -81,9 +87,16 @@ impl LinkResolverTrait for IpfsResolver { trace!(logger, "IPFS cat cache miss"; "hash" => path.to_string()); + let (timeout, retry_policy) = if self.retry { + (None, RetryPolicy::NonDeterministic) + } else { + (Some(timeout), RetryPolicy::Networking) + }; + let data = self .client - .cat(&path, max_file_size, Some(timeout)) + .clone() + .cat(&path, max_file_size, timeout, retry_policy) .await? .to_vec(); @@ -111,7 +124,18 @@ impl LinkResolverTrait for IpfsResolver { trace!(logger, "IPFS block get"; "hash" => path.to_string()); - let data = self.client.get_block(&path, Some(timeout)).await?.to_vec(); + let (timeout, retry_policy) = if self.retry { + (None, RetryPolicy::NonDeterministic) + } else { + (Some(timeout), RetryPolicy::Networking) + }; + + let data = self + .client + .clone() + .get_block(&path, timeout, retry_policy) + .await? + .to_vec(); Ok(data) } @@ -119,12 +143,20 @@ impl LinkResolverTrait for IpfsResolver { async fn json_stream(&self, logger: &Logger, link: &Link) -> Result { let path = ContentPath::new(&link.link)?; let max_map_file_size = self.max_map_file_size; + let timeout = self.timeout; trace!(logger, "IPFS JSON stream"; "hash" => path.to_string()); + let (timeout, retry_policy) = if self.retry { + (None, RetryPolicy::NonDeterministic) + } else { + (Some(timeout), RetryPolicy::Networking) + }; + let mut stream = self .client - .cat_stream(&path, None) + .clone() + .cat_stream(&path, timeout, retry_policy) .await? 
.fuse() .boxed() @@ -228,11 +260,8 @@ mod tests { let logger = crate::log::discard(); - let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger) - .unwrap() - .into_boxed(); - - let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars)); + let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger).unwrap(); + let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars)); let err = IpfsResolver::cat(&resolver, &logger, &Link { link: cid.clone() }) .await @@ -250,9 +279,8 @@ mod tests { .to_owned(); let logger = crate::log::discard(); - let client = - IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?.into_boxed(); - let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars)); + let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?; + let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars)); let stream = IpfsResolver::json_stream(&resolver, &logger, &Link { link: cid }).await?; stream.map_ok(|sv| sv.value).try_collect().await diff --git a/graph/src/ipfs/client.rs b/graph/src/ipfs/client.rs new file mode 100644 index 00000000000..d9df6cafb67 --- /dev/null +++ b/graph/src/ipfs/client.rs @@ -0,0 +1,186 @@ +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use bytes::Bytes; +use bytes::BytesMut; +use futures03::stream::BoxStream; +use futures03::StreamExt; +use futures03::TryStreamExt; +use slog::Logger; + +use crate::ipfs::ContentPath; +use crate::ipfs::IpfsError; +use crate::ipfs::IpfsResult; +use crate::ipfs::RetryPolicy; + +/// A read-only connection to an IPFS server. +#[async_trait] +pub trait IpfsClient: Send + Sync + 'static { + /// Returns the logger associated with the client. + fn logger(&self) -> &Logger; + + /// Sends a request to the IPFS server and returns a raw response. 
+ async fn call(self: Arc, req: IpfsRequest) -> IpfsResult; + + /// Streams data from the specified content path. + /// + /// If a timeout is specified, the execution will be aborted if the IPFS server + /// does not return a response within the specified amount of time. + /// + /// The timeout is not propagated to the resulting stream. + async fn cat_stream( + self: Arc, + path: &ContentPath, + timeout: Option, + retry_policy: RetryPolicy, + ) -> IpfsResult>> { + let fut = retry_policy.create("IPFS.cat_stream", self.logger()).run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { client.call(IpfsRequest::Cat(path)).await } + } + }); + + let resp = run_with_optional_timeout(path, fut, timeout).await?; + + Ok(resp.bytes_stream()) + } + + /// Downloads data from the specified content path. + /// + /// If a timeout is specified, the execution will be aborted if the IPFS server + /// does not return a response within the specified amount of time. + async fn cat( + self: Arc, + path: &ContentPath, + max_size: usize, + timeout: Option, + retry_policy: RetryPolicy, + ) -> IpfsResult { + let fut = retry_policy.create("IPFS.cat", self.logger()).run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { + client + .call(IpfsRequest::Cat(path)) + .await? + .bytes(Some(max_size)) + .await + } + } + }); + + run_with_optional_timeout(path, fut, timeout).await + } + + /// Downloads an IPFS block in raw format. + /// + /// If a timeout is specified, the execution will be aborted if the IPFS server + /// does not return a response within the specified amount of time. 
+ async fn get_block( + self: Arc, + path: &ContentPath, + timeout: Option, + retry_policy: RetryPolicy, + ) -> IpfsResult { + let fut = retry_policy.create("IPFS.get_block", self.logger()).run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { + client + .call(IpfsRequest::GetBlock(path)) + .await? + .bytes(None) + .await + } + } + }); + + run_with_optional_timeout(path, fut, timeout).await + } +} + +/// Describes a request to an IPFS server. +#[derive(Clone, Debug)] +pub enum IpfsRequest { + Cat(ContentPath), + GetBlock(ContentPath), +} + +/// Contains a raw, successful IPFS response. +#[derive(Debug)] +pub struct IpfsResponse { + pub(super) path: ContentPath, + pub(super) response: reqwest::Response, +} + +impl IpfsResponse { + /// Reads and returns the response body. + /// + /// If the max size is specified and the response body is larger than the max size, + /// execution will result in an error. + pub async fn bytes(self, max_size: Option) -> IpfsResult { + let Some(max_size) = max_size else { + return self.response.bytes().await.map_err(Into::into); + }; + + let bytes = self + .response + .bytes_stream() + .err_into() + .try_fold(BytesMut::new(), |mut acc, chunk| async { + acc.extend(chunk); + + if acc.len() > max_size { + return Err(IpfsError::ContentTooLarge { + path: self.path.clone(), + max_size, + }); + } + + Ok(acc) + }) + .await?; + + Ok(bytes.into()) + } + + /// Converts the response into a stream of bytes from the body. + pub fn bytes_stream(self) -> BoxStream<'static, IpfsResult> { + self.response.bytes_stream().err_into().boxed() + } +} + +async fn run_with_optional_timeout( + path: &ContentPath, + fut: F, + timeout: Option, +) -> IpfsResult +where + F: Future>, +{ + match timeout { + Some(timeout) => { + tokio::time::timeout(timeout, fut) + .await + .map_err(|_| IpfsError::RequestTimeout { + path: path.to_owned(), + })? 
+ } + None => fut.await, + } +} diff --git a/graph/src/ipfs/content_path.rs b/graph/src/ipfs/content_path.rs index 3106d202d5e..2032526b6ae 100644 --- a/graph/src/ipfs/content_path.rs +++ b/graph/src/ipfs/content_path.rs @@ -4,8 +4,8 @@ use cid::Cid; use crate::ipfs::IpfsError; use crate::ipfs::IpfsResult; -#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] /// Represents a path to some data on IPFS. +#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ContentPath { cid: Cid, path: Option, diff --git a/graph/src/ipfs/error.rs b/graph/src/ipfs/error.rs index 99f4b4db9fc..9cb956bbccb 100644 --- a/graph/src/ipfs/error.rs +++ b/graph/src/ipfs/error.rs @@ -37,27 +37,78 @@ pub enum IpfsError { #[error("IPFS content from '{path}' exceeds the {max_size} bytes limit")] ContentTooLarge { path: ContentPath, max_size: usize }, + /// Does not consider HTTP status codes for timeouts. + #[error("IPFS request to '{path}' timed out")] + RequestTimeout { path: ContentPath }, + + #[error("IPFS request to '{path}' failed with a deterministic error: {reason:#}")] + DeterministicFailure { + path: ContentPath, + reason: DeterministicIpfsError, + }, + #[error(transparent)] RequestFailed(RequestError), } +#[derive(Debug, Error)] +pub enum DeterministicIpfsError {} + #[derive(Debug, Error)] #[error("request to IPFS server failed: {0:#}")] pub struct RequestError(reqwest::Error); impl IpfsError { + /// Returns true if the server is invalid. pub fn is_invalid_server(&self) -> bool { matches!(self, Self::InvalidServer { .. }) } + + /// Returns true if the error was caused by a timeout. + /// + /// Considers HTTP status codes for timeouts. + pub fn is_timeout(&self) -> bool { + match self { + Self::RequestTimeout { .. } => true, + Self::RequestFailed(err) if err.is_timeout() => true, + _ => false, + } + } + + /// Returns true if the error was caused by a network connection failure. 
+ pub fn is_networking(&self) -> bool { + matches!(self, Self::RequestFailed(err) if err.is_networking()) + } + + /// Returns true if the error is deterministic. + pub fn is_deterministic(&self) -> bool { + match self { + Self::InvalidServerAddress { .. } => true, + Self::InvalidServer { .. } => true, + Self::InvalidContentPath { .. } => true, + Self::ContentNotAvailable { .. } => false, + Self::ContentTooLarge { .. } => true, + Self::RequestTimeout { .. } => false, + Self::DeterministicFailure { .. } => true, + Self::RequestFailed(_) => false, + } + } } impl From for IpfsError { fn from(err: reqwest::Error) -> Self { - Self::RequestFailed(RequestError(err)) + // We remove the URL from the error as it may contain + // sensitive information such as auth tokens or passwords. + Self::RequestFailed(RequestError(err.without_url())) } } impl RequestError { + /// Returns true if the request failed due to a networking error. + pub fn is_networking(&self) -> bool { + self.0.is_request() || self.0.is_connect() || self.0.is_timeout() + } + /// Returns true if the request failed due to a timeout. pub fn is_timeout(&self) -> bool { if self.0.is_timeout() { @@ -80,23 +131,4 @@ impl RequestError { .into_iter() .any(|x| status == x) } - - /// Returns true if the request can be retried. 
- pub fn is_retriable(&self) -> bool { - let Some(status) = self.0.status() else { - return true; - }; - - const CLOUDFLARE_WEB_SERVER_DOWN: u16 = 521; - - [ - StatusCode::TOO_MANY_REQUESTS, - StatusCode::INTERNAL_SERVER_ERROR, - StatusCode::BAD_GATEWAY, - StatusCode::SERVICE_UNAVAILABLE, - StatusCode::from_u16(CLOUDFLARE_WEB_SERVER_DOWN).unwrap(), - ] - .into_iter() - .any(|x| status == x) - } } diff --git a/graph/src/ipfs/gateway_client.rs b/graph/src/ipfs/gateway_client.rs index 87eb25b48a0..4f4844f0147 100644 --- a/graph/src/ipfs/gateway_client.rs +++ b/graph/src/ipfs/gateway_client.rs @@ -1,39 +1,31 @@ +use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; use async_trait::async_trait; -use bytes::Bytes; -use bytes::BytesMut; use derivative::Derivative; -use futures03::stream::BoxStream; -use futures03::StreamExt; -use futures03::TryStreamExt; use http::header::ACCEPT; use http::header::CACHE_CONTROL; use reqwest::StatusCode; use slog::Logger; -use crate::derive::CheapClone; -use crate::ipfs::retry_policy::retry_policy; -use crate::ipfs::CanProvide; -use crate::ipfs::Cat; -use crate::ipfs::CatStream; -use crate::ipfs::ContentPath; -use crate::ipfs::GetBlock; use crate::ipfs::IpfsClient; use crate::ipfs::IpfsError; +use crate::ipfs::IpfsRequest; +use crate::ipfs::IpfsResponse; use crate::ipfs::IpfsResult; +use crate::ipfs::RetryPolicy; use crate::ipfs::ServerAddress; /// The request that verifies that the IPFS gateway is accessible is generally fast because /// it does not involve querying the distributed network. -const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(300); +const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); -#[derive(Clone, CheapClone, Derivative)] -#[derivative(Debug)] /// A client that connects to an IPFS gateway. 
/// /// Reference: +#[derive(Clone, Derivative)] +#[derivative(Debug)] pub struct IpfsGatewayClient { server_address: ServerAddress, @@ -41,7 +33,6 @@ pub struct IpfsGatewayClient { http_client: reqwest::Client, logger: Logger, - test_request_timeout: Duration, } impl IpfsGatewayClient { @@ -68,14 +59,11 @@ impl IpfsGatewayClient { server_address: ServerAddress::new(server_address)?, http_client: reqwest::Client::new(), logger: logger.to_owned(), - test_request_timeout: TEST_REQUEST_TIMEOUT, }) } - pub fn into_boxed(self) -> Box { - Box::new(self) - } - + /// A one-time request sent at client initialization to verify that the specified + /// server address is a valid IPFS gateway server. async fn send_test_request(&self) -> anyhow::Result<()> { // To successfully perform this test, it does not really matter which CID we use. const RANDOM_CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; @@ -88,10 +76,10 @@ impl IpfsGatewayClient { let req = self .http_client .head(self.ipfs_url(RANDOM_CID)) - .header(CACHE_CONTROL, "only-if-cached") - .timeout(self.test_request_timeout); + .header(CACHE_CONTROL, "only-if-cached"); - let ok = retry_policy("IPFS.Gateway.send_test_request", &self.logger) + let fut = RetryPolicy::NonDeterministic + .create("IPFS.Gateway.send_test_request", &self.logger) .run(move || { let req = req.try_clone().expect("request can be cloned"); @@ -107,8 +95,11 @@ impl IpfsGatewayClient { Ok(false) } - }) - .await?; + }); + + let ok = tokio::time::timeout(TEST_REQUEST_TIMEOUT, fut) + .await + .map_err(|_| anyhow!("request timed out"))??; if !ok { return Err(anyhow!("not a gateway")); @@ -123,131 +114,43 @@ impl IpfsGatewayClient { } #[async_trait] -impl CanProvide for IpfsGatewayClient { - async fn can_provide(&self, path: &ContentPath, timeout: Option) -> IpfsResult { - let url = self.ipfs_url(path.to_string()); - let mut req = self.http_client.head(url); - - if let Some(timeout) = timeout { - req = req.timeout(timeout); - } - - 
retry_policy("IPFS.Gateway.can_provide", &self.logger) - .run(move || { - let req = req.try_clone().expect("request can be cloned"); - - async move { - let status = req.send().await?.error_for_status()?.status(); - - Ok(status == StatusCode::OK) - } - }) - .await - } -} - -#[async_trait] -impl CatStream for IpfsGatewayClient { - async fn cat_stream( - &self, - path: &ContentPath, - timeout: Option, - ) -> IpfsResult>> { - let url = self.ipfs_url(path.to_string()); - let mut req = self.http_client.get(url); - - if let Some(timeout) = timeout { - req = req.timeout(timeout); - } - - let resp = retry_policy("IPFS.Gateway.cat_stream", &self.logger) - .run(move || { - let req = req.try_clone().expect("request can be cloned"); - - async move { Ok(req.send().await?.error_for_status()?) } - }) - .await?; - - Ok(resp.bytes_stream().err_into().boxed()) +impl IpfsClient for IpfsGatewayClient { + fn logger(&self) -> &Logger { + &self.logger } -} - -#[async_trait] -impl Cat for IpfsGatewayClient { - async fn cat( - &self, - path: &ContentPath, - max_size: usize, - timeout: Option, - ) -> IpfsResult { - let url = self.ipfs_url(path.to_string()); - let mut req = self.http_client.get(url); - - if let Some(timeout) = timeout { - req = req.timeout(timeout); - } - let path = path.to_owned(); + async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { + use IpfsRequest::*; - retry_policy("IPFS.Gateway.cat", &self.logger) - .run(move || { - let path = path.clone(); - let req = req.try_clone().expect("request can be cloned"); + let (path, req) = match req { + Cat(path) => { + let url = self.ipfs_url(path.to_string()); + let req = self.http_client.get(url); - async move { - let content = req - .send() - .await? - .error_for_status()? 
- .bytes_stream() - .err_into() - .try_fold(BytesMut::new(), |mut acc, chunk| async { - acc.extend(chunk); - - if acc.len() > max_size { - return Err(IpfsError::ContentTooLarge { - path: path.clone(), - max_size, - }); - } - - Ok(acc) - }) - .await?; - - Ok(content.into()) - } - }) - .await - } -} + (path, req) + } + GetBlock(path) => { + let url = self.ipfs_url(format!("{path}?format=raw")); -#[async_trait] -impl GetBlock for IpfsGatewayClient { - async fn get_block(&self, path: &ContentPath, timeout: Option) -> IpfsResult { - let url = self.ipfs_url(format!("{path}?format=raw")); - - let mut req = self - .http_client - .get(url) - .header(ACCEPT, "application/vnd.ipld.raw"); + let req = self + .http_client + .get(url) + .header(ACCEPT, "application/vnd.ipld.raw"); - if let Some(timeout) = timeout { - req = req.timeout(timeout); - } + (path, req) + } + }; - retry_policy("IPFS.Gateway.get_block", &self.logger) - .run(move || { - let req = req.try_clone().expect("request can be cloned"); + let response = req.send().await?.error_for_status()?; - async move { Ok(req.send().await?.error_for_status()?.bytes().await?) 
} - }) - .await + Ok(IpfsResponse { path, response }) } } #[cfg(test)] mod tests { + use bytes::BytesMut; + use futures03::TryStreamExt; use wiremock::matchers as m; use wiremock::Mock; use wiremock::MockBuilder; @@ -255,6 +158,7 @@ mod tests { use wiremock::ResponseTemplate; use super::*; + use crate::ipfs::ContentPath; use crate::log::discard; const PATH: &str = "/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; @@ -283,11 +187,11 @@ mod tests { .and(m::header("Accept", "application/vnd.ipld.raw")) } - async fn make_client() -> (MockServer, IpfsGatewayClient) { + async fn make_client() -> (MockServer, Arc) { let server = mock_server().await; let client = IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); - (server, client) + (server, Arc::new(client)) } fn make_path() -> ContentPath { @@ -334,7 +238,7 @@ mod tests { } #[tokio::test] - async fn new_retries_gateway_check_on_retriable_errors() { + async fn new_retries_gateway_check_on_non_deterministic_errors() { let server = mock_server().await; mock_gateway_check(StatusCode::INTERNAL_SERVER_ERROR) @@ -353,20 +257,6 @@ mod tests { .unwrap(); } - #[tokio::test] - async fn new_does_not_retry_gateway_check_on_non_retriable_errors() { - let server = mock_server().await; - - mock_gateway_check(StatusCode::METHOD_NOT_ALLOWED) - .expect(1) - .mount(&server) - .await; - - IpfsGatewayClient::new(server.uri(), &discard()) - .await - .unwrap_err(); - } - #[tokio::test] async fn new_unchecked_creates_the_client_without_checking_the_gateway() { let server = mock_server().await; @@ -374,87 +264,6 @@ mod tests { IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); } - #[tokio::test] - async fn can_provide_returns_true_when_content_is_available() { - let (server, client) = make_client().await; - - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK)) - .expect(1) - .mount(&server) - .await; - - let ok = client.can_provide(&make_path(), None).await.unwrap(); - - assert!(ok); - } - 
- #[tokio::test] - async fn can_provide_returns_false_when_content_is_not_completely_available() { - let (server, client) = make_client().await; - - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::PARTIAL_CONTENT)) - .expect(1) - .mount(&server) - .await; - - let ok = client.can_provide(&make_path(), None).await.unwrap(); - - assert!(!ok); - } - - #[tokio::test] - async fn can_provide_fails_on_timeout() { - let (server, client) = make_client().await; - - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) - .expect(1) - .mount(&server) - .await; - - client - .can_provide(&make_path(), Some(ms(300))) - .await - .unwrap_err(); - } - - #[tokio::test] - async fn can_provide_retries_the_request_on_retriable_errors() { - let (server, client) = make_client().await; - - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) - .up_to_n_times(1) - .expect(1) - .mount(&server) - .await; - - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK)) - .expect(1) - .mount(&server) - .await; - - let ok = client.can_provide(&make_path(), None).await.unwrap(); - - assert!(ok); - } - - #[tokio::test] - async fn can_provide_does_not_retry_the_request_on_non_retriable_errors() { - let (server, client) = make_client().await; - - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) - .expect(1) - .mount(&server) - .await; - - client.can_provide(&make_path(), None).await.unwrap_err(); - } - #[tokio::test] async fn cat_stream_returns_the_content() { let (server, client) = make_client().await; @@ -465,8 +274,8 @@ mod tests { .mount(&server) .await; - let content = client - .cat_stream(&make_path(), None) + let bytes = client + .cat_stream(&make_path(), None, RetryPolicy::None) .await .unwrap() .try_fold(BytesMut::new(), |mut acc, chunk| async { @@ -477,7 +286,7 @@ mod tests { .await .unwrap(); - assert_eq!(content.as_ref(), b"some data") + assert_eq!(bytes.as_ref(), b"some 
data") } #[tokio::test] @@ -490,13 +299,15 @@ mod tests { .mount(&server) .await; - let result = client.cat_stream(&make_path(), Some(ms(300))).await; + let result = client + .cat_stream(&make_path(), Some(ms(300)), RetryPolicy::None) + .await; assert!(matches!(result, Err(_))); } #[tokio::test] - async fn cat_stream_retries_the_request_on_retriable_errors() { + async fn cat_stream_retries_the_request_on_non_deterministic_errors() { let (server, client) = make_client().await; mock_get() @@ -512,22 +323,10 @@ mod tests { .mount(&server) .await; - let _stream = client.cat_stream(&make_path(), None).await.unwrap(); - } - - #[tokio::test] - async fn cat_stream_does_not_retry_the_request_on_non_retriable_errors() { - let (server, client) = make_client().await; - - mock_get() - .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) - .expect(1) - .mount(&server) - .await; - - let result = client.cat_stream(&make_path(), None).await; - - assert!(matches!(result, Err(_))); + let _stream = client + .cat_stream(&make_path(), None, RetryPolicy::NonDeterministic) + .await + .unwrap(); } #[tokio::test] @@ -540,9 +339,12 @@ mod tests { .mount(&server) .await; - let content = client.cat(&make_path(), usize::MAX, None).await.unwrap(); + let bytes = client + .cat(&make_path(), usize::MAX, None, RetryPolicy::None) + .await + .unwrap(); - assert_eq!(content.as_ref(), b"some data"); + assert_eq!(bytes.as_ref(), b"some data"); } #[tokio::test] @@ -557,9 +359,12 @@ mod tests { .mount(&server) .await; - let content = client.cat(&make_path(), data.len(), None).await.unwrap(); + let bytes = client + .cat(&make_path(), data.len(), None, RetryPolicy::None) + .await + .unwrap(); - assert_eq!(content.as_ref(), data); + assert_eq!(bytes.as_ref(), data); } #[tokio::test] @@ -575,7 +380,7 @@ mod tests { .await; client - .cat(&make_path(), data.len() - 1, None) + .cat(&make_path(), data.len() - 1, None, RetryPolicy::None) .await .unwrap_err(); } @@ -591,13 +396,13 @@ mod tests { .await; 
client - .cat(&make_path(), usize::MAX, Some(ms(300))) + .cat(&make_path(), usize::MAX, Some(ms(300)), RetryPolicy::None) .await .unwrap_err(); } #[tokio::test] - async fn cat_retries_the_request_on_retriable_errors() { + async fn cat_retries_the_request_on_non_deterministic_errors() { let (server, client) = make_client().await; mock_get() @@ -613,25 +418,17 @@ mod tests { .mount(&server) .await; - let content = client.cat(&make_path(), usize::MAX, None).await.unwrap(); - - assert_eq!(content.as_ref(), b"some data"); - } - - #[tokio::test] - async fn cat_does_not_retry_the_request_on_non_retriable_errors() { - let (server, client) = make_client().await; - - mock_get() - .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) - .expect(1) - .mount(&server) - .await; - - client - .cat(&make_path(), usize::MAX, None) + let bytes = client + .cat( + &make_path(), + usize::MAX, + None, + RetryPolicy::NonDeterministic, + ) .await - .unwrap_err(); + .unwrap(); + + assert_eq!(bytes.as_ref(), b"some data"); } #[tokio::test] @@ -644,9 +441,12 @@ mod tests { .mount(&server) .await; - let block = client.get_block(&make_path(), None).await.unwrap(); + let bytes = client + .get_block(&make_path(), None, RetryPolicy::None) + .await + .unwrap(); - assert_eq!(block.as_ref(), b"some data"); + assert_eq!(bytes.as_ref(), b"some data"); } #[tokio::test] @@ -660,13 +460,13 @@ mod tests { .await; client - .get_block(&make_path(), Some(ms(300))) + .get_block(&make_path(), Some(ms(300)), RetryPolicy::None) .await .unwrap_err(); } #[tokio::test] - async fn get_block_retries_the_request_on_retriable_errors() { + async fn get_block_retries_the_request_on_non_deterministic_errors() { let (server, client) = make_client().await; mock_get_block() @@ -682,21 +482,11 @@ mod tests { .mount(&server) .await; - let block = client.get_block(&make_path(), None).await.unwrap(); - - assert_eq!(block.as_ref(), b"some data"); - } - - #[tokio::test] - async fn 
get_block_does_not_retry_the_request_on_non_retriable_errors() { - let (server, client) = make_client().await; - - mock_get_block() - .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) - .expect(1) - .mount(&server) - .await; + let bytes = client + .get_block(&make_path(), None, RetryPolicy::NonDeterministic) + .await + .unwrap(); - client.get_block(&make_path(), None).await.unwrap_err(); + assert_eq!(bytes.as_ref(), b"some data"); } } diff --git a/graph/src/ipfs/mod.rs b/graph/src/ipfs/mod.rs index 038897d7dcd..9770ab497db 100644 --- a/graph/src/ipfs/mod.rs +++ b/graph/src/ipfs/mod.rs @@ -1,15 +1,12 @@ use std::sync::Arc; -use std::time::Duration; use anyhow::anyhow; -use async_trait::async_trait; -use bytes::Bytes; -use futures03::stream::BoxStream; use slog::info; use slog::Logger; use crate::util::security::SafeDisplay; +mod client; mod content_path; mod error; mod gateway_client; @@ -20,62 +17,25 @@ mod server_address; pub mod test_utils; +pub use self::client::IpfsClient; +pub use self::client::IpfsRequest; +pub use self::client::IpfsResponse; pub use self::content_path::ContentPath; pub use self::error::IpfsError; pub use self::error::RequestError; pub use self::gateway_client::IpfsGatewayClient; +pub use self::pool::IpfsClientPool; +pub use self::retry_policy::RetryPolicy; pub use self::rpc_client::IpfsRpcClient; pub use self::server_address::ServerAddress; pub type IpfsResult = Result; -/// Describes a read-only connection to an IPFS server. -pub trait IpfsClient: CanProvide + CatStream + Cat + GetBlock + Send + Sync + 'static {} - -#[async_trait] -/// Checks if the server can provide data from the specified content path. -pub trait CanProvide { - /// Checks if the server can provide data from the specified content path. - async fn can_provide(&self, path: &ContentPath, timeout: Option) -> IpfsResult; -} - -#[async_trait] -/// Streams data from the specified content path. 
-pub trait CatStream { - /// Streams data from the specified content path. - async fn cat_stream( - &self, - path: &ContentPath, - timeout: Option, - ) -> IpfsResult>>; -} - -#[async_trait] -/// Downloads data from the specified content path. -pub trait Cat { - /// Downloads data from the specified content path. - async fn cat( - &self, - path: &ContentPath, - max_size: usize, - timeout: Option, - ) -> IpfsResult; -} - -#[async_trait] -/// Downloads an IPFS block in raw format. -pub trait GetBlock { - /// Downloads an IPFS block in raw format. - async fn get_block(&self, path: &ContentPath, timeout: Option) -> IpfsResult; -} - -impl IpfsClient for T where T: CanProvide + CatStream + Cat + GetBlock + Send + Sync + 'static {} - /// Creates and returns the most appropriate IPFS client for the given IPFS server addresses. /// /// If multiple IPFS server addresses are specified, an IPFS client pool is created internally -/// and for each IPFS read request, the fastest client that can provide the content is -/// automatically selected and the request is forwarded to that client. +/// and for each IPFS request, the fastest client that can provide the content is +/// automatically selected and the response is streamed from that client. 
pub async fn new_ipfs_client( server_addresses: I, logger: &Logger, @@ -84,7 +44,7 @@ where I: IntoIterator, S: AsRef, { - let mut clients = Vec::new(); + let mut clients: Vec> = Vec::new(); for server_address in server_addresses { let server_address = server_address.as_ref(); @@ -103,7 +63,7 @@ where SafeDisplay(server_address) ); - clients.push(client.into_boxed()); + clients.push(Arc::new(client)); continue; } Err(err) if err.is_invalid_server() => {} @@ -118,7 +78,7 @@ where SafeDisplay(server_address) ); - clients.push(client.into_boxed()); + clients.push(Arc::new(client)); continue; } Err(err) if err.is_invalid_server() => {} @@ -140,9 +100,9 @@ where n => { info!(logger, "Creating a pool of {} IPFS clients", n); - let pool = pool::IpfsClientPool::with_clients(clients); + let pool = IpfsClientPool::new(clients, logger); - Ok(pool.into_boxed().into()) + Ok(Arc::new(pool)) } } } diff --git a/graph/src/ipfs/pool.rs b/graph/src/ipfs/pool.rs index 0fb5ce8dc5b..80abd7ca3e8 100644 --- a/graph/src/ipfs/pool.rs +++ b/graph/src/ipfs/pool.rs @@ -1,124 +1,79 @@ -use std::time::Duration; +use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; -use bytes::Bytes; -use futures03::stream::BoxStream; use futures03::stream::FuturesUnordered; use futures03::stream::StreamExt; +use slog::Logger; -use crate::ipfs::CanProvide; -use crate::ipfs::Cat; -use crate::ipfs::CatStream; -use crate::ipfs::ContentPath; -use crate::ipfs::GetBlock; use crate::ipfs::IpfsClient; use crate::ipfs::IpfsError; +use crate::ipfs::IpfsRequest; +use crate::ipfs::IpfsResponse; use crate::ipfs::IpfsResult; /// Contains a list of IPFS clients and, for each read request, selects the fastest IPFS client -/// that can provide the content and forwards the request to that client. +/// that can provide the content and streams the response from that client. /// /// This can significantly improve performance when using multiple IPFS gateways, /// as some of them may already have the content cached. 
-/// -/// Note: It should remain an implementation detail and not be used directly. -pub(super) struct IpfsClientPool { - inner: Vec>, +pub struct IpfsClientPool { + clients: Vec>, + logger: Logger, } impl IpfsClientPool { - pub(super) fn with_clients(clients: Vec>) -> Self { - Self { inner: clients } - } - - pub(super) fn into_boxed(self) -> Box { - Box::new(self) - } -} - -#[async_trait] -impl CanProvide for IpfsClientPool { - async fn can_provide(&self, path: &ContentPath, timeout: Option) -> IpfsResult { - select_fastest_ipfs_client(&self.inner, path, timeout) - .await - .map(|_client| true) - } -} - -#[async_trait] -impl CatStream for IpfsClientPool { - async fn cat_stream( - &self, - path: &ContentPath, - timeout: Option, - ) -> IpfsResult>> { - let client = select_fastest_ipfs_client(&self.inner, path, timeout).await?; - - client.cat_stream(path, timeout).await + /// Creates a new IPFS client pool from the specified clients. + pub fn new(clients: Vec>, logger: &Logger) -> Self { + Self { + clients, + logger: logger.to_owned(), + } } } #[async_trait] -impl Cat for IpfsClientPool { - async fn cat( - &self, - path: &ContentPath, - max_size: usize, - timeout: Option, - ) -> IpfsResult { - let client = select_fastest_ipfs_client(&self.inner, path, timeout).await?; - - client.cat(path, max_size, timeout).await +impl IpfsClient for IpfsClientPool { + fn logger(&self) -> &Logger { + &self.logger } -} - -#[async_trait] -impl GetBlock for IpfsClientPool { - async fn get_block(&self, path: &ContentPath, timeout: Option) -> IpfsResult { - let client = select_fastest_ipfs_client(&self.inner, path, timeout).await?; - - client.get_block(path, timeout).await - } -} -/// Returns the first IPFS client that can provide the content from the specified path. 
-async fn select_fastest_ipfs_client<'a>( - clients: &'a [Box], - path: &ContentPath, - timeout: Option, -) -> IpfsResult<&'a dyn IpfsClient> { - let mut futs = clients - .iter() - .enumerate() - .map(|(i, client)| async move { - client - .can_provide(path, timeout) - .await - .map(|ok| ok.then_some(i)) - }) - .collect::>(); - - let mut last_err = None; - - while let Some(result) = futs.next().await { - match result { - Ok(Some(i)) => return Ok(clients[i].as_ref()), - Ok(None) => continue, - Err(err) => last_err = Some(err), + async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { + let mut futs = self + .clients + .iter() + .map(|client| client.clone().call(req.clone())) + .collect::>(); + + let mut last_err = None; + + while let Some(result) = futs.next().await { + match result { + Ok(resp) => return Ok(resp), + Err(err) => last_err = Some(err), + }; + } + + let path = match req { + IpfsRequest::Cat(path) => path, + IpfsRequest::GetBlock(path) => path, }; - } - let err = last_err.unwrap_or_else(|| IpfsError::ContentNotAvailable { - path: path.to_owned(), - reason: anyhow!("no clients can provide the content"), - }); + let err = last_err.unwrap_or_else(|| IpfsError::ContentNotAvailable { + path, + reason: anyhow!("no clients can provide the content"), + }); - Err(err) + Err(err) + } } #[cfg(test)] mod tests { + use std::time::Duration; + + use bytes::BytesMut; + use futures03::TryStreamExt; use http::StatusCode; use wiremock::matchers as m; use wiremock::Mock; @@ -127,24 +82,22 @@ mod tests { use wiremock::ResponseTemplate; use super::*; + use crate::ipfs::ContentPath; use crate::ipfs::IpfsGatewayClient; + use crate::ipfs::RetryPolicy; use crate::log::discard; const PATH: &str = "/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; - fn mock_head() -> MockBuilder { - Mock::given(m::method("HEAD")).and(m::path(PATH)) - } - fn mock_get() -> MockBuilder { Mock::given(m::method("GET")).and(m::path(PATH)) } - async fn make_client() -> (MockServer, 
IpfsGatewayClient) { + async fn make_client() -> (MockServer, Arc) { let server = MockServer::start().await; let client = IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); - (server, client) + (server, Arc::new(client)) } fn make_path() -> ContentPath { @@ -156,116 +109,149 @@ mod tests { } #[tokio::test] - async fn can_provide_returns_true_if_any_client_can_provide_the_content() { + async fn cat_stream_streams_the_response_from_the_fastest_client() { let (server_1, client_1) = make_client().await; let (server_2, client_2) = make_client().await; + let (server_3, client_3) = make_client().await; - mock_head() + mock_get() .respond_with( - ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR).set_delay(ms(100)), + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_1") + .set_delay(ms(300)), ) - .expect(1..) - .mount(&server_1) - .await; - - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(200))) - .expect(1) - .mount(&server_2) - .await; - - let clients = vec![client_1.into_boxed(), client_2.into_boxed()]; - let pool = IpfsClientPool::with_clients(clients); - let ok = pool.can_provide(&make_path(), None).await.unwrap(); - - assert!(ok); - } - - #[tokio::test] - async fn cat_stream_forwards_the_request_to_the_fastest_client_that_can_provide_the_content() { - let (server_1, client_1) = make_client().await; - let (server_2, client_2) = make_client().await; - - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(200))) .expect(1) .mount(&server_1) .await; - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(100))) + mock_get() + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_2") + .set_delay(ms(200)), + ) .expect(1) .mount(&server_2) .await; mock_get() - .respond_with(ResponseTemplate::new(StatusCode::OK)) + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_3") + .set_delay(ms(100)), + 
) .expect(1) - .mount(&server_2) + .mount(&server_3) .await; - let clients = vec![client_1.into_boxed(), client_2.into_boxed()]; - let pool = IpfsClientPool::with_clients(clients); - let _stream = pool.cat_stream(&make_path(), None).await.unwrap(); + let clients: Vec> = vec![client_1, client_2, client_3]; + let pool = Arc::new(IpfsClientPool::new(clients, &discard())); + + let bytes = pool + .cat_stream(&make_path(), None, RetryPolicy::None) + .await + .unwrap() + .try_fold(BytesMut::new(), |mut acc, chunk| async { + acc.extend(chunk); + Ok(acc) + }) + .await + .unwrap(); + + assert_eq!(bytes.as_ref(), b"server_3"); } #[tokio::test] - async fn cat_forwards_the_request_to_the_fastest_client_that_can_provide_the_content() { + async fn cat_streams_the_response_from_the_fastest_client() { let (server_1, client_1) = make_client().await; let (server_2, client_2) = make_client().await; + let (server_3, client_3) = make_client().await; - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(200))) + mock_get() + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_1") + .set_delay(ms(300)), + ) .expect(1) .mount(&server_1) .await; - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(100))) + mock_get() + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_2") + .set_delay(ms(200)), + ) .expect(1) .mount(&server_2) .await; mock_get() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_3") + .set_delay(ms(100)), + ) .expect(1) - .mount(&server_2) + .mount(&server_3) .await; - let clients = vec![client_1.into_boxed(), client_2.into_boxed()]; - let pool = IpfsClientPool::with_clients(clients); - let content = pool.cat(&make_path(), usize::MAX, None).await.unwrap(); + let clients: Vec> = vec![client_1, client_2, client_3]; + let pool = 
Arc::new(IpfsClientPool::new(clients, &discard())); - assert_eq!(content.as_ref(), b"some data") + let bytes = pool + .cat(&make_path(), usize::MAX, None, RetryPolicy::None) + .await + .unwrap(); + + assert_eq!(bytes.as_ref(), b"server_3") } #[tokio::test] - async fn get_block_forwards_the_request_to_the_fastest_client_that_can_provide_the_content() { + async fn get_block_streams_the_response_from_the_fastest_client() { let (server_1, client_1) = make_client().await; let (server_2, client_2) = make_client().await; + let (server_3, client_3) = make_client().await; - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(200))) + mock_get() + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_1") + .set_delay(ms(300)), + ) .expect(1) .mount(&server_1) .await; - mock_head() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(100))) + mock_get() + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_2") + .set_delay(ms(200)), + ) .expect(1) .mount(&server_2) .await; mock_get() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_bytes(b"server_3") + .set_delay(ms(100)), + ) .expect(1) - .mount(&server_2) + .mount(&server_3) .await; - let clients = vec![client_1.into_boxed(), client_2.into_boxed()]; - let pool = IpfsClientPool::with_clients(clients); - let block = pool.get_block(&make_path(), None).await.unwrap(); + let clients: Vec> = vec![client_1, client_2, client_3]; + let pool = Arc::new(IpfsClientPool::new(clients, &discard())); + + let bytes = pool + .get_block(&make_path(), None, RetryPolicy::None) + .await + .unwrap(); - assert_eq!(block.as_ref(), b"some data") + assert_eq!(bytes.as_ref(), b"server_3") } } diff --git a/graph/src/ipfs/retry_policy.rs b/graph/src/ipfs/retry_policy.rs index 942b24f4e79..bd955c12684 100644 --- a/graph/src/ipfs/retry_policy.rs +++ 
b/graph/src/ipfs/retry_policy.rs @@ -4,21 +4,44 @@ use crate::ipfs::error::IpfsError; use crate::util::futures::retry; use crate::util::futures::RetryConfigNoTimeout; -const DEFAULT_MAX_ATTEMPTS: usize = 100; +/// This is a safety mechanism to prevent infinite spamming of IPFS servers +/// in the event of logical or unhandled deterministic errors. +const DEFAULT_MAX_ATTEMPTS: usize = 10_0000; -/// Creates a retry policy for each request sent by IPFS clients. -/// -/// Note: It is expected that timeouts will be set on the requests. -pub fn retry_policy( - operation_name: &'static str, - logger: &Logger, -) -> RetryConfigNoTimeout { - retry(operation_name, logger) - .limit(DEFAULT_MAX_ATTEMPTS) - .when(|result: &Result| match result { - Ok(_) => false, - Err(IpfsError::RequestFailed(err)) => !err.is_timeout() && err.is_retriable(), - Err(_) => false, - }) - .no_timeout() +/// Describes retry behavior when IPFS requests fail. +#[derive(Clone, Copy, Debug)] +pub enum RetryPolicy { + /// At the first error, immediately stops execution and returns the error. + None, + + /// Retries the request if the error is related to the network connection. + Networking, + + /// Retries the request if the error is related to the network connection, + /// and for any error that may be resolved by sending another request. + NonDeterministic, +} + +impl RetryPolicy { + /// Creates a retry policy for every request sent to IPFS servers. + /// + /// Note: It is expected that retries will be wrapped in timeouts + /// when necessary to make them more flexible. 
+ pub(super) fn create( + self, + operation_name: impl ToString, + logger: &Logger, + ) -> RetryConfigNoTimeout { + retry(operation_name, logger) + .limit(DEFAULT_MAX_ATTEMPTS) + .when(move |result: &Result| match result { + Ok(_) => false, + Err(err) => match self { + Self::None => false, + Self::Networking => err.is_networking(), + Self::NonDeterministic => !err.is_deterministic(), + }, + }) + .no_timeout() + } } diff --git a/graph/src/ipfs/rpc_client.rs b/graph/src/ipfs/rpc_client.rs index fb0420606a9..8c0ff5f5acb 100644 --- a/graph/src/ipfs/rpc_client.rs +++ b/graph/src/ipfs/rpc_client.rs @@ -1,39 +1,31 @@ +use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; use async_trait::async_trait; -use bytes::Bytes; -use bytes::BytesMut; use derivative::Derivative; -use futures03::stream::BoxStream; -use futures03::StreamExt; -use futures03::TryStreamExt; -use graph_derive::CheapClone; use http::header::CONTENT_LENGTH; use reqwest::Response; use reqwest::StatusCode; use slog::Logger; -use crate::ipfs::retry_policy::retry_policy; -use crate::ipfs::CanProvide; -use crate::ipfs::Cat; -use crate::ipfs::CatStream; -use crate::ipfs::ContentPath; -use crate::ipfs::GetBlock; use crate::ipfs::IpfsClient; use crate::ipfs::IpfsError; +use crate::ipfs::IpfsRequest; +use crate::ipfs::IpfsResponse; use crate::ipfs::IpfsResult; +use crate::ipfs::RetryPolicy; use crate::ipfs::ServerAddress; /// The request that verifies that the IPFS RPC API is accessible is generally fast because /// it does not involve querying the distributed network. -const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(300); +const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); -#[derive(Clone, CheapClone, Derivative)] -#[derivative(Debug)] /// A client that connects to an IPFS RPC API. 
/// /// Reference: +#[derive(Clone, Derivative)] +#[derivative(Debug)] pub struct IpfsRpcClient { server_address: ServerAddress, @@ -72,30 +64,31 @@ impl IpfsRpcClient { }) } - pub fn into_boxed(self) -> Box { - Box::new(self) - } - + /// A one-time request sent at client initialization to verify that the specified + /// server address is a valid IPFS RPC server. async fn send_test_request(&self) -> anyhow::Result<()> { - let client = self.to_owned(); - - let ok = retry_policy("IPFS.RPC.send_test_request", &self.logger) - .run(move || { - let client = client.clone(); - - async move { - // While there may be unrelated servers that successfully respond to this - // request, it is good enough to at least filter out unresponsive servers and - // confirm that the server behaves like an IPFS RPC API. - let status = client - .call("version", Some(client.test_request_timeout)) - .await? - .status(); - - Ok(status == StatusCode::OK) + let fut = RetryPolicy::NonDeterministic + .create("IPFS.RPC.send_test_request", &self.logger) + .run({ + let client = self.to_owned(); + + move || { + let client = client.clone(); + + async move { + // While there may be unrelated servers that successfully respond to this + // request, it is good enough to at least filter out unresponsive servers + // and confirm that the server behaves like an IPFS RPC API. 
+ let status = client.send_request("version").await?.status(); + + Ok(status == StatusCode::OK) + } } - }) - .await?; + }); + + let ok = tokio::time::timeout(TEST_REQUEST_TIMEOUT, fut) + .await + .map_err(|_| anyhow!("request timed out"))??; if !ok { return Err(anyhow!("not an RPC API")); @@ -104,21 +97,13 @@ impl IpfsRpcClient { Ok(()) } - async fn call( - &self, - path_and_query: impl AsRef, - timeout: Option, - ) -> IpfsResult { + async fn send_request(&self, path_and_query: impl AsRef) -> IpfsResult { let url = self.url(path_and_query); let mut req = self.http_client.post(url); // Some servers require `content-length` even for an empty body. req = req.header(CONTENT_LENGTH, 0); - if let Some(timeout) = timeout { - req = req.timeout(timeout); - } - Ok(req.send().await?.error_for_status()?) } @@ -128,122 +113,29 @@ impl IpfsRpcClient { } #[async_trait] -impl CanProvide for IpfsRpcClient { - async fn can_provide(&self, path: &ContentPath, timeout: Option) -> IpfsResult { - let client = self.to_owned(); - let path = path.to_owned(); - - retry_policy("IPFS.RPC.can_provide", &self.logger) - .run(move || { - let client = client.clone(); - let path = path.clone(); - - async move { - let status = client - .call(format!("cat?arg={path}&length=1"), timeout) - .await? - .status(); - - Ok(status == StatusCode::OK) - } - }) - .await +impl IpfsClient for IpfsRpcClient { + fn logger(&self) -> &Logger { + &self.logger } -} -#[async_trait] -impl CatStream for IpfsRpcClient { - async fn cat_stream( - &self, - path: &ContentPath, - timeout: Option, - ) -> IpfsResult>> { - let client = self.to_owned(); - let path = path.to_owned(); - - let resp = retry_policy("IPFS.RPC.cat_stream", &self.logger) - .run(move || { - let client = client.clone(); - let path = path.clone(); - - async move { Ok(client.call(format!("cat?arg={path}"), timeout).await?) 
} - }) - .await?; + async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { + use IpfsRequest::*; - Ok(resp.bytes_stream().err_into().boxed()) - } -} + let (path_and_query, path) = match req { + Cat(path) => (format!("cat?arg={path}"), path), + GetBlock(path) => (format!("block/get?arg={path}"), path), + }; -#[async_trait] -impl Cat for IpfsRpcClient { - async fn cat( - &self, - path: &ContentPath, - max_size: usize, - timeout: Option, - ) -> IpfsResult { - let client = self.to_owned(); - let path = path.to_owned(); - - retry_policy("IPFS.RPC.cat", &self.logger) - .run(move || { - let client = client.clone(); - let path = path.clone(); - - async move { - let content = client - .call(format!("cat?arg={path}"), timeout) - .await? - .bytes_stream() - .err_into() - .try_fold(BytesMut::new(), |mut acc, chunk| async { - acc.extend(chunk); - - if acc.len() > max_size { - return Err(IpfsError::ContentTooLarge { - path: path.clone(), - max_size, - }); - } - - Ok(acc) - }) - .await?; - - Ok(content.into()) - } - }) - .await - } -} + let response = self.send_request(path_and_query).await?; -#[async_trait] -impl GetBlock for IpfsRpcClient { - async fn get_block(&self, path: &ContentPath, timeout: Option) -> IpfsResult { - let client = self.to_owned(); - let path = path.to_owned(); - - retry_policy("IPFS.RPC.get_block", &self.logger) - .run(move || { - let client = client.clone(); - let path = path.clone(); - - async move { - let block = client - .call(format!("block/get?arg={path}"), timeout) - .await? 
- .bytes() - .await?; - - Ok(block) - } - }) - .await + Ok(IpfsResponse { path, response }) } } #[cfg(test)] mod tests { + use bytes::BytesMut; + use futures03::TryStreamExt; use wiremock::matchers as m; use wiremock::Mock; use wiremock::MockBuilder; @@ -251,6 +143,7 @@ mod tests { use wiremock::ResponseTemplate; use super::*; + use crate::ipfs::ContentPath; use crate::log::discard; const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; @@ -263,12 +156,6 @@ mod tests { Mock::given(m::method("POST")).and(m::path(format!("/api/v0/{path}"))) } - fn mock_can_provide() -> MockBuilder { - mock_post("cat") - .and(m::query_param("arg", CID)) - .and(m::query_param("length", "1")) - } - fn mock_cat() -> MockBuilder { mock_post("cat").and(m::query_param("arg", CID)) } @@ -277,11 +164,11 @@ mod tests { mock_post("block/get").and(m::query_param("arg", CID)) } - async fn make_client() -> (MockServer, IpfsRpcClient) { + async fn make_client() -> (MockServer, Arc) { let server = mock_server().await; let client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); - (server, client) + (server, Arc::new(client)) } fn make_path() -> ContentPath { @@ -315,7 +202,7 @@ mod tests { } #[tokio::test] - async fn new_retries_rpc_api_check_on_retriable_errors() { + async fn new_retries_rpc_api_check_on_non_deterministic_errors() { let server = mock_server().await; mock_post("version") @@ -334,21 +221,6 @@ mod tests { IpfsRpcClient::new(server.uri(), &discard()).await.unwrap(); } - #[tokio::test] - async fn new_does_not_retry_rpc_api_check_on_non_retriable_errors() { - let server = mock_server().await; - - mock_post("version") - .respond_with(ResponseTemplate::new(StatusCode::METHOD_NOT_ALLOWED)) - .expect(1) - .mount(&server) - .await; - - IpfsRpcClient::new(server.uri(), &discard()) - .await - .unwrap_err(); - } - #[tokio::test] async fn new_unchecked_creates_the_client_without_checking_the_rpc_api() { let server = mock_server().await; @@ -356,87 +228,6 @@ mod tests 
{ IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); } - #[tokio::test] - async fn can_provide_returns_true_when_content_is_available() { - let (server, client) = make_client().await; - - mock_can_provide() - .respond_with(ResponseTemplate::new(StatusCode::OK)) - .expect(1) - .mount(&server) - .await; - - let ok = client.can_provide(&make_path(), None).await.unwrap(); - - assert!(ok); - } - - #[tokio::test] - async fn can_provide_returns_false_when_content_is_not_completely_available() { - let (server, client) = make_client().await; - - mock_can_provide() - .respond_with(ResponseTemplate::new(StatusCode::PARTIAL_CONTENT)) - .expect(1) - .mount(&server) - .await; - - let ok = client.can_provide(&make_path(), None).await.unwrap(); - - assert!(!ok); - } - - #[tokio::test] - async fn can_provide_fails_on_timeout() { - let (server, client) = make_client().await; - - mock_can_provide() - .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) - .expect(1) - .mount(&server) - .await; - - client - .can_provide(&make_path(), Some(ms(300))) - .await - .unwrap_err(); - } - - #[tokio::test] - async fn can_provide_retries_the_request_on_retriable_errors() { - let (server, client) = make_client().await; - - mock_can_provide() - .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) - .up_to_n_times(1) - .expect(1) - .mount(&server) - .await; - - mock_can_provide() - .respond_with(ResponseTemplate::new(StatusCode::OK)) - .expect(1) - .mount(&server) - .await; - - let ok = client.can_provide(&make_path(), None).await.unwrap(); - - assert!(ok); - } - - #[tokio::test] - async fn can_provide_does_not_retry_the_request_on_non_retriable_errors() { - let (server, client) = make_client().await; - - mock_can_provide() - .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) - .expect(1) - .mount(&server) - .await; - - client.can_provide(&make_path(), None).await.unwrap_err(); - } - #[tokio::test] async fn 
cat_stream_returns_the_content() { let (server, client) = make_client().await; @@ -447,8 +238,8 @@ mod tests { .mount(&server) .await; - let content = client - .cat_stream(&make_path(), None) + let bytes = client + .cat_stream(&make_path(), None, RetryPolicy::None) .await .unwrap() .try_fold(BytesMut::new(), |mut acc, chunk| async { @@ -459,7 +250,7 @@ mod tests { .await .unwrap(); - assert_eq!(content.as_ref(), b"some data"); + assert_eq!(bytes.as_ref(), b"some data"); } #[tokio::test] @@ -472,13 +263,15 @@ mod tests { .mount(&server) .await; - let result = client.cat_stream(&make_path(), Some(ms(300))).await; + let result = client + .cat_stream(&make_path(), Some(ms(300)), RetryPolicy::None) + .await; assert!(matches!(result, Err(_))); } #[tokio::test] - async fn cat_stream_retries_the_request_on_retriable_errors() { + async fn cat_stream_retries_the_request_on_non_deterministic_errors() { let (server, client) = make_client().await; mock_cat() @@ -494,22 +287,10 @@ mod tests { .mount(&server) .await; - let _stream = client.cat_stream(&make_path(), None).await.unwrap(); - } - - #[tokio::test] - async fn cat_stream_does_not_retry_the_request_on_non_retriable_errors() { - let (server, client) = make_client().await; - - mock_cat() - .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) - .expect(1) - .mount(&server) - .await; - - let result = client.cat_stream(&make_path(), None).await; - - assert!(matches!(result, Err(_))); + let _stream = client + .cat_stream(&make_path(), None, RetryPolicy::NonDeterministic) + .await + .unwrap(); } #[tokio::test] @@ -522,9 +303,12 @@ mod tests { .mount(&server) .await; - let content = client.cat(&make_path(), usize::MAX, None).await.unwrap(); + let bytes = client + .cat(&make_path(), usize::MAX, None, RetryPolicy::None) + .await + .unwrap(); - assert_eq!(content.as_ref(), b"some data"); + assert_eq!(bytes.as_ref(), b"some data"); } #[tokio::test] @@ -539,9 +323,12 @@ mod tests { .mount(&server) .await; - let content = 
client.cat(&make_path(), data.len(), None).await.unwrap(); + let bytes = client + .cat(&make_path(), data.len(), None, RetryPolicy::None) + .await + .unwrap(); - assert_eq!(content.as_ref(), data); + assert_eq!(bytes.as_ref(), data); } #[tokio::test] @@ -557,7 +344,7 @@ mod tests { .await; client - .cat(&make_path(), data.len() - 1, None) + .cat(&make_path(), data.len() - 1, None, RetryPolicy::None) .await .unwrap_err(); } @@ -573,13 +360,13 @@ mod tests { .await; client - .cat(&make_path(), usize::MAX, Some(ms(300))) + .cat(&make_path(), usize::MAX, Some(ms(300)), RetryPolicy::None) .await .unwrap_err(); } #[tokio::test] - async fn cat_retries_the_request_on_retriable_errors() { + async fn cat_retries_the_request_on_non_deterministic_errors() { let (server, client) = make_client().await; mock_cat() @@ -595,25 +382,17 @@ mod tests { .mount(&server) .await; - let content = client.cat(&make_path(), usize::MAX, None).await.unwrap(); - - assert_eq!(content.as_ref(), b"some data"); - } - - #[tokio::test] - async fn cat_does_not_retry_the_request_on_non_retriable_errors() { - let (server, client) = make_client().await; - - mock_cat() - .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) - .expect(1) - .mount(&server) - .await; - - client - .cat(&make_path(), usize::MAX, None) + let bytes = client + .cat( + &make_path(), + usize::MAX, + None, + RetryPolicy::NonDeterministic, + ) .await - .unwrap_err(); + .unwrap(); + + assert_eq!(bytes.as_ref(), b"some data"); } #[tokio::test] @@ -626,9 +405,12 @@ mod tests { .mount(&server) .await; - let block = client.get_block(&make_path(), None).await.unwrap(); + let bytes = client + .get_block(&make_path(), None, RetryPolicy::None) + .await + .unwrap(); - assert_eq!(block.as_ref(), b"some data"); + assert_eq!(bytes.as_ref(), b"some data"); } #[tokio::test] @@ -642,13 +424,13 @@ mod tests { .await; client - .get_block(&make_path(), Some(ms(300))) + .get_block(&make_path(), Some(ms(300)), RetryPolicy::None) .await 
.unwrap_err(); } #[tokio::test] - async fn get_block_retries_the_request_on_retriable_errors() { + async fn get_block_retries_the_request_on_non_deterministic_errors() { let (server, client) = make_client().await; mock_get_block() @@ -664,21 +446,11 @@ mod tests { .mount(&server) .await; - let block = client.get_block(&make_path(), None).await.unwrap(); - - assert_eq!(block.as_ref(), b"some data"); - } - - #[tokio::test] - async fn get_block_does_not_retry_the_request_on_non_retriable_errors() { - let (server, client) = make_client().await; - - mock_get_block() - .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) - .expect(1) - .mount(&server) - .await; + let bytes = client + .get_block(&make_path(), None, RetryPolicy::NonDeterministic) + .await + .unwrap(); - client.get_block(&make_path(), None).await.unwrap_err(); + assert_eq!(bytes.as_ref(), b"some data"); } } diff --git a/graph/src/ipfs/server_address.rs b/graph/src/ipfs/server_address.rs index dd0026f054e..c7c8bc109f6 100644 --- a/graph/src/ipfs/server_address.rs +++ b/graph/src/ipfs/server_address.rs @@ -8,8 +8,8 @@ use crate::derive::CheapClone; use crate::ipfs::IpfsError; use crate::ipfs::IpfsResult; -#[derive(Clone, Debug, CheapClone)] /// Contains a valid IPFS server address. 
+#[derive(Clone, Debug, CheapClone)] pub struct ServerAddress { inner: Arc, } diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index 2416a96a198..25e01776629 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -65,16 +65,14 @@ fn mock_host_exports( Arc::new(templates.iter().map(|t| t.into()).collect()), ); - let ipfs_client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &LOGGER) - .unwrap() - .into_boxed(); + let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &LOGGER).unwrap(); HostExports::new( subgraph_id, network, ds_details, Arc::new(IpfsResolver::new( - ipfs_client.into(), + Arc::new(client), Arc::new(EnvVars::default()), )), ens_lookup, diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index b61e1f45e40..6da24511615 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -462,13 +462,13 @@ pub async fn setup( let static_filters = env_vars.experimental_static_filters; - let ipfs_client: Arc = graph::ipfs::IpfsRpcClient::new_unchecked( - graph::ipfs::ServerAddress::local_rpc_api(), - &logger, - ) - .unwrap() - .into_boxed() - .into(); + let ipfs_client: Arc = Arc::new( + graph::ipfs::IpfsRpcClient::new_unchecked( + graph::ipfs::ServerAddress::local_rpc_api(), + &logger, + ) + .unwrap(), + ); let link_resolver = Arc::new(IpfsResolver::new( ipfs_client.cheap_clone(), From d56ce135ab22a837eddc859cd5491c73a5ef51b6 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Fri, 8 Nov 2024 13:43:37 +0200 Subject: [PATCH 2/2] ipfs: add more tests --- Cargo.lock | 1 + core/Cargo.toml | 1 + core/src/polling_monitor/ipfs_service.rs | 35 +++++ graph/src/ipfs/retry_policy.rs | 166 +++++++++++++++++++++++ 4 files changed, 203 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 00762bcea58..a5c245b6a69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1988,6 +1988,7 @@ dependencies = [ "tower 0.4.13 
(git+https://github.com/tower-rs/tower.git)", "tower-test", "uuid", + "wiremock", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index e73834333b1..7c232f60807 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -26,3 +26,4 @@ anyhow = "1.0" [dev-dependencies] tower-test = { git = "https://github.com/tower-rs/tower.git" } uuid = { version = "1.9.1", features = ["v4"] } +wiremock = "0.6.1" diff --git a/core/src/polling_monitor/ipfs_service.rs b/core/src/polling_monitor/ipfs_service.rs index 7e147b90b4b..f8c68976216 100644 --- a/core/src/polling_monitor/ipfs_service.rs +++ b/core/src/polling_monitor/ipfs_service.rs @@ -101,9 +101,14 @@ mod test { use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing; use graph::ipfs::IpfsRpcClient; use graph::ipfs::ServerAddress; + use graph::log::discard; use graph::tokio; use tower::ServiceExt; use uuid::Uuid; + use wiremock::matchers as m; + use wiremock::Mock; + use wiremock::MockServer; + use wiremock::ResponseTemplate; use super::*; @@ -143,4 +148,34 @@ mod test { "#.trim_start().trim_end(); assert_eq!(expected, body); } + + #[tokio::test] + async fn no_client_retries_to_allow_polling_monitor_to_handle_retries_internally() { + const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; + + let server = MockServer::start().await; + let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); + let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1); + let path = ContentPath::new(CID).unwrap(); + + Mock::given(m::method("POST")) + .and(m::path("/api/v0/cat")) + .and(m::query_param("arg", CID)) + .respond_with(ResponseTemplate::new(500)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + Mock::given(m::method("POST")) + .and(m::path("/api/v0/cat")) + .and(m::query_param("arg", CID)) + .respond_with(ResponseTemplate::new(200)) + .expect(..=1) + .mount(&server) + .await; + + // This means that we never reached the successful 
response. + ipfs_service.oneshot(path).await.unwrap_err(); + } } diff --git a/graph/src/ipfs/retry_policy.rs b/graph/src/ipfs/retry_policy.rs index bd955c12684..37ffec81372 100644 --- a/graph/src/ipfs/retry_policy.rs +++ b/graph/src/ipfs/retry_policy.rs @@ -45,3 +45,169 @@ impl RetryPolicy { .no_timeout() } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicU64; + use std::sync::atomic::Ordering; + use std::sync::Arc; + use std::time::Duration; + + use super::*; + use crate::ipfs::ContentPath; + use crate::log::discard; + + const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; + + fn path() -> ContentPath { + ContentPath::new(CID).unwrap() + } + + #[tokio::test] + async fn retry_policy_none_disables_retries() { + let counter = Arc::new(AtomicU64::new(0)); + + let err = RetryPolicy::None + .create::<()>("test", &discard()) + .run({ + let counter = counter.clone(); + move || { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + Err(IpfsError::RequestTimeout { path: path() }) + } + } + }) + .await + .unwrap_err(); + + assert_eq!(counter.load(Ordering::SeqCst), 1); + assert!(matches!(err, IpfsError::RequestTimeout { .. })); + } + + #[tokio::test] + async fn retry_policy_networking_retries_only_network_related_errors() { + let counter = Arc::new(AtomicU64::new(0)); + + let err = RetryPolicy::Networking + .create("test", &discard()) + .run({ + let counter = counter.clone(); + move || { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + + if counter.load(Ordering::SeqCst) == 10 { + return Err(IpfsError::RequestTimeout { path: path() }); + } + + reqwest::Client::new() + .get("https://simulate-dns-lookup-failure") + .timeout(Duration::from_millis(50)) + .send() + .await?; + + Ok(()) + } + } + }) + .await + .unwrap_err(); + + assert_eq!(counter.load(Ordering::SeqCst), 10); + assert!(matches!(err, IpfsError::RequestTimeout { .. 
})); + } + + #[tokio::test] + async fn retry_policy_networking_stops_on_success() { + let counter = Arc::new(AtomicU64::new(0)); + + RetryPolicy::Networking + .create("test", &discard()) + .run({ + let counter = counter.clone(); + move || { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + + if counter.load(Ordering::SeqCst) == 10 { + return Ok(()); + } + + reqwest::Client::new() + .get("https://simulate-dns-lookup-failure") + .timeout(Duration::from_millis(50)) + .send() + .await?; + + Ok(()) + } + } + }) + .await + .unwrap(); + + assert_eq!(counter.load(Ordering::SeqCst), 10); + } + + #[tokio::test] + async fn retry_policy_non_deterministic_retries_all_non_deterministic_errors() { + let counter = Arc::new(AtomicU64::new(0)); + + let err = RetryPolicy::NonDeterministic + .create::<()>("test", &discard()) + .run({ + let counter = counter.clone(); + move || { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + + if counter.load(Ordering::SeqCst) == 10 { + return Err(IpfsError::ContentTooLarge { + path: path(), + max_size: 0, + }); + } + + Err(IpfsError::RequestTimeout { path: path() }) + } + } + }) + .await + .unwrap_err(); + + assert_eq!(counter.load(Ordering::SeqCst), 10); + assert!(matches!(err, IpfsError::ContentTooLarge { .. })); + } + + #[tokio::test] + async fn retry_policy_non_deterministic_stops_on_success() { + let counter = Arc::new(AtomicU64::new(0)); + + RetryPolicy::NonDeterministic + .create("test", &discard()) + .run({ + let counter = counter.clone(); + move || { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + + if counter.load(Ordering::SeqCst) == 10 { + return Ok(()); + } + + Err(IpfsError::RequestTimeout { path: path() }) + } + } + }) + .await + .unwrap(); + + assert_eq!(counter.load(Ordering::SeqCst), 10); + } +}