Skip to content

Commit b2aee12

Browse files
committed
ipfs: add more tests
1 parent a9a2336 commit b2aee12

File tree

4 files changed

+203
-0
lines changed

4 files changed

+203
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ anyhow = "1.0"
2626
[dev-dependencies]
2727
tower-test = { git = "https://github.com/tower-rs/tower.git" }
2828
uuid = { version = "1.9.1", features = ["v4"] }
29+
wiremock = "0.6.1"

core/src/polling_monitor/ipfs_service.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,14 @@ mod test {
101101
use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
102102
use graph::ipfs::IpfsRpcClient;
103103
use graph::ipfs::ServerAddress;
104+
use graph::log::discard;
104105
use graph::tokio;
105106
use tower::ServiceExt;
106107
use uuid::Uuid;
108+
use wiremock::matchers as m;
109+
use wiremock::Mock;
110+
use wiremock::MockServer;
111+
use wiremock::ResponseTemplate;
107112

108113
use super::*;
109114

@@ -143,4 +148,34 @@ mod test {
143148
"#.trim_start().trim_end();
144149
assert_eq!(expected, body);
145150
}
151+
152+
#[tokio::test]
153+
async fn no_client_retries_to_allow_polling_monitor_to_handle_retries_internally() {
154+
const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
155+
156+
let server = MockServer::start().await;
157+
let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
158+
let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1);
159+
let path = ContentPath::new(CID).unwrap();
160+
161+
Mock::given(m::method("POST"))
162+
.and(m::path("/api/v0/cat"))
163+
.and(m::query_param("arg", CID))
164+
.respond_with(ResponseTemplate::new(500))
165+
.up_to_n_times(1)
166+
.expect(1)
167+
.mount(&server)
168+
.await;
169+
170+
Mock::given(m::method("POST"))
171+
.and(m::path("/api/v0/cat"))
172+
.and(m::query_param("arg", CID))
173+
.respond_with(ResponseTemplate::new(200))
174+
.expect(..=1)
175+
.mount(&server)
176+
.await;
177+
178+
// This means that we never reached the successful response.
179+
ipfs_service.oneshot(path).await.unwrap_err();
180+
}
146181
}

graph/src/ipfs/retry_policy.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,169 @@ impl RetryPolicy {
4545
.no_timeout()
4646
}
4747
}
48+
49+
#[cfg(test)]
50+
mod tests {
51+
use std::sync::atomic::AtomicU64;
52+
use std::sync::atomic::Ordering;
53+
use std::sync::Arc;
54+
use std::time::Duration;
55+
56+
use super::*;
57+
use crate::ipfs::ContentPath;
58+
use crate::log::discard;
59+
60+
const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
61+
62+
fn path() -> ContentPath {
63+
ContentPath::new(CID).unwrap()
64+
}
65+
66+
#[tokio::test]
67+
async fn retry_policy_none_disables_retries() {
68+
let counter = Arc::new(AtomicU64::new(0));
69+
70+
let err = RetryPolicy::None
71+
.create::<()>("test", &discard())
72+
.run({
73+
let counter = counter.clone();
74+
move || {
75+
let counter = counter.clone();
76+
async move {
77+
counter.fetch_add(1, Ordering::SeqCst);
78+
Err(IpfsError::RequestTimeout { path: path() })
79+
}
80+
}
81+
})
82+
.await
83+
.unwrap_err();
84+
85+
assert_eq!(counter.load(Ordering::SeqCst), 1);
86+
assert!(matches!(err, IpfsError::RequestTimeout { .. }));
87+
}
88+
89+
#[tokio::test]
90+
async fn retry_policy_networking_retries_only_network_related_errors() {
91+
let counter = Arc::new(AtomicU64::new(0));
92+
93+
let err = RetryPolicy::Networking
94+
.create("test", &discard())
95+
.run({
96+
let counter = counter.clone();
97+
move || {
98+
let counter = counter.clone();
99+
async move {
100+
counter.fetch_add(1, Ordering::SeqCst);
101+
102+
if counter.load(Ordering::SeqCst) == 10 {
103+
return Err(IpfsError::RequestTimeout { path: path() });
104+
}
105+
106+
reqwest::Client::new()
107+
.get("https://simulate-dns-lookup-failure")
108+
.timeout(Duration::from_millis(50))
109+
.send()
110+
.await?;
111+
112+
Ok(())
113+
}
114+
}
115+
})
116+
.await
117+
.unwrap_err();
118+
119+
assert_eq!(counter.load(Ordering::SeqCst), 10);
120+
assert!(matches!(err, IpfsError::RequestTimeout { .. }));
121+
}
122+
123+
#[tokio::test]
124+
async fn retry_policy_networking_stops_on_success() {
125+
let counter = Arc::new(AtomicU64::new(0));
126+
127+
RetryPolicy::Networking
128+
.create("test", &discard())
129+
.run({
130+
let counter = counter.clone();
131+
move || {
132+
let counter = counter.clone();
133+
async move {
134+
counter.fetch_add(1, Ordering::SeqCst);
135+
136+
if counter.load(Ordering::SeqCst) == 10 {
137+
return Ok(());
138+
}
139+
140+
reqwest::Client::new()
141+
.get("https://simulate-dns-lookup-failure")
142+
.timeout(Duration::from_millis(50))
143+
.send()
144+
.await?;
145+
146+
Ok(())
147+
}
148+
}
149+
})
150+
.await
151+
.unwrap();
152+
153+
assert_eq!(counter.load(Ordering::SeqCst), 10);
154+
}
155+
156+
#[tokio::test]
157+
async fn retry_policy_non_deterministic_retries_all_non_deterministic_errors() {
158+
let counter = Arc::new(AtomicU64::new(0));
159+
160+
let err = RetryPolicy::NonDeterministic
161+
.create::<()>("test", &discard())
162+
.run({
163+
let counter = counter.clone();
164+
move || {
165+
let counter = counter.clone();
166+
async move {
167+
counter.fetch_add(1, Ordering::SeqCst);
168+
169+
if counter.load(Ordering::SeqCst) == 10 {
170+
return Err(IpfsError::ContentTooLarge {
171+
path: path(),
172+
max_size: 0,
173+
});
174+
}
175+
176+
Err(IpfsError::RequestTimeout { path: path() })
177+
}
178+
}
179+
})
180+
.await
181+
.unwrap_err();
182+
183+
assert_eq!(counter.load(Ordering::SeqCst), 10);
184+
assert!(matches!(err, IpfsError::ContentTooLarge { .. }));
185+
}
186+
187+
#[tokio::test]
188+
async fn retry_policy_non_deterministic_stops_on_success() {
189+
let counter = Arc::new(AtomicU64::new(0));
190+
191+
RetryPolicy::NonDeterministic
192+
.create("test", &discard())
193+
.run({
194+
let counter = counter.clone();
195+
move || {
196+
let counter = counter.clone();
197+
async move {
198+
counter.fetch_add(1, Ordering::SeqCst);
199+
200+
if counter.load(Ordering::SeqCst) == 10 {
201+
return Ok(());
202+
}
203+
204+
Err(IpfsError::RequestTimeout { path: path() })
205+
}
206+
}
207+
})
208+
.await
209+
.unwrap();
210+
211+
assert_eq!(counter.load(Ordering::SeqCst), 10);
212+
}
213+
}

0 commit comments

Comments
 (0)