1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -26,3 +26,4 @@ anyhow = "1.0"
 [dev-dependencies]
 tower-test = { git = "https://github.com/tower-rs/tower.git" }
 uuid = { version = "1.9.1", features = ["v4"] }
+wiremock = "0.6.1"
54 changes: 47 additions & 7 deletions core/src/polling_monitor/ipfs_service.rs
@@ -1,12 +1,13 @@
 use std::sync::Arc;
 use std::time::Duration;
 
-use anyhow::{anyhow, Error};
+use anyhow::anyhow;
+use anyhow::Error;
 use bytes::Bytes;
 use graph::futures03::future::BoxFuture;
 use graph::ipfs::ContentPath;
 use graph::ipfs::IpfsClient;
-use graph::ipfs::IpfsError;
+use graph::ipfs::RetryPolicy;
 use graph::{derive::CheapClone, prelude::CheapClone};
 use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};
 
@@ -50,12 +51,17 @@ impl IpfsServiceInner {
 
         let res = self
             .client
-            .cat(&path, self.max_file_size, Some(self.timeout))
+            .cat(
+                &path,
+                self.max_file_size,
+                Some(self.timeout),
+                RetryPolicy::None,
[Review comment — Contributor]
Any way we can make sure this is tested to prevent regression?

[Member Author]
The IPFS client's internal retries caused the high number of IPFS requests here, because the polling monitor has its own infinite retries with priorities. By disabling the internal retries of the IPFS client, we allow the polling monitor to behave as expected and rely on its retry functionality.

[Contributor]
Understood. It's kind of hard to differentiate the different layers of retries. I'm happy to leave it as is, since now we know it won't retry at the client level.
+            )
             .await;
 
         match res {
             Ok(file_bytes) => Ok(Some(file_bytes)),
-            Err(IpfsError::RequestFailed(err)) if err.is_timeout() => {
+            Err(err) if err.is_timeout() => {
                 // Timeouts in IPFS mean that the content is not available, so we return `None`.
                 Ok(None)
             }
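
To make the layering the author describes concrete, here is a minimal, self-contained sketch (hypothetical names and control flow, not graph-node's actual polling monitor or IPFS client): the inner call makes exactly one attempt, as with `RetryPolicy::None`, and the outer loop owns all retries, so each failure costs exactly one IPFS request.

```rust
// Inner layer: one attempt per call, like an IPFS client with RetryPolicy::None.
fn fetch_once(attempt: u32) -> Result<&'static str, &'static str> {
    // Pretend the first two attempts hit transient errors.
    if attempt < 2 {
        Err("transient IPFS error")
    } else {
        Ok("file bytes")
    }
}

// Outer layer: the polling monitor's loop owns the (infinite) retries.
fn polling_monitor() -> &'static str {
    let mut attempt = 0;
    loop {
        match fetch_once(attempt) {
            Ok(bytes) => return bytes,
            // The real monitor re-queues failed requests with a priority;
            // here we simply try again.
            Err(_) => attempt += 1,
        }
    }
}

fn main() {
    // Three requests total: two failures plus one success. With client-level
    // retries enabled as well, each failure would multiply into N requests.
    assert_eq!(polling_monitor(), "file bytes");
}
```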
@@ -95,9 +101,14 @@ mod test {
     use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
     use graph::ipfs::IpfsRpcClient;
     use graph::ipfs::ServerAddress;
+    use graph::log::discard;
     use graph::tokio;
     use tower::ServiceExt;
     use uuid::Uuid;
+    use wiremock::matchers as m;
+    use wiremock::Mock;
+    use wiremock::MockServer;
+    use wiremock::ResponseTemplate;
 
     use super::*;
 
@@ -114,10 +125,9 @@
 
         let client =
             IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
-                .unwrap()
-                .into_boxed();
+                .unwrap();
 
-        let svc = ipfs_service(client.into(), 100000, Duration::from_secs(30), 10);
+        let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10);
 
         let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
         let content = svc.oneshot(path).await.unwrap().unwrap();
@@ -138,4 +148,34 @@
         "#.trim_start().trim_end();
         assert_eq!(expected, body);
     }
+
+    #[tokio::test]
+    async fn no_client_retries_to_allow_polling_monitor_to_handle_retries_internally() {
+        const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
+
+        let server = MockServer::start().await;
+        let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
+        let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1);
+        let path = ContentPath::new(CID).unwrap();
+
+        Mock::given(m::method("POST"))
+            .and(m::path("/api/v0/cat"))
+            .and(m::query_param("arg", CID))
+            .respond_with(ResponseTemplate::new(500))
+            .up_to_n_times(1)
+            .expect(1)
+            .mount(&server)
+            .await;
+
+        Mock::given(m::method("POST"))
+            .and(m::path("/api/v0/cat"))
+            .and(m::query_param("arg", CID))
+            .respond_with(ResponseTemplate::new(200))
+            .expect(..=1)
+            .mount(&server)
+            .await;
+
+        // An error here proves the 500 response was terminal: if the client retried internally, the second request would hit the 200 mock and succeed.
+        ipfs_service.oneshot(path).await.unwrap_err();
+    }
 }
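
For orientation, the endpoint these mocks emulate is Kubo's RPC `cat` call, issued as `POST /api/v0/cat?arg=<CID>` — exactly the shape the wiremock matchers above describe. A hedged standalone sketch of that request using `reqwest` and `tokio` (assumed dependencies; this is not graph-node's `IpfsRpcClient`):

```rust
// Sketch of the request the wiremock matchers above describe.
// 127.0.0.1:5001 is Kubo's default RPC address.
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let cid = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";

    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:5001/api/v0/cat")
        .query(&[("arg", cid)])
        .send()
        .await?;

    // With RetryPolicy::None, a single 5xx response like the mocked one above
    // is terminal for this request; any retrying happens in the polling
    // monitor, not at this layer.
    println!("status: {}", resp.status());
    Ok(())
}
```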
54 changes: 41 additions & 13 deletions graph/src/components/link_resolver/ipfs.rs
@@ -21,6 +21,7 @@ use crate::futures01::Async;
 use crate::futures01::Poll;
 use crate::ipfs::ContentPath;
 use crate::ipfs::IpfsClient;
+use crate::ipfs::RetryPolicy;
 use crate::prelude::{LinkResolver as LinkResolverTrait, *};
 
 #[derive(Clone, CheapClone, Derivative)]
@@ -36,6 +37,9 @@ pub struct IpfsResolver {
     max_file_size: usize,
     max_map_file_size: usize,
     max_cache_file_size: usize,
+
+    /// When set to `true`, it means infinite retries, ignoring the timeout setting.
+    retry: bool,
 }
 
 impl IpfsResolver {
@@ -51,6 +55,7 @@ impl IpfsResolver {
             max_file_size: env.max_ipfs_file_bytes,
             max_map_file_size: env.max_ipfs_map_file_size,
             max_cache_file_size: env.max_ipfs_cache_file_size,
+            retry: false,
         }
     }
 }
Expand All @@ -64,8 +69,9 @@ impl LinkResolverTrait for IpfsResolver {
}

fn with_retries(&self) -> Box<dyn LinkResolverTrait> {
// IPFS clients have internal retries enabled by default.
Box::new(self.cheap_clone())
let mut s = self.cheap_clone();
s.retry = true;
Box::new(s)
}

async fn cat(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error> {
@@ -81,9 +87,16 @@
 
         trace!(logger, "IPFS cat cache miss"; "hash" => path.to_string());
 
+        let (timeout, retry_policy) = if self.retry {
+            (None, RetryPolicy::NonDeterministic)
+        } else {
+            (Some(timeout), RetryPolicy::Networking)
+        };
+
         let data = self
             .client
-            .cat(&path, max_file_size, Some(timeout))
+            .clone()
+            .cat(&path, max_file_size, timeout, retry_policy)
             .await?
             .to_vec();
 
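The `(timeout, retry_policy)` selection above reappears verbatim in `get_block` and `json_stream` below. As a reading aid, here is the same decision factored into a standalone function (a hypothetical helper, not part of the PR; the comments on the variants' semantics are an interpretation of how this diff uses them):

```rust
use std::time::Duration;

// Mirrors the RetryPolicy variants used in the diff; the comments reflect how
// the resolver uses them, not a definitive description of the graph crate.
#[derive(Debug, PartialEq)]
enum RetryPolicy {
    Networking,       // retry only transient network errors, bounded by a timeout
    NonDeterministic, // retry any error indefinitely; no timeout applies
}

// Hypothetical helper equivalent to the inline `if self.retry { .. }` blocks.
fn retry_config(retry: bool, timeout: Duration) -> (Option<Duration>, RetryPolicy) {
    if retry {
        // `with_retries` was called: retry forever and ignore the timeout.
        (None, RetryPolicy::NonDeterministic)
    } else {
        // Default path: bounded by the timeout, retrying only network errors.
        (Some(timeout), RetryPolicy::Networking)
    }
}

fn main() {
    let timeout = Duration::from_secs(60);
    assert_eq!(retry_config(true, timeout), (None, RetryPolicy::NonDeterministic));
    assert_eq!(retry_config(false, timeout), (Some(timeout), RetryPolicy::Networking));
}
```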
@@ -111,20 +124,39 @@
 
         trace!(logger, "IPFS block get"; "hash" => path.to_string());
 
-        let data = self.client.get_block(&path, Some(timeout)).await?.to_vec();
+        let (timeout, retry_policy) = if self.retry {
+            (None, RetryPolicy::NonDeterministic)
+        } else {
+            (Some(timeout), RetryPolicy::Networking)
+        };
+
+        let data = self
+            .client
+            .clone()
+            .get_block(&path, timeout, retry_policy)
+            .await?
+            .to_vec();
 
         Ok(data)
     }
 
     async fn json_stream(&self, logger: &Logger, link: &Link) -> Result<JsonValueStream, Error> {
         let path = ContentPath::new(&link.link)?;
         let max_map_file_size = self.max_map_file_size;
+        let timeout = self.timeout;
 
         trace!(logger, "IPFS JSON stream"; "hash" => path.to_string());
 
+        let (timeout, retry_policy) = if self.retry {
+            (None, RetryPolicy::NonDeterministic)
+        } else {
+            (Some(timeout), RetryPolicy::Networking)
+        };
+
         let mut stream = self
             .client
-            .cat_stream(&path, None)
+            .clone()
+            .cat_stream(&path, timeout, retry_policy)
             .await?
             .fuse()
             .boxed()
@@ -228,11 +260,8 @@ mod tests {
 
         let logger = crate::log::discard();
 
-        let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)
-            .unwrap()
-            .into_boxed();
-
-        let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars));
+        let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger).unwrap();
+        let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars));
 
         let err = IpfsResolver::cat(&resolver, &logger, &Link { link: cid.clone() })
             .await
@@ -250,9 +279,8 @@ mod tests {
             .to_owned();
 
         let logger = crate::log::discard();
-        let client =
-            IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?.into_boxed();
-        let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars));
+        let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?;
+        let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars));
 
         let stream = IpfsResolver::json_stream(&resolver, &logger, &Link { link: cid }).await?;
         stream.map_ok(|sv| sv.value).try_collect().await