Skip to content

Commit 183aee0

Browse files
committed
core, graph: Add with_retries modifier to LinkResolver
This allows to configure the link resolver so that it retries requests indefinitely using our `retry` utility. By default, this will be disabled but we'll want to enable it for the resolver used when loading subgraphs manifest files and dynamic data sources. The reason this would be good is that it would allow us to tolerate intermittent IPFS issues during startup.
1 parent 84ab990 commit 183aee0

File tree

2 files changed

+58
-25
lines changed

2 files changed

+58
-25
lines changed

core/src/link_resolver.rs

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ pub struct LinkResolver {
8585
client: ipfs_api::IpfsClient,
8686
cache: Arc<Mutex<LruCache<String, Vec<u8>>>>,
8787
timeout: Duration,
88+
retry: bool,
8889
}
8990

9091
impl From<ipfs_api::IpfsClient> for LinkResolver {
@@ -95,6 +96,7 @@ impl From<ipfs_api::IpfsClient> for LinkResolver {
9596
*MAX_IPFS_CACHE_SIZE as usize,
9697
))),
9798
timeout: *IPFS_TIMEOUT,
99+
retry: false,
98100
}
99101
}
100102
}
@@ -105,6 +107,11 @@ impl LinkResolverTrait for LinkResolver {
105107
self
106108
}
107109

110+
fn with_retries(mut self) -> Self {
111+
self.retry = true;
112+
self
113+
}
114+
108115
/// Supports links of the form `/ipfs/ipfs_hash` or just `ipfs_hash`.
109116
fn cat(
110117
&self,
@@ -113,6 +120,7 @@ impl LinkResolverTrait for LinkResolver {
113120
) -> Box<dyn Future<Item = Vec<u8>, Error = failure::Error> + Send> {
114121
// Discard the `/ipfs/` prefix (if present) to get the hash.
115122
let path = link.link.trim_start_matches("/ipfs/").to_owned();
123+
let path_for_error = path.clone();
116124

117125
if let Some(data) = self.cache.lock().unwrap().get(&path) {
118126
trace!(logger, "IPFS cache hit"; "hash" => &path);
@@ -121,36 +129,56 @@ impl LinkResolverTrait for LinkResolver {
121129
trace!(logger, "IPFS cache miss"; "hash" => &path);
122130
}
123131

124-
let cat = self
125-
.client
126-
.cat(&path)
127-
.concat2()
128-
.timeout(self.timeout)
129-
.map(|x| x.to_vec())
130-
.map_err(|e| failure::err_msg(e.to_string()));
131-
132+
let client_for_cat = self.client.clone();
133+
let client_for_file_size = self.client.clone();
132134
let cache_for_writing = self.cache.clone();
133135

134136
let max_file_size: Option<u64> = read_u64_from_env(MAX_IPFS_FILE_SIZE_VAR);
137+
let timeout_for_file_size = self.timeout.clone();
138+
139+
let retry_fut = if self.retry {
140+
retry("ipfs.cat", &logger).no_limit()
141+
} else {
142+
retry("ipfs.cat", &logger).limit(1)
143+
};
135144

136145
Box::new(
137-
restrict_file_size(
138-
&self.client,
139-
path.clone(),
140-
self.timeout,
141-
max_file_size,
142-
Box::new(cat),
143-
)
144-
.map(move |data| {
145-
// Only cache files if they are not too large
146-
if data.len() <= *MAX_IPFS_CACHE_FILE_SIZE as usize {
147-
let mut cache = cache_for_writing.lock().unwrap();
148-
if !cache.contains_key(&path) {
149-
cache.insert(path, data.clone());
150-
}
151-
}
152-
data
153-
}),
146+
retry_fut
147+
.timeout(self.timeout)
148+
.run(move || {
149+
let cache_for_writing = cache_for_writing.clone();
150+
let path = path.clone();
151+
152+
let cat = client_for_cat
153+
.cat(&path)
154+
.concat2()
155+
.map(|x| x.to_vec())
156+
.map_err(|e| failure::err_msg(e.to_string()));
157+
158+
restrict_file_size(
159+
&client_for_file_size,
160+
path.clone(),
161+
timeout_for_file_size,
162+
max_file_size,
163+
Box::new(cat),
164+
)
165+
.map(move |data| {
166+
// Only cache files if they are not too large
167+
if data.len() <= *MAX_IPFS_CACHE_FILE_SIZE as usize {
168+
let mut cache = cache_for_writing.lock().unwrap();
169+
if !cache.contains_key(&path) {
170+
cache.insert(path, data.clone());
171+
}
172+
}
173+
data
174+
})
175+
})
176+
.map_err(move |e| {
177+
e.into_inner().unwrap_or(format_err!(
178+
"ipfs.cat took too long or failed to load `{}`",
179+
path_for_error,
180+
))
181+
}),
154182
)
155183
}
156184

graph/src/components/link_resolver.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ pub trait LinkResolver: Send + Sync + 'static {
2424
where
2525
Self: Sized;
2626

27+
/// Enables infinite retries.
28+
fn with_retries(self) -> Self
29+
where
30+
Self: Sized;
31+
2732
/// Fetches the link contents as bytes.
2833
fn cat(
2934
&self,

0 commit comments

Comments
 (0)