Skip to content

Commit a9a2336

Browse files
committed
ipfs: optimize retries
1 parent fc80657 commit a9a2336

File tree

13 files changed

+678
-898
lines changed

13 files changed

+678
-898
lines changed

core/src/polling_monitor/ipfs_service.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use std::sync::Arc;
22
use std::time::Duration;
33

4-
use anyhow::{anyhow, Error};
4+
use anyhow::anyhow;
5+
use anyhow::Error;
56
use bytes::Bytes;
67
use graph::futures03::future::BoxFuture;
78
use graph::ipfs::ContentPath;
89
use graph::ipfs::IpfsClient;
9-
use graph::ipfs::IpfsError;
10+
use graph::ipfs::RetryPolicy;
1011
use graph::{derive::CheapClone, prelude::CheapClone};
1112
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};
1213

@@ -50,12 +51,17 @@ impl IpfsServiceInner {
5051

5152
let res = self
5253
.client
53-
.cat(&path, self.max_file_size, Some(self.timeout))
54+
.cat(
55+
&path,
56+
self.max_file_size,
57+
Some(self.timeout),
58+
RetryPolicy::None,
59+
)
5460
.await;
5561

5662
match res {
5763
Ok(file_bytes) => Ok(Some(file_bytes)),
58-
Err(IpfsError::RequestFailed(err)) if err.is_timeout() => {
64+
Err(err) if err.is_timeout() => {
5965
// Timeouts in IPFS mean that the content is not available, so we return `None`.
6066
Ok(None)
6167
}
@@ -114,10 +120,9 @@ mod test {
114120

115121
let client =
116122
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
117-
.unwrap()
118-
.into_boxed();
123+
.unwrap();
119124

120-
let svc = ipfs_service(client.into(), 100000, Duration::from_secs(30), 10);
125+
let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10);
121126

122127
let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
123128
let content = svc.oneshot(path).await.unwrap().unwrap();

graph/src/components/link_resolver/ipfs.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::futures01::Async;
2121
use crate::futures01::Poll;
2222
use crate::ipfs::ContentPath;
2323
use crate::ipfs::IpfsClient;
24+
use crate::ipfs::RetryPolicy;
2425
use crate::prelude::{LinkResolver as LinkResolverTrait, *};
2526

2627
#[derive(Clone, CheapClone, Derivative)]
@@ -36,6 +37,9 @@ pub struct IpfsResolver {
3637
max_file_size: usize,
3738
max_map_file_size: usize,
3839
max_cache_file_size: usize,
40+
41+
/// When set to `true`, it means infinite retries, ignoring the timeout setting.
42+
retry: bool,
3943
}
4044

4145
impl IpfsResolver {
@@ -51,6 +55,7 @@ impl IpfsResolver {
5155
max_file_size: env.max_ipfs_file_bytes,
5256
max_map_file_size: env.max_ipfs_map_file_size,
5357
max_cache_file_size: env.max_ipfs_cache_file_size,
58+
retry: false,
5459
}
5560
}
5661
}
@@ -64,8 +69,9 @@ impl LinkResolverTrait for IpfsResolver {
6469
}
6570

6671
fn with_retries(&self) -> Box<dyn LinkResolverTrait> {
67-
// IPFS clients have internal retries enabled by default.
68-
Box::new(self.cheap_clone())
72+
let mut s = self.cheap_clone();
73+
s.retry = true;
74+
Box::new(s)
6975
}
7076

7177
async fn cat(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error> {
@@ -81,9 +87,16 @@ impl LinkResolverTrait for IpfsResolver {
8187

8288
trace!(logger, "IPFS cat cache miss"; "hash" => path.to_string());
8389

90+
let (timeout, retry_policy) = if self.retry {
91+
(None, RetryPolicy::NonDeterministic)
92+
} else {
93+
(Some(timeout), RetryPolicy::Networking)
94+
};
95+
8496
let data = self
8597
.client
86-
.cat(&path, max_file_size, Some(timeout))
98+
.clone()
99+
.cat(&path, max_file_size, timeout, retry_policy)
87100
.await?
88101
.to_vec();
89102

@@ -111,20 +124,39 @@ impl LinkResolverTrait for IpfsResolver {
111124

112125
trace!(logger, "IPFS block get"; "hash" => path.to_string());
113126

114-
let data = self.client.get_block(&path, Some(timeout)).await?.to_vec();
127+
let (timeout, retry_policy) = if self.retry {
128+
(None, RetryPolicy::NonDeterministic)
129+
} else {
130+
(Some(timeout), RetryPolicy::Networking)
131+
};
132+
133+
let data = self
134+
.client
135+
.clone()
136+
.get_block(&path, timeout, retry_policy)
137+
.await?
138+
.to_vec();
115139

116140
Ok(data)
117141
}
118142

119143
async fn json_stream(&self, logger: &Logger, link: &Link) -> Result<JsonValueStream, Error> {
120144
let path = ContentPath::new(&link.link)?;
121145
let max_map_file_size = self.max_map_file_size;
146+
let timeout = self.timeout;
122147

123148
trace!(logger, "IPFS JSON stream"; "hash" => path.to_string());
124149

150+
let (timeout, retry_policy) = if self.retry {
151+
(None, RetryPolicy::NonDeterministic)
152+
} else {
153+
(Some(timeout), RetryPolicy::Networking)
154+
};
155+
125156
let mut stream = self
126157
.client
127-
.cat_stream(&path, None)
158+
.clone()
159+
.cat_stream(&path, timeout, retry_policy)
128160
.await?
129161
.fuse()
130162
.boxed()
@@ -228,11 +260,8 @@ mod tests {
228260

229261
let logger = crate::log::discard();
230262

231-
let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)
232-
.unwrap()
233-
.into_boxed();
234-
235-
let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars));
263+
let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger).unwrap();
264+
let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars));
236265

237266
let err = IpfsResolver::cat(&resolver, &logger, &Link { link: cid.clone() })
238267
.await
@@ -250,9 +279,8 @@ mod tests {
250279
.to_owned();
251280

252281
let logger = crate::log::discard();
253-
let client =
254-
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?.into_boxed();
255-
let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars));
282+
let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?;
283+
let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars));
256284

257285
let stream = IpfsResolver::json_stream(&resolver, &logger, &Link { link: cid }).await?;
258286
stream.map_ok(|sv| sv.value).try_collect().await

graph/src/ipfs/client.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
use std::future::Future;
2+
use std::sync::Arc;
3+
use std::time::Duration;
4+
5+
use async_trait::async_trait;
6+
use bytes::Bytes;
7+
use bytes::BytesMut;
8+
use futures03::stream::BoxStream;
9+
use futures03::StreamExt;
10+
use futures03::TryStreamExt;
11+
use slog::Logger;
12+
13+
use crate::ipfs::ContentPath;
14+
use crate::ipfs::IpfsError;
15+
use crate::ipfs::IpfsResult;
16+
use crate::ipfs::RetryPolicy;
17+
18+
/// A read-only connection to an IPFS server.
19+
#[async_trait]
20+
pub trait IpfsClient: Send + Sync + 'static {
21+
/// Returns the logger associated with the client.
22+
fn logger(&self) -> &Logger;
23+
24+
/// Sends a request to the IPFS server and returns a raw response.
25+
async fn call(self: Arc<Self>, req: IpfsRequest) -> IpfsResult<IpfsResponse>;
26+
27+
/// Streams data from the specified content path.
28+
///
29+
/// If a timeout is specified, the execution will be aborted if the IPFS server
30+
/// does not return a response within the specified amount of time.
31+
///
32+
/// The timeout is not propagated to the resulting stream.
33+
async fn cat_stream(
34+
self: Arc<Self>,
35+
path: &ContentPath,
36+
timeout: Option<Duration>,
37+
retry_policy: RetryPolicy,
38+
) -> IpfsResult<BoxStream<'static, IpfsResult<Bytes>>> {
39+
let fut = retry_policy.create("IPFS.cat_stream", self.logger()).run({
40+
let path = path.to_owned();
41+
42+
move || {
43+
let path = path.clone();
44+
let client = self.clone();
45+
46+
async move { client.call(IpfsRequest::Cat(path)).await }
47+
}
48+
});
49+
50+
let resp = run_with_optional_timeout(path, fut, timeout).await?;
51+
52+
Ok(resp.bytes_stream())
53+
}
54+
55+
/// Downloads data from the specified content path.
56+
///
57+
/// If a timeout is specified, the execution will be aborted if the IPFS server
58+
/// does not return a response within the specified amount of time.
59+
async fn cat(
60+
self: Arc<Self>,
61+
path: &ContentPath,
62+
max_size: usize,
63+
timeout: Option<Duration>,
64+
retry_policy: RetryPolicy,
65+
) -> IpfsResult<Bytes> {
66+
let fut = retry_policy.create("IPFS.cat", self.logger()).run({
67+
let path = path.to_owned();
68+
69+
move || {
70+
let path = path.clone();
71+
let client = self.clone();
72+
73+
async move {
74+
client
75+
.call(IpfsRequest::Cat(path))
76+
.await?
77+
.bytes(Some(max_size))
78+
.await
79+
}
80+
}
81+
});
82+
83+
run_with_optional_timeout(path, fut, timeout).await
84+
}
85+
86+
/// Downloads an IPFS block in raw format.
87+
///
88+
/// If a timeout is specified, the execution will be aborted if the IPFS server
89+
/// does not return a response within the specified amount of time.
90+
async fn get_block(
91+
self: Arc<Self>,
92+
path: &ContentPath,
93+
timeout: Option<Duration>,
94+
retry_policy: RetryPolicy,
95+
) -> IpfsResult<Bytes> {
96+
let fut = retry_policy.create("IPFS.get_block", self.logger()).run({
97+
let path = path.to_owned();
98+
99+
move || {
100+
let path = path.clone();
101+
let client = self.clone();
102+
103+
async move {
104+
client
105+
.call(IpfsRequest::GetBlock(path))
106+
.await?
107+
.bytes(None)
108+
.await
109+
}
110+
}
111+
});
112+
113+
run_with_optional_timeout(path, fut, timeout).await
114+
}
115+
}
116+
117+
/// Describes a request to an IPFS server.
118+
#[derive(Clone, Debug)]
119+
pub enum IpfsRequest {
120+
Cat(ContentPath),
121+
GetBlock(ContentPath),
122+
}
123+
124+
/// Contains a raw, successful IPFS response.
125+
#[derive(Debug)]
126+
pub struct IpfsResponse {
127+
pub(super) path: ContentPath,
128+
pub(super) response: reqwest::Response,
129+
}
130+
131+
impl IpfsResponse {
132+
/// Reads and returns the response body.
133+
///
134+
/// If the max size is specified and the response body is larger than the max size,
135+
/// execution will result in an error.
136+
pub async fn bytes(self, max_size: Option<usize>) -> IpfsResult<Bytes> {
137+
let Some(max_size) = max_size else {
138+
return self.response.bytes().await.map_err(Into::into);
139+
};
140+
141+
let bytes = self
142+
.response
143+
.bytes_stream()
144+
.err_into()
145+
.try_fold(BytesMut::new(), |mut acc, chunk| async {
146+
acc.extend(chunk);
147+
148+
if acc.len() > max_size {
149+
return Err(IpfsError::ContentTooLarge {
150+
path: self.path.clone(),
151+
max_size,
152+
});
153+
}
154+
155+
Ok(acc)
156+
})
157+
.await?;
158+
159+
Ok(bytes.into())
160+
}
161+
162+
/// Converts the response into a stream of bytes from the body.
163+
pub fn bytes_stream(self) -> BoxStream<'static, IpfsResult<Bytes>> {
164+
self.response.bytes_stream().err_into().boxed()
165+
}
166+
}
167+
168+
async fn run_with_optional_timeout<F, O>(
169+
path: &ContentPath,
170+
fut: F,
171+
timeout: Option<Duration>,
172+
) -> IpfsResult<O>
173+
where
174+
F: Future<Output = IpfsResult<O>>,
175+
{
176+
match timeout {
177+
Some(timeout) => {
178+
tokio::time::timeout(timeout, fut)
179+
.await
180+
.map_err(|_| IpfsError::RequestTimeout {
181+
path: path.to_owned(),
182+
})?
183+
}
184+
None => fut.await,
185+
}
186+
}

graph/src/ipfs/content_path.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use cid::Cid;
44
use crate::ipfs::IpfsError;
55
use crate::ipfs::IpfsResult;
66

7-
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
87
/// Represents a path to some data on IPFS.
8+
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
99
pub struct ContentPath {
1010
cid: Cid,
1111
path: Option<String>,

0 commit comments

Comments
 (0)