diff --git a/node/config_spec.toml b/node/config_spec.toml
index c39fd23..acb48ef 100644
--- a/node/config_spec.toml
+++ b/node/config_spec.toml
@@ -19,7 +19,7 @@ doc = "The bitcoin network to operate on. Default `bitcoin`. Options are `bitcoi
 [[param]]
 name = "ping_timeout"
 type = "u64"
-default = "15"
-doc = "The time (seconds) a peer has to respond to a `ping` message. Pings are sent aggressively throughout IBD to find slow peers."
+default = "60"
+doc = "The time (seconds) a peer has to respond to a `ping` message."

 [[param]]
@@ -43,11 +43,11 @@ doc = "The maximum time (seconds) to write to a TCP stream until the connection
 [[param]]
 name = "min_blocks_per_sec"
 type = "f64"
-default = "1."
-doc = "The minimum rate a peer has to respond to block requests."
+default = "3."
+doc = "The minimum rate (blocks per second) at which a peer must respond to block requests."

 [[param]]
 name = "tasks"
 type = "usize"
-default = "64"
-doc = "The number of tasks to download blocks. Default is 64. Each task uses two OS threads."
+default = "32"
+doc = "The number of tasks to download blocks. Default is 32. Each task uses two OS threads."
diff --git a/node/src/bin/ibd.rs b/node/src/bin/ibd.rs
index 357321d..675c308 100644
--- a/node/src/bin/ibd.rs
+++ b/node/src/bin/ibd.rs
@@ -15,7 +15,7 @@ use node::{
 };
 use p2p::net::TimeoutParams;

-const PING_INTERVAL: Duration = Duration::from_secs(15);
+const PING_INTERVAL: Duration = Duration::from_secs(10 * 60);

 configure_me::include_config!();

@@ -74,8 +74,7 @@ fn main() {
     let acc_task = std::thread::spawn(move || accumulator_state.verify());
     let peers = Arc::new(Mutex::new(peers));
     let mut tasks = Vec::new();
-    let chunk_size = chain.best_header().height() as usize / task_num;
-    let hashes = hashes_from_chain(Arc::clone(&chain), chunk_size);
+    let hashes = hashes_from_chain(Arc::clone(&chain), task_num);
     for (task_id, chunk) in hashes.into_iter().enumerate() {
         let chain = Arc::clone(&chain);
         let tx = tx.clone();
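
Taken together, the knobs above shift slow-peer detection from aggressive pinging to observed throughput: the ping timeout is relaxed from 15s to 60s, the IBD binary now pings every ten minutes instead of every fifteen seconds, the block-rate floor triples to 3.0 blocks/sec, and the task count halves to 32. The diff does not show where `min_blocks_per_sec` is enforced, so the sketch below is only an illustration of such a rate check; `PeerThroughput` and its fields are invented names, not the crate's API.

    use std::time::Instant;

    // Hypothetical per-peer throughput tracker (not from the patch).
    struct PeerThroughput {
        started: Instant,
        blocks_received: u64,
    }

    impl PeerThroughput {
        // A peer falls below the floor once its observed rate drops under
        // the configured minimum, after a short grace period so a fresh
        // connection is not judged on its first moments.
        fn below_floor(&self, min_blocks_per_sec: f64) -> bool {
            let secs = self.started.elapsed().as_secs_f64();
            secs > 1.0 && (self.blocks_received as f64) / secs < min_blocks_per_sec
        }
    }

    fn main() {
        let peer = PeerThroughput { started: Instant::now(), blocks_received: 0 };
        assert!(!peer.below_floor(3.0)); // still inside the grace period
    }
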
diff --git a/node/src/lib.rs b/node/src/lib.rs
index c0cc8d5..4289958 100644
--- a/node/src/lib.rs
+++ b/node/src/lib.rs
@@ -34,6 +34,7 @@ use p2p::{
 };

 const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION;
+const MAX_GETDATA: usize = 50_000;

 pub fn elapsed_time(then: Instant) {
     let duration_sec = then.elapsed().as_secs_f64();
@@ -79,11 +80,12 @@ pub fn sync_block_headers(
     hosts: &[SocketAddr],
     chainman: Arc,
     network: Network,
-    timeout_params: TimeoutParams,
+    mut timeout_params: TimeoutParams,
 ) {
     let mut rng = thread_rng();
     let then = Instant::now();
     tracing::info!("Syncing block headers to assume valid hash");
+    timeout_params.ping_interval(Duration::from_secs(30));
     loop {
         let random = hosts
             .choose(&mut rng)
@@ -99,20 +101,24 @@
             Err(_) => continue,
         };
         tracing::info!("Connection established");
+        let mut get_next = true;
         loop {
-            let curr = chainman.best_header().block_hash().hash;
-            let locator = BlockHash::from_byte_array(curr);
-            let getheaders = GetHeadersMessage {
-                version: PROTOCOL_VERSION,
-                locator_hashes: vec![locator],
-                stop_hash: BlockHash::GENESIS_PREVIOUS_BLOCK_HASH,
-            };
-            tracing::info!("Requesting {locator}");
-            if writer
-                .send_message(NetworkMessage::GetHeaders(getheaders))
-                .is_err()
-            {
-                break;
+            let mut kill = false;
+            if get_next {
+                let curr = chainman.best_header().block_hash().hash;
+                let locator = BlockHash::from_byte_array(curr);
+                let getheaders = GetHeadersMessage {
+                    version: PROTOCOL_VERSION,
+                    locator_hashes: vec![locator],
+                    stop_hash: BlockHash::GENESIS_PREVIOUS_BLOCK_HASH,
+                };
+                tracing::info!("Requesting {locator}");
+                if writer
+                    .send_message(NetworkMessage::GetHeaders(getheaders))
+                    .is_err()
+                {
+                    break;
+                }
             }
             while let Ok(Some(message)) = reader.read_message() {
                 match message {
@@ -121,6 +127,7 @@ pub fn sync_block_headers(
                         chainman
                             .process_new_block_headers(&consensus::serialize(&header), true)
                             .expect("process headers failed");
+                        get_next = true;
                         if header.block_hash().eq(&stop_hash) {
                             tracing::info!("Done syncing block headers");
                             if let Some(message_rate) =
@@ -138,12 +145,23 @@
                         tracing::info!("Update chain tip: {}", chainman.best_header().height());
                         break;
                     }
+                    NetworkMessage::Inv(_) => {
+                        kill = true;
+                        break;
+                    }
                     NetworkMessage::Ping(nonce) => {
+                        get_next = false;
                         let _ = writer.send_message(NetworkMessage::Pong(nonce));
                     }
-                    e => tracing::info!("Ignoring message {}", e.command()),
+                    e => {
+                        get_next = false;
+                        tracing::info!("Ignoring message {}", e.command());
+                    }
                 }
             }
+            if kill {
+                break;
+            }
         }
     }
 }
@@ -153,7 +171,7 @@ pub fn get_blocks_for_range(
     task_id: u32,
     timeout_params: TimeoutParams,
     blocks_per_sec: f64,
-    ping_timeout: Duration,
+    _ping_timeout: Duration,
     network: Network,
     block_dir: &Path,
     chain: Arc,
@@ -162,6 +180,7 @@
     updater: Sender,
     mut batch: Vec<BlockHash>,
 ) {
+    tracing::info!("{task_id} assigned {} blocks", batch.len());
     let mut rng = thread_rng();
     loop {
         let peer = {
@@ -207,7 +226,15 @@
                     hints.get_block_offsets(block_height).into_iter().collect();
                 // tracing::info!("{task_id} -> {block_height}:{hash}");
                 let file_path = block_dir.join(format!("{hash}.block"));
-                let mut file = File::create_new(file_path).expect("duplicate block file");
+                let file = File::create_new(file_path);
+                let mut file = match file {
+                    Ok(file) => file,
+                    Err(e) => {
+                        tracing::warn!("Conflicting open files at: {}", block_height);
+                        tracing::warn!("{e}");
+                        panic!("files cannot conflict");
+                    }
+                };
                 let block_bytes = consensus::serialize(&block);
                 file.write_all(&block_bytes)
                     .expect("failed to write block file");
@@ -243,7 +270,9 @@
                    output_index += 1
                }
            }
-
+            if batch.len() % 100 == 0 {
+                tracing::info!("{task_id} has {} remaining blocks", batch.len());
+            }
            if batch.is_empty() {
                tracing::info!("All block ranges fetched: {task_id}");
                return;
@@ -277,10 +306,10 @@
                    break;
                }
            }
-            if metrics.ping_timed_out(ping_timeout) {
-                tracing::info!("{task_id} failed to respond to a ping");
-                break;
-            }
+            // if metrics.ping_timed_out(ping_timeout) {
+            //     tracing::warn!("{task_id} failed to respond to a ping");
+            //     break;
+            // }
        }
        if batch.is_empty() {
            break;
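
Two behaviors in the hunks above are worth spelling out. First, the header-sync loop now only sends a fresh `getheaders` after it has actually processed a headers message (`get_next`), answers pings without re-requesting, and disconnects a peer that starts relaying unsolicited `inv` traffic (`kill`). Second, `get_blocks_for_range` drops the ping-timeout disconnect and expands the duplicate-file `expect` into a match that logs the block height and the error before panicking. The guarantee it leans on is that `File::create_new` fails if the path already exists, so two tasks fetching the same block fail loudly instead of silently overwriting each other. A minimal standalone demonstration of that pattern (the path and helper name are illustrative only):

    use std::fs::File;
    use std::io::Write;
    use std::path::Path;

    // Write a file only if it does not exist yet; a second write is an error.
    fn write_once(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
        let mut file = File::create_new(path)?; // AlreadyExists if present
        file.write_all(bytes)
    }

    fn main() -> std::io::Result<()> {
        let path = std::env::temp_dir().join("example.block");
        let _ = std::fs::remove_file(&path); // clean slate for the demo
        write_once(&path, b"first write succeeds")?;
        assert!(write_once(&path, b"second write fails").is_err());
        std::fs::remove_file(&path)
    }

The final hunk of node/src/lib.rs below replaces uniform chunking with an epoch-aware split.
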
@@ -289,21 +318,50 @@
     tracing::info!("All block ranges fetched: {task_id}");
 }

-pub fn hashes_from_chain(chain: Arc, chunks: usize) -> Vec<Vec<BlockHash>> {
+pub fn hashes_from_chain(chain: Arc, jobs: usize) -> Vec<Vec<BlockHash>> {
     let height = chain.best_header().height();
     let mut hashes = Vec::with_capacity(height as usize);
     let mut curr = chain.best_header();
     let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash);
     hashes.push(tip_hash);
+    let mut out = Vec::new();
     while let Ok(next) = curr.prev() {
         if next.height() == 0 {
-            return hashes.chunks(chunks).map(|slice| slice.to_vec()).collect();
+            break;
         }
         let hash = BlockHash::from_byte_array(next.block_hash().hash);
         hashes.push(hash);
         curr = next;
     }
-    hashes.chunks(chunks).map(|slice| slice.to_vec()).collect()
+    // These blocks are nearly empty. Fetch the maximum number of blocks per request.
+    let first_epoch = hashes.split_off(hashes.len() - 200_000);
+    let first_chunks: Vec<Vec<BlockHash>> = first_epoch
+        .chunks(MAX_GETDATA)
+        .map(|slice| slice.to_vec())
+        .collect();
+    out.extend(first_chunks);
+    // These start to get larger, but are still small.
+    let next_epoch = hashes.split_off(hashes.len() - 100_000);
+    let next_chunks: Vec<Vec<BlockHash>> = next_epoch
+        .chunks(MAX_GETDATA / 2)
+        .map(|slice| slice.to_vec())
+        .collect();
+    out.extend(next_chunks);
+    // Still not entirely full, but almost there.
+    let to_segwit = hashes.split_off(hashes.len() - 100_000);
+    let to_segwit_chunks: Vec<Vec<BlockHash>> = to_segwit
+        .chunks(MAX_GETDATA / 4)
+        .map(|slice| slice.to_vec())
+        .collect();
+    out.extend(to_segwit_chunks);
+    // Now divide the rest among the jobs.
+    let chunk_size = hashes.len() / jobs;
+    let rest: Vec<Vec<BlockHash>> = hashes
+        .chunks(chunk_size)
+        .map(|slice| slice.to_vec())
+        .collect();
+    out.extend(rest);
+    out
 }

 pub trait ChainExt {
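
The reworked `hashes_from_chain` collects hashes tip-first, so each `split_off` peels a batch from the back of the vector, that is, from the oldest blocks. The first 200,000 blocks ship in chunks of `MAX_GETDATA` (50,000, matching the p2p limit on inventory items per message), the next 100,000 in chunks of 25,000, the 100,000 after that in chunks of 12,500, and only the remaining full-sized blocks are divided evenly among the worker tasks. Note the arithmetic assumes a long chain: on a chain shorter than roughly 400,000 blocks the `split_off` calls underflow, and `hashes.len() / jobs` can reach zero, which makes `chunks` panic. The toy below reproduces the split over plain heights to show how many batches each stage yields; the tip height of 850,000 is an assumption, and the `.max(1)` guard is added here for small inputs where the patch divides directly.

    const MAX_GETDATA: usize = 50_000;

    fn main() {
        let jobs = 32;
        // Tip-first, like the patch: the oldest heights end up at the back.
        let mut heights: Vec<u32> = (1..=850_000).rev().collect();
        let mut plan: Vec<usize> = Vec::new();
        // Oldest 200k blocks in full-size getdata batches.
        let first = heights.split_off(heights.len() - 200_000);
        plan.extend(first.chunks(MAX_GETDATA).map(|c| c.len()));
        // Next 100k at half size, then 100k at quarter size.
        let next = heights.split_off(heights.len() - 100_000);
        plan.extend(next.chunks(MAX_GETDATA / 2).map(|c| c.len()));
        let to_segwit = heights.split_off(heights.len() - 100_000);
        plan.extend(to_segwit.chunks(MAX_GETDATA / 4).map(|c| c.len()));
        // The heavy remainder is split roughly evenly across tasks.
        let chunk_size = (heights.len() / jobs).max(1);
        plan.extend(heights.chunks(chunk_size).map(|c| c.len()));
        // 4 + 4 + 8 light batches, then 33 task batches (`chunks` emits a
        // short trailing chunk when the division is not exact).
        println!("{} batches, largest {}", plan.len(), plan.iter().max().unwrap());
    }
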