Skip to content

Commit f727dbc

Browse files
mothrannategraf
andauthored
BM-51: Image / Input fetch retry system. (github#95)
This PR adds a configurable retry count with static wait time between fetches. Will allow for brokers to configure a retry counter so IPFS urls have a chance of being indexed between http GET's closes BM-51 --------- Co-authored-by: Victor Graf <[email protected]>
1 parent 9efd40b commit f727dbc

File tree

7 files changed

+146
-41
lines changed

7 files changed

+146
-41
lines changed

broker.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ lookback_blocks = 100
77
max_stake = "0.5"
88
skip_preflight_ids = []
99
max_file_size = 50_000_000
10+
# max_fetch_retries = 2
1011
# allow_client_addresses = []
1112
# lockin_priority_gas = 100
1213

crates/boundless-market/src/storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub enum PinataStorageProviderError {
131131
}
132132

133133
const DEFAULT_PINATA_API_URL: &str = "https://api.pinata.cloud";
134-
const DEFAULT_GATEWAY_URL: &str = "https://dweb.link";
134+
const DEFAULT_GATEWAY_URL: &str = "https://gateway.pinata.cloud";
135135

136136
impl PinataStorageProvider {
137137
pub async fn from_env() -> Result<Self, PinataStorageProviderError> {

crates/broker/src/config.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ pub struct MarketConf {
5151
/// for increasing the priority if competing with multiple provers during the
5252
/// same block
5353
pub lockin_priority_gas: Option<u64>,
54-
/// Max input / image file size
54+
/// Max input / image file size allowed for downloading from request URLs
5555
pub max_file_size: usize,
56+
/// Max retries for fetching input / image contents from URLs
57+
pub max_fetch_retries: Option<u8>,
5658
}
5759

5860
impl Default for MarketConf {
@@ -68,6 +70,7 @@ impl Default for MarketConf {
6870
allow_client_addresses: None,
6971
lockin_priority_gas: None,
7072
max_file_size: 50_000_000,
73+
max_fetch_retries: Some(2),
7174
}
7275
}
7376
}
@@ -330,6 +333,7 @@ lookback_blocks = 100
330333
max_stake = "0.1"
331334
skip_preflight_ids = ["0x0000000000000000000000000000000000000000000000000000000000000001"]
332335
max_file_size = 50_000_000
336+
max_fetch_retries = 10
333337
allow_client_addresses = ["0x0000000000000000000000000000000000000000"]
334338
lockin_priority_gas = 100
335339
@@ -425,6 +429,7 @@ error = ?"#;
425429
assert_eq!(config.market.lookback_blocks, 100);
426430
assert_eq!(config.market.allow_client_addresses, Some(vec![Address::ZERO]));
427431
assert_eq!(config.market.lockin_priority_gas, Some(100));
432+
assert_eq!(config.market.max_fetch_retries, Some(10));
428433
assert_eq!(config.prover.status_poll_ms, 1000);
429434
assert!(config.prover.bonsai_r0_zkvm_ver.is_none());
430435
assert_eq!(config.batcher.txn_timeout, Some(45));

crates/broker/src/lib.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -533,11 +533,18 @@ where
533533
}
534534
}
535535

536-
async fn upload_image_uri(prover: &ProverObj, order: &Order, max_size: usize) -> Result<String> {
537-
let uri = UriHandlerBuilder::new(&order.request.imageUrl)
538-
.set_max_size(max_size)
539-
.build()
540-
.context("Uri parse failure")?;
536+
async fn upload_image_uri(
537+
prover: &ProverObj,
538+
order: &Order,
539+
max_size: usize,
540+
retries: Option<u8>,
541+
) -> Result<String> {
542+
let mut uri = UriHandlerBuilder::new(&order.request.imageUrl).set_max_size(max_size);
543+
544+
if let Some(retry) = retries {
545+
uri = uri.set_retries(retry);
546+
}
547+
let uri = uri.build().context("Uri parse failure")?;
541548

542549
if !uri.exists() {
543550
let image_data = uri
@@ -566,7 +573,12 @@ async fn upload_image_uri(prover: &ProverObj, order: &Order, max_size: usize) ->
566573
Ok(uri.id().context("Invalid image URI type")?)
567574
}
568575
}
569-
async fn upload_input_uri(prover: &ProverObj, order: &Order, max_size: usize) -> Result<String> {
576+
async fn upload_input_uri(
577+
prover: &ProverObj,
578+
order: &Order,
579+
max_size: usize,
580+
retries: Option<u8>,
581+
) -> Result<String> {
570582
Ok(match order.request.input.inputType {
571583
InputType::Inline => prover
572584
.upload_input(order.request.input.data.to_vec())
@@ -577,10 +589,12 @@ async fn upload_input_uri(prover: &ProverObj, order: &Order, max_size: usize) ->
577589
let input_uri_str =
578590
std::str::from_utf8(&order.request.input.data).context("input url is not utf8")?;
579591
tracing::debug!("Input URI string: {input_uri_str}");
580-
let input_uri = UriHandlerBuilder::new(input_uri_str)
581-
.set_max_size(max_size)
582-
.build()
583-
.context("Failed to parse input uri")?;
592+
let mut input_uri = UriHandlerBuilder::new(input_uri_str).set_max_size(max_size);
593+
594+
if let Some(retry) = retries {
595+
input_uri = input_uri.set_retries(retry);
596+
}
597+
let input_uri = input_uri.build().context("Failed to parse input uri")?;
584598

585599
if !input_uri.exists() {
586600
let input_data = input_uri

crates/broker/src/order_picker.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ where
125125
return Ok(());
126126
}
127127

128-
let (skip_preflight, max_size, peak_prove_khz) = {
128+
let (skip_preflight, max_size, peak_prove_khz, fetch_retries) = {
129129
let config = self.config.lock_all().context("Failed to read config")?;
130130
let skip_preflight =
131131
if let Some(skip_preflights) = config.market.skip_preflight_ids.as_ref() {
@@ -134,7 +134,12 @@ where
134134
false
135135
};
136136

137-
(skip_preflight, config.market.max_file_size, config.market.peak_prove_khz)
137+
(
138+
skip_preflight,
139+
config.market.max_file_size,
140+
config.market.peak_prove_khz,
141+
config.market.max_fetch_retries,
142+
)
138143
};
139144

140145
if skip_preflight {
@@ -147,13 +152,13 @@ where
147152
}
148153

149154
// TODO: Move URI handling like this into the prover impls
150-
let image_id = crate::upload_image_uri(&self.prover, order, max_size)
155+
let image_id = crate::upload_image_uri(&self.prover, order, max_size, fetch_retries)
151156
.await
152-
.context("Failed to upload image_id")?;
157+
.context("Failed to fetch and upload image_id")?;
153158

154-
let input_id = crate::upload_input_uri(&self.prover, order, max_size)
159+
let input_id = crate::upload_input_uri(&self.prover, order, max_size, fetch_retries)
155160
.await
156-
.context("Failed to upload input_id")?;
161+
.context("Failed to fetch and upload input_id")?;
157162

158163
// Record the image/input IDs for proving stage
159164
self.db

crates/broker/src/proving.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,22 @@ impl ProvingService {
3939
}
4040

4141
pub async fn prove_order(&self, order_id: U256, order: Order) -> Result<()> {
42-
let max_file_size = {
42+
let (max_file_size, fetch_retries) = {
4343
let config = self.config.lock_all().context("Failed to read config")?;
44-
config.market.max_file_size
44+
(config.market.max_file_size, config.market.max_fetch_retries)
4545
};
4646

4747
// If the ID's are not present then upload them now
4848
// Mostly hit by skipping pre-flight
4949
let image_id = match order.image_id.as_ref() {
5050
Some(val) => val.clone(),
51-
None => crate::upload_image_uri(&self.prover, &order, max_file_size)
51+
None => crate::upload_image_uri(&self.prover, &order, max_file_size, fetch_retries)
5252
.await
5353
.context("Failed to upload image")?,
5454
};
5555
let input_id = match order.input_id.as_ref() {
5656
Some(val) => val.clone(),
57-
None => crate::upload_input_uri(&self.prover, &order, max_file_size)
57+
None => crate::upload_input_uri(&self.prover, &order, max_file_size, fetch_retries)
5858
.await
5959
.context("Failed to upload input")?,
6060
};

crates/broker/src/storage.rs

Lines changed: 99 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ pub enum StorageErr {
3131
#[error("HTTP status error {0}")]
3232
HttpStatusErr(String),
3333

34+
#[error("HTTP fetch failed after {0} retries")]
35+
FetchRetryMax(u8),
36+
3437
#[error("Authority missing")]
3538
AuthorityMissing,
3639

@@ -45,8 +48,11 @@ pub struct UriHandler {
4548
uri: url::Url,
4649
uri_scheme: String,
4750
max_size: Option<usize>,
51+
retries: u8,
4852
}
4953

54+
const DEFAULT_RETRY_NUMB: u8 = 1;
55+
5056
impl UriHandler {
5157
fn supported_scheme(scheme: &str) -> bool {
5258
if risc0_zkvm::is_dev_mode() {
@@ -59,7 +65,11 @@ impl UriHandler {
5965
matches!(authority, "image" | "input")
6066
}
6167

62-
pub fn new(uri_str: &str, max_size: Option<usize>) -> Result<Self, StorageErr> {
68+
pub fn new(
69+
uri_str: &str,
70+
max_size: Option<usize>,
71+
retries: Option<u8>,
72+
) -> Result<Self, StorageErr> {
6373
let uri = url::Url::parse(uri_str)?;
6474

6575
let scheme = uri.scheme().to_string();
@@ -86,7 +96,12 @@ impl UriHandler {
8696
}
8797
}
8898

89-
Ok(Self { uri, uri_scheme: scheme, max_size })
99+
Ok(Self {
100+
uri,
101+
uri_scheme: scheme,
102+
max_size,
103+
retries: retries.unwrap_or(DEFAULT_RETRY_NUMB),
104+
})
90105
}
91106

92107
pub fn exists(&self) -> bool {
@@ -103,15 +118,29 @@ impl UriHandler {
103118
match self.uri_scheme.as_ref() {
104119
"bonsai" => Err(StorageErr::BonsaiFetch),
105120
"http" | "https" => {
106-
let res = reqwest::get(self.uri.to_string()).await?;
107-
let status = res.status();
108-
if !status.is_success() {
109-
let body = res.text().await?;
110-
return Err(StorageErr::HttpStatusErr(format!(
111-
"HTTP fetch err: {} - {}",
112-
status, body
113-
)));
114-
}
121+
let mut retry = 0;
122+
let res = loop {
123+
// TODO: move these ?'s to captures + retries
124+
// currently only retry on http status code failures
125+
let res = reqwest::get(self.uri.to_string()).await?;
126+
let status = res.status();
127+
if status.is_success() {
128+
break res;
129+
} else {
130+
let body = res.text().await?;
131+
tracing::error!(
132+
"HTTP error fetching contents {retry}/{}: {status} - {body}",
133+
self.retries
134+
);
135+
if retry == self.retries {
136+
return Err(StorageErr::FetchRetryMax(self.retries));
137+
}
138+
retry += 1;
139+
// TODO configurable...
140+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
141+
continue;
142+
}
143+
};
115144

116145
let mut buffer = vec![];
117146
if let Some(content_length) = res.content_length() {
@@ -163,20 +192,26 @@ impl UriHandler {
163192
pub struct UriHandlerBuilder {
164193
uri_str: String,
165194
max_size: Option<usize>,
195+
retries: Option<u8>,
166196
}
167197

168198
impl UriHandlerBuilder {
169199
pub fn new(uri_str: &str) -> Self {
170-
Self { uri_str: uri_str.into(), max_size: None }
200+
Self { uri_str: uri_str.into(), max_size: None, retries: None }
171201
}
172202

173203
pub fn set_max_size(mut self, max_size: usize) -> Self {
174204
self.max_size = Some(max_size);
175205
self
176206
}
177207

208+
pub fn set_retries(mut self, retries: u8) -> Self {
209+
self.retries = Some(retries);
210+
self
211+
}
212+
178213
pub fn build(self) -> Result<UriHandler, StorageErr> {
179-
UriHandler::new(&self.uri_str, self.max_size)
214+
UriHandler::new(&self.uri_str, self.max_size, self.retries)
180215
}
181216
}
182217

@@ -189,6 +224,8 @@ impl Display for UriHandler {
189224
#[cfg(test)]
190225
mod tests {
191226
use super::*;
227+
use httpmock::prelude::*;
228+
use tracing_test::traced_test;
192229

193230
#[test]
194231
fn bonsai_uri_parser() {
@@ -230,22 +267,65 @@ mod tests {
230267

231268
#[tokio::test]
232269
async fn http_fetch() {
233-
let handler = UriHandlerBuilder::new("https://risczero.com/")
234-
.set_max_size(1_000_000)
235-
.build()
236-
.unwrap();
270+
let server = MockServer::start();
271+
let resp_data = vec![0x41, 0x41, 0x41, 0x41];
272+
let get_mock = server.mock(|when, then| {
273+
when.method(GET).path("/image");
274+
then.status(200).body(&resp_data);
275+
});
276+
277+
let url = format!("http://{}/image", server.address());
278+
let handler = UriHandlerBuilder::new(&url).set_max_size(1_000_000).build().unwrap();
279+
assert!(!handler.exists());
280+
281+
let data = handler.fetch().await.unwrap();
282+
assert_eq!(data, resp_data);
283+
get_mock.assert();
284+
}
285+
286+
#[traced_test]
287+
#[tokio::test]
288+
async fn http_fetch_retry() {
289+
static mut REQ_COUNT: u32 = 0;
290+
291+
let server = MockServer::start();
292+
let get_mock = server.mock(|when, then| {
293+
when.method(GET).path("/image").matches(|_req: &HttpMockRequest| {
294+
let req = unsafe {
295+
let req = REQ_COUNT;
296+
REQ_COUNT += 1;
297+
req
298+
};
299+
req >= 1
300+
});
301+
then.status(200).body("TEST");
302+
});
303+
304+
let url = format!("http://{}/image", server.address());
305+
let handler =
306+
UriHandlerBuilder::new(&url).set_max_size(1_000_000).set_retries(1).build().unwrap();
237307
assert!(!handler.exists());
238308

239309
let _data = handler.fetch().await.unwrap();
310+
get_mock.assert();
311+
assert!(logs_contain("HTTP error fetching contents 0/1"));
240312
}
241313

242314
#[tokio::test]
243315
#[should_panic(expected = "TooLarge")]
244316
async fn max_size_limit() {
245-
let handler =
246-
UriHandlerBuilder::new("https://risczero.com/").set_max_size(1).build().unwrap();
317+
let server = MockServer::start();
318+
let resp_data = vec![0x41, 0x41, 0x41, 0x41];
319+
let get_mock = server.mock(|when, then| {
320+
when.method(GET).path("/image");
321+
then.status(200).body(&resp_data);
322+
});
323+
324+
let url = format!("http://{}/image", server.address());
325+
let handler = UriHandlerBuilder::new(&url).set_max_size(1).build().unwrap();
247326
assert!(!handler.exists());
248327

249328
let _data = handler.fetch().await.unwrap();
329+
get_mock.assert();
250330
}
251331
}

0 commit comments

Comments
 (0)