6 changes: 3 additions & 3 deletions node/config_spec.toml
@@ -19,7 +19,7 @@ doc = "The bitcoin network to operate on. Default `bitcoin`. Options are `bitcoi
[[param]]
name = "ping_timeout"
type = "u64"
default = "15"
default = "60"
doc = "The time (seconds) a peer has to respond to a `ping` message. Pings are sent aggressively throughout IBD to find slow peers."

[[param]]
@@ -43,11 +43,11 @@ doc = "The maximum time (seconds) to write to a TCP stream until the connection
[[param]]
name = "min_blocks_per_sec"
type = "f64"
default = "1."
default = "3."
doc = "The minimum rate a peer has to respond to block requests."

[[param]]
name = "tasks"
type = "usize"
default = "64"
default = "32"
doc = "The number of tasks to download blocks. Default is 64. Each task uses two OS threads."
5 changes: 2 additions & 3 deletions node/src/bin/ibd.rs
@@ -15,7 +15,7 @@ use node::{
};
use p2p::net::TimeoutParams;

-const PING_INTERVAL: Duration = Duration::from_secs(15);
+const PING_INTERVAL: Duration = Duration::from_secs(10 * 60);

configure_me::include_config!();

@@ -74,8 +74,7 @@ fn main() {
let acc_task = std::thread::spawn(move || accumulator_state.verify());
let peers = Arc::new(Mutex::new(peers));
let mut tasks = Vec::new();
-let chunk_size = chain.best_header().height() as usize / task_num;
-let hashes = hashes_from_chain(Arc::clone(&chain), chunk_size);
+let hashes = hashes_from_chain(Arc::clone(&chain), task_num);
for (task_id, chunk) in hashes.into_iter().enumerate() {
let chain = Arc::clone(&chain);
let tx = tx.clone();
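The dropped chunk_size computation is the heart of this call-site change: slice::chunks takes a chunk size, and `height / task_num` rounds down, so the old scheme could hand out more chunks than there were tasks. A toy demonstration with made-up numbers:

fn main() {
    let hashes: Vec<u32> = (0..10).collect(); // stand-in for the block-hash list
    let task_num = 4;

    // Old scheme: the binary derived a chunk size from the tip height.
    let chunk_size = hashes.len() / task_num; // 10 / 4 = 2
    let old: Vec<&[u32]> = hashes.chunks(chunk_size).collect();
    assert_eq!(old.len(), 5); // five chunks for four tasks

    // New scheme: pass the task count and let hashes_from_chain size the
    // batches itself (see node/src/lib.rs below).
}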
108 changes: 83 additions & 25 deletions node/src/lib.rs
@@ -34,6 +34,7 @@ use p2p::{
};

const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION;
+const MAX_GETDATA: usize = 50_000;

pub fn elapsed_time(then: Instant) {
let duration_sec = then.elapsed().as_secs_f64();
@@ -79,11 +80,12 @@ pub fn sync_block_headers(
hosts: &[SocketAddr],
chainman: Arc<ChainstateManager>,
network: Network,
-timeout_params: TimeoutParams,
+mut timeout_params: TimeoutParams,
) {
let mut rng = thread_rng();
let then = Instant::now();
tracing::info!("Syncing block headers to assume valid hash");
+timeout_params.ping_interval(Duration::from_secs(30));
loop {
let random = hosts
.choose(&mut rng)
@@ -99,20 +101,24 @@
Err(_) => continue,
};
tracing::info!("Connection established");
+let mut get_next = true;
loop {
-let curr = chainman.best_header().block_hash().hash;
-let locator = BlockHash::from_byte_array(curr);
-let getheaders = GetHeadersMessage {
-version: PROTOCOL_VERSION,
-locator_hashes: vec![locator],
-stop_hash: BlockHash::GENESIS_PREVIOUS_BLOCK_HASH,
-};
-tracing::info!("Requesting {locator}");
-if writer
-.send_message(NetworkMessage::GetHeaders(getheaders))
-.is_err()
-{
-break;
+let mut kill = false;
+if get_next {
+let curr = chainman.best_header().block_hash().hash;
+let locator = BlockHash::from_byte_array(curr);
+let getheaders = GetHeadersMessage {
+version: PROTOCOL_VERSION,
+locator_hashes: vec![locator],
+stop_hash: BlockHash::GENESIS_PREVIOUS_BLOCK_HASH,
+};
+tracing::info!("Requesting {locator}");
+if writer
+.send_message(NetworkMessage::GetHeaders(getheaders))
+.is_err()
+{
+break;
+}
+}
while let Ok(Some(message)) = reader.read_message() {
match message {
@@ -121,6 +127,7 @@
chainman
.process_new_block_headers(&consensus::serialize(&header), true)
.expect("process headers failed");
+get_next = true;
if header.block_hash().eq(&stop_hash) {
tracing::info!("Done syncing block headers");
if let Some(message_rate) =
@@ -138,12 +145,23 @@
tracing::info!("Update chain tip: {}", chainman.best_header().height());
break;
}
+NetworkMessage::Inv(_) => {
+kill = true;
+break;
+}
NetworkMessage::Ping(nonce) => {
+get_next = false;
let _ = writer.send_message(NetworkMessage::Pong(nonce));
}
-e => tracing::info!("Ignoring message {}", e.command()),
+e => {
+get_next = false;
+tracing::info!("Ignoring message {}", e.command());
+}
}
}
+if kill {
+break;
+}
}
}
}
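The get_next/kill flags above amount to simple request pacing: a new getheaders goes out only after the previous one made progress, so pings and other unsolicited traffic cannot trigger duplicate requests, and an unsolicited inv (the peer announcing fresh blocks) ends header sync. A stripped-down model of that loop, with stand-in types rather than the node's real messages:

enum Msg {
    Headers(usize), // number of headers just processed
    Ping(u64),
    Inv,
}

fn drive(incoming: impl IntoIterator<Item = Msg>) {
    let mut get_next = true; // always send the first request
    let mut iter = incoming.into_iter();
    loop {
        if get_next {
            // send a getheaders for the current tip here
        }
        match iter.next() {
            Some(Msg::Headers(n)) if n > 0 => get_next = true, // progress: ask again
            Some(Msg::Ping(_nonce)) => get_next = false, // answer with pong, no new request
            Some(Msg::Inv) | None => break, // peer is at tip (or gone): stop
            _ => break,
        }
    }
}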
@@ -153,7 +171,7 @@ pub fn get_blocks_for_range(
task_id: u32,
timeout_params: TimeoutParams,
blocks_per_sec: f64,
-ping_timeout: Duration,
+_ping_timeout: Duration,
network: Network,
block_dir: &Path,
chain: Arc<ChainstateManager>,
@@ -162,6 +180,7 @@
updater: Sender<AccumulatorUpdate>,
mut batch: Vec<BlockHash>,
) {
+tracing::info!("{task_id} assigned {} blocks", batch.len());
let mut rng = thread_rng();
loop {
let peer = {
@@ -207,7 +226,15 @@
hints.get_block_offsets(block_height).into_iter().collect();
// tracing::info!("{task_id} -> {block_height}:{hash}");
let file_path = block_dir.join(format!("{hash}.block"));
-let mut file = File::create_new(file_path).expect("duplicate block file");
+let file = File::create_new(file_path);
+let mut file = match file {
+Ok(file) => file,
+Err(e) => {
+tracing::warn!("Conflicting open files at: {}", block_height);
+tracing::warn!("{e}");
+panic!("files cannot conflict");
+}
+};
let block_bytes = consensus::serialize(&block);
file.write_all(&block_bytes)
.expect("failed to write block file");
@@ -243,7 +270,9 @@ pub fn get_blocks_for_range(
output_index += 1
}
}
-
+if batch.len() % 100 == 0 {
+tracing::info!("{task_id} has {} remaining blocks", batch.len());
+}
if batch.is_empty() {
tracing::info!("All block ranges fetched: {task_id}");
return;
@@ -277,10 +306,10 @@ pub fn get_blocks_for_range(
break;
}
}
-if metrics.ping_timed_out(ping_timeout) {
-tracing::info!("{task_id} failed to respond to a ping");
-break;
-}
+// if metrics.ping_timed_out(ping_timeout) {
+// tracing::warn!("{task_id} failed to respond to a ping");
+// break;
+// }
}
if batch.is_empty() {
break;
@@ -289,21 +318,50 @@
tracing::info!("All block ranges fetched: {task_id}");
}

-pub fn hashes_from_chain(chain: Arc<ChainstateManager>, chunks: usize) -> Vec<Vec<BlockHash>> {
+pub fn hashes_from_chain(chain: Arc<ChainstateManager>, jobs: usize) -> Vec<Vec<BlockHash>> {
let height = chain.best_header().height();
let mut hashes = Vec::with_capacity(height as usize);
let mut curr = chain.best_header();
let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash);
hashes.push(tip_hash);
+let mut out = Vec::new();
while let Ok(next) = curr.prev() {
if next.height() == 0 {
-return hashes.chunks(chunks).map(|slice| slice.to_vec()).collect();
+break;
}
let hash = BlockHash::from_byte_array(next.block_hash().hash);
hashes.push(hash);
curr = next;
}
-hashes.chunks(chunks).map(|slice| slice.to_vec()).collect()
+// These blocks are empty. Fetch the maximum amount of blocks.
+let first_epoch = hashes.split_off(hashes.len() - 200_000);
+let first_chunks: Vec<Vec<BlockHash>> = first_epoch
+.chunks(MAX_GETDATA)
+.map(|slice| slice.to_vec())
+.collect();
+out.extend(first_chunks);
+// These start to get larger, but are still small
+let next_epoch = hashes.split_off(hashes.len() - 100_000);
+let next_chunks: Vec<Vec<BlockHash>> = next_epoch
+.chunks(MAX_GETDATA / 2)
+.map(|slice| slice.to_vec())
+.collect();
+out.extend(next_chunks);
+// Still not entirely full, but almost there
+let to_segwit = hashes.split_off(hashes.len() - 100_000);
+let to_segwit_chunks: Vec<Vec<BlockHash>> = to_segwit
+.chunks(MAX_GETDATA / 4)
+.map(|slice| slice.to_vec())
+.collect();
+out.extend(to_segwit_chunks);
+// Now divide the rest among jobs
+let chunk_size = hashes.len() / jobs;
+let rest: Vec<Vec<BlockHash>> = hashes
+.chunks(chunk_size)
+.map(|slice| slice.to_vec())
+.collect();
+out.extend(rest);
+out
}

pub trait ChainExt {
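A back-of-the-envelope check of the era-weighted split in hashes_from_chain, assuming an illustrative tip height of 900,000 and the new default of 32 tasks. Since the hash list is built from the tip backwards, split_off peels batches off the old end of the chain first; MAX_GETDATA = 50_000 matches the long-standing P2P cap on inventory items per message, so the earliest, near-empty blocks are requested in the largest batches the protocol allows:

fn main() {
    const MAX_GETDATA: usize = 50_000;
    let height = 900_000usize; // illustrative
    let jobs = 32;

    let early = 200_000 / MAX_GETDATA; // 4 batches of near-empty blocks
    let small = 100_000 / (MAX_GETDATA / 2); // 4 batches
    let to_segwit = 100_000 / (MAX_GETDATA / 4); // 8 batches
    let rest = height - 400_000; // recent, full blocks
    println!(
        "{} fixed batches + {jobs} batches of ~{} recent blocks each",
        early + small + to_segwit,
        rest / jobs
    );
}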