Skip to content

Commit aed1a2a

Browse files
isumencalypto
authored andcommitted
Optimize IPFS retries (#5698)
* ipfs: optimize retries * ipfs: add more tests
1 parent 9e1f1a4 commit aed1a2a

File tree

15 files changed

+882
-899
lines changed

15 files changed

+882
-899
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ anyhow = "1.0"
2626
[dev-dependencies]
2727
tower-test = { git = "https://github.com/tower-rs/tower.git" }
2828
uuid = { version = "1.9.1", features = ["v4"] }
29+
wiremock = "0.6.1"

core/src/polling_monitor/ipfs_service.rs

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use std::sync::Arc;
22
use std::time::Duration;
33

4-
use anyhow::{anyhow, Error};
4+
use anyhow::anyhow;
5+
use anyhow::Error;
56
use bytes::Bytes;
67
use graph::futures03::future::BoxFuture;
78
use graph::ipfs::ContentPath;
89
use graph::ipfs::IpfsClient;
9-
use graph::ipfs::IpfsError;
10+
use graph::ipfs::RetryPolicy;
1011
use graph::{derive::CheapClone, prelude::CheapClone};
1112
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};
1213

@@ -50,12 +51,17 @@ impl IpfsServiceInner {
5051

5152
let res = self
5253
.client
53-
.cat(&path, self.max_file_size, Some(self.timeout))
54+
.cat(
55+
&path,
56+
self.max_file_size,
57+
Some(self.timeout),
58+
RetryPolicy::None,
59+
)
5460
.await;
5561

5662
match res {
5763
Ok(file_bytes) => Ok(Some(file_bytes)),
58-
Err(IpfsError::RequestFailed(err)) if err.is_timeout() => {
64+
Err(err) if err.is_timeout() => {
5965
// Timeouts in IPFS mean that the content is not available, so we return `None`.
6066
Ok(None)
6167
}
@@ -95,9 +101,14 @@ mod test {
95101
use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
96102
use graph::ipfs::IpfsRpcClient;
97103
use graph::ipfs::ServerAddress;
104+
use graph::log::discard;
98105
use graph::tokio;
99106
use tower::ServiceExt;
100107
use uuid::Uuid;
108+
use wiremock::matchers as m;
109+
use wiremock::Mock;
110+
use wiremock::MockServer;
111+
use wiremock::ResponseTemplate;
101112

102113
use super::*;
103114

@@ -114,10 +125,9 @@ mod test {
114125

115126
let client =
116127
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
117-
.unwrap()
118-
.into_boxed();
128+
.unwrap();
119129

120-
let svc = ipfs_service(client.into(), 100000, Duration::from_secs(30), 10);
130+
let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10);
121131

122132
let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
123133
let content = svc.oneshot(path).await.unwrap().unwrap();
@@ -138,4 +148,34 @@ mod test {
138148
"#.trim_start().trim_end();
139149
assert_eq!(expected, body);
140150
}
151+
152+
#[tokio::test]
153+
async fn no_client_retries_to_allow_polling_monitor_to_handle_retries_internally() {
154+
const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
155+
156+
let server = MockServer::start().await;
157+
let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
158+
let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1);
159+
let path = ContentPath::new(CID).unwrap();
160+
161+
Mock::given(m::method("POST"))
162+
.and(m::path("/api/v0/cat"))
163+
.and(m::query_param("arg", CID))
164+
.respond_with(ResponseTemplate::new(500))
165+
.up_to_n_times(1)
166+
.expect(1)
167+
.mount(&server)
168+
.await;
169+
170+
Mock::given(m::method("POST"))
171+
.and(m::path("/api/v0/cat"))
172+
.and(m::query_param("arg", CID))
173+
.respond_with(ResponseTemplate::new(200))
174+
.expect(..=1)
175+
.mount(&server)
176+
.await;
177+
178+
// This means that we never reached the successful response.
179+
ipfs_service.oneshot(path).await.unwrap_err();
180+
}
141181
}

graph/src/components/link_resolver/ipfs.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::futures01::Async;
2121
use crate::futures01::Poll;
2222
use crate::ipfs::ContentPath;
2323
use crate::ipfs::IpfsClient;
24+
use crate::ipfs::RetryPolicy;
2425
use crate::prelude::{LinkResolver as LinkResolverTrait, *};
2526

2627
#[derive(Clone, CheapClone, Derivative)]
@@ -36,6 +37,9 @@ pub struct IpfsResolver {
3637
max_file_size: usize,
3738
max_map_file_size: usize,
3839
max_cache_file_size: usize,
40+
41+
/// When set to `true`, it means infinite retries, ignoring the timeout setting.
42+
retry: bool,
3943
}
4044

4145
impl IpfsResolver {
@@ -51,6 +55,7 @@ impl IpfsResolver {
5155
max_file_size: env.max_ipfs_file_bytes,
5256
max_map_file_size: env.max_ipfs_map_file_size,
5357
max_cache_file_size: env.max_ipfs_cache_file_size,
58+
retry: false,
5459
}
5560
}
5661
}
@@ -64,8 +69,9 @@ impl LinkResolverTrait for IpfsResolver {
6469
}
6570

6671
fn with_retries(&self) -> Box<dyn LinkResolverTrait> {
67-
// IPFS clients have internal retries enabled by default.
68-
Box::new(self.cheap_clone())
72+
let mut s = self.cheap_clone();
73+
s.retry = true;
74+
Box::new(s)
6975
}
7076

7177
async fn cat(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error> {
@@ -81,9 +87,16 @@ impl LinkResolverTrait for IpfsResolver {
8187

8288
trace!(logger, "IPFS cat cache miss"; "hash" => path.to_string());
8389

90+
let (timeout, retry_policy) = if self.retry {
91+
(None, RetryPolicy::NonDeterministic)
92+
} else {
93+
(Some(timeout), RetryPolicy::Networking)
94+
};
95+
8496
let data = self
8597
.client
86-
.cat(&path, max_file_size, Some(timeout))
98+
.clone()
99+
.cat(&path, max_file_size, timeout, retry_policy)
87100
.await?
88101
.to_vec();
89102

@@ -111,20 +124,39 @@ impl LinkResolverTrait for IpfsResolver {
111124

112125
trace!(logger, "IPFS block get"; "hash" => path.to_string());
113126

114-
let data = self.client.get_block(&path, Some(timeout)).await?.to_vec();
127+
let (timeout, retry_policy) = if self.retry {
128+
(None, RetryPolicy::NonDeterministic)
129+
} else {
130+
(Some(timeout), RetryPolicy::Networking)
131+
};
132+
133+
let data = self
134+
.client
135+
.clone()
136+
.get_block(&path, timeout, retry_policy)
137+
.await?
138+
.to_vec();
115139

116140
Ok(data)
117141
}
118142

119143
async fn json_stream(&self, logger: &Logger, link: &Link) -> Result<JsonValueStream, Error> {
120144
let path = ContentPath::new(&link.link)?;
121145
let max_map_file_size = self.max_map_file_size;
146+
let timeout = self.timeout;
122147

123148
trace!(logger, "IPFS JSON stream"; "hash" => path.to_string());
124149

150+
let (timeout, retry_policy) = if self.retry {
151+
(None, RetryPolicy::NonDeterministic)
152+
} else {
153+
(Some(timeout), RetryPolicy::Networking)
154+
};
155+
125156
let mut stream = self
126157
.client
127-
.cat_stream(&path, None)
158+
.clone()
159+
.cat_stream(&path, timeout, retry_policy)
128160
.await?
129161
.fuse()
130162
.boxed()
@@ -228,11 +260,8 @@ mod tests {
228260

229261
let logger = crate::log::discard();
230262

231-
let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)
232-
.unwrap()
233-
.into_boxed();
234-
235-
let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars));
263+
let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger).unwrap();
264+
let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars));
236265

237266
let err = IpfsResolver::cat(&resolver, &logger, &Link { link: cid.clone() })
238267
.await
@@ -250,9 +279,8 @@ mod tests {
250279
.to_owned();
251280

252281
let logger = crate::log::discard();
253-
let client =
254-
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?.into_boxed();
255-
let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars));
282+
let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?;
283+
let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars));
256284

257285
let stream = IpfsResolver::json_stream(&resolver, &logger, &Link { link: cid }).await?;
258286
stream.map_ok(|sv| sv.value).try_collect().await

0 commit comments

Comments
 (0)