Skip to content

Commit 819ef5e

Browse files
authored
Merge pull request #49 from RCasatta/timeouts
Timeouts and logging
2 parents f401cce + d81d7ce commit 819ef5e

File tree

5 files changed

+55
-23
lines changed

5 files changed

+55
-23
lines changed

src/fetch.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ const BS: &str = "https://blockstream.info";
5757
const LOCAL: &str = "http://127.0.0.1";
5858

5959
impl Client {
60-
pub fn new(args: &Arguments) -> Client {
60+
pub fn new(args: &Arguments) -> Result<Client> {
61+
args.is_valid()?;
6162
let esplora_url = match args.network {
6263
Network::Liquid => args
6364
.esplora_url
@@ -89,13 +90,19 @@ impl Client {
8990
node_url.unwrap_or(format!("{LOCAL}:{port}"))
9091
};
9192
log::info!("connecting to {base_url}");
92-
Client {
93-
client: reqwest::Client::new(),
93+
let client = reqwest::Client::builder()
94+
.timeout(Duration::from_secs(args.request_timeout_seconds))
95+
.connect_timeout(Duration::from_secs(args.request_timeout_seconds)) // Connection establishment timeout
96+
.build()
97+
.with_context(|| "Failed to create HTTP client with timeout")?;
98+
99+
Ok(Client {
100+
client,
94101
use_esplora,
95102
base_url,
96103
esplora_url,
97104
rpc_user_password: args.rpc_user_password.clone(),
98-
}
105+
})
99106
}
100107

101108
// `curl http://127.0.0.1:7041/rest/blockhashbyheight/0.hex`
@@ -488,9 +495,10 @@ mod test {
488495
let _ = env_logger::try_init();
489496
let mut args = Arguments::default();
490497
args.use_esplora = true;
498+
args.request_timeout_seconds = 30;
491499
for network in [Network::Bitcoin, Network::Liquid, Network::LiquidTestnet] {
492500
args.network = network;
493-
let client = Client::new(&args);
501+
let client = Client::new(&args).unwrap();
494502
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
495503
test(client, network).await;
496504
}
@@ -505,7 +513,9 @@ mod test {
505513
args.use_esplora = false;
506514
args.network = Network::ElementsRegtest;
507515
args.node_url = Some(elementsd.rpc_url());
508-
let client = Client::new(&args);
516+
args.request_timeout_seconds = 10;
517+
args.rpc_user_password = Some(elementsd.params.cookie_file.to_string_lossy().to_string());
518+
let client = Client::new(&args).unwrap();
509519
test(client, args.network).await;
510520
}
511521

@@ -535,7 +545,7 @@ mod test {
535545
let mut args = Arguments::default();
536546
args.use_esplora = false;
537547
args.network = network;
538-
Client::new(&args)
548+
Client::new(&args).unwrap()
539549
}
540550

541551
#[tokio::test]
@@ -548,7 +558,9 @@ mod test {
548558
args.use_esplora = false;
549559
args.network = Network::BitcoinRegtest;
550560
args.node_url = Some(bitcoind.rpc_url());
551-
let client = Client::new(&args);
561+
args.request_timeout_seconds = 10;
562+
args.rpc_user_password = Some(bitcoind.params.cookie_file.to_string_lossy().to_string());
563+
let client = Client::new(&args).unwrap();
552564
test(client, args.network).await;
553565
}
554566

src/server/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ pub struct Arguments {
117117
/// Cache control duration in seconds for waterfalls endpoints. Set to 0 to disable cache control headers.
118118
#[arg(env, long, default_value = "5")]
119119
pub cache_control_seconds: u32,
120+
121+
/// Timeout in seconds for connect and for reques HTTP requests and to the node or esplora
122+
#[arg(env, long, default_value = "30")]
123+
pub request_timeout_seconds: u64,
120124
}
121125

122126
impl Arguments {
@@ -125,6 +129,10 @@ impl Arguments {
125129
Err(Error::String(
126130
"When using the node you must specify --rpc-user-password".to_string(),
127131
))
132+
} else if self.request_timeout_seconds == 0 {
133+
Err(Error::String(
134+
"Request timeout must be greater than 0".to_string(),
135+
))
128136
} else {
129137
Ok(())
130138
}
@@ -264,7 +272,6 @@ pub async fn inner_main(
264272
shutdown_signal: impl Future<Output = ()>,
265273
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
266274
log::info!("starting waterfalls");
267-
args.is_valid()?;
268275

269276
let store = get_store(&args)?;
270277

@@ -307,7 +314,8 @@ pub async fn inner_main(
307314

308315
let h1 = {
309316
let state = state.clone();
310-
let client: Client = Client::new(&args);
317+
let client: Client =
318+
Client::new(&args).unwrap_or_else(|e| error_panic!("Failed to create client: {e}"));
311319
let shutdown_rx = shutdown_tx.subscribe();
312320
tokio::spawn(async move {
313321
let shutdown_future = async {
@@ -328,7 +336,8 @@ pub async fn inner_main(
328336

329337
let h2 = {
330338
let state = state.clone();
331-
let client = Client::new(&args);
339+
let client =
340+
Client::new(&args).unwrap_or_else(|e| error_panic!("Failed to create client: {e}"));
332341
let shutdown_rx = shutdown_tx.subscribe();
333342
tokio::spawn(async move {
334343
let shutdown_future = async {
@@ -353,7 +362,7 @@ pub async fn inner_main(
353362
log::info!("Starting on http://{addr}");
354363

355364
let listener = TcpListener::bind(addr).await?;
356-
let client = Client::new(&args);
365+
let client = Client::new(&args)?;
357366
let client = Arc::new(Mutex::new(client));
358367
let mut signal = std::pin::pin!(shutdown_signal);
359368

src/store/db.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ impl DBStore {
113113
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
114114

115115
if name == HISTORY_CF {
116-
// bloom filter are useful only for gets with a key miss, which happens a lot for history and never for utxo
116+
// bloom filter are useful only for gets with a key miss,
117+
// which happens a lot for history and never for utxo.
118+
// TODO actually not true for utxo_only query
117119
block_opts.set_bloom_filter(10.0, true);
118120
}
119121

src/test_env.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ async fn inner_launch_with_node(node: BitcoinD, path: Option<PathBuf>, family: F
7070
node_url: Some(node.rpc_url()),
7171
derivation_cache_capacity: 10000,
7272
cache_control_seconds: 0,
73+
request_timeout_seconds: 10,
7374
..Default::default()
7475
};
7576
let available_port = get_available_port().unwrap();

src/threads/blocks.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ async fn get_next_block_to_index(
4747
match last_indexed.as_ref() {
4848
Some(last) => {
4949
match client.get_next(&last, family).await {
50-
Ok(ChainStatus::NewBlock(next)) => {
51-
return Some(next);
52-
}
50+
Ok(ChainStatus::NewBlock(next)) => Some(next),
5351
Ok(ChainStatus::Reorg) => {
5452
log::warn!("reorg happened! {last:?} removed from the chain");
5553
let previous_height = last.height - 1;
@@ -64,7 +62,7 @@ async fn get_next_block_to_index(
6462
BlockMeta::new(previous_height, blocks_hash_ts.0, blocks_hash_ts.1);
6563
*last_indexed = Some(previous_block_meta);
6664
state.store.reorg();
67-
return None;
65+
None
6866
}
6967
Ok(ChainStatus::Tip) => {
7068
// Signal initial sync completion the first time we hit the tip
@@ -73,23 +71,33 @@ async fn get_next_block_to_index(
7371
log::info!("Initial block download completed, signaling mempool thread");
7472
}
7573
sleep(Duration::from_secs(1)).await;
76-
return None;
74+
None
7775
}
7876
Err(e) => {
7977
log::warn!("error getting next block {e}, sleeping for 1 second and retrying");
8078
sleep(Duration::from_secs(1)).await;
81-
return None;
79+
None
8280
}
8381
}
8482
}
8583
None => {
86-
if let Ok(Some(next)) = client.block_hash(0).await {
87-
return Some(BlockMeta::new(0, next, 0)); // TODO timestamp
84+
match client.block_hash(0).await {
85+
Ok(Some(next)) => {
86+
Some(BlockMeta::new(0, next, 0)) // TODO timestamp
87+
}
88+
Ok(None) => {
89+
log::info!("block hash 0 is not found");
90+
sleep(Duration::from_secs(1)).await;
91+
None
92+
}
93+
Err(e) => {
94+
log::warn!("error getting next block {e}, sleeping for 1 second and retrying");
95+
sleep(Duration::from_secs(1)).await;
96+
None
97+
}
8898
}
8999
}
90100
}
91-
sleep(Duration::from_secs(1)).await;
92-
None
93101
}
94102

95103
pub async fn index(

0 commit comments

Comments
 (0)