diff --git a/node/config_spec.toml b/node/config_spec.toml index acb48ef..d7ab631 100644 --- a/node/config_spec.toml +++ b/node/config_spec.toml @@ -8,7 +8,7 @@ doc = "The path to your `bitcoin.hints` file that will be used for IBD. Default name = "blocks_dir" type = "String" default = "\"./blockfiles\".into()" -doc = "Directory where you would like to store the bitcoin blocks. Default `./blockfiles`" +doc = "Temporary directory where you would like to store the bitcoin blocks. Default `./blockfiles`" [[param]] name = "network" diff --git a/node/src/bin/ibd.rs b/node/src/bin/ibd.rs index 675c308..ed4d6b0 100644 --- a/node/src/bin/ibd.rs +++ b/node/src/bin/ibd.rs @@ -1,6 +1,7 @@ use std::{ fs::File, path::PathBuf, + str::FromStr, sync::{mpsc::channel, Arc, Mutex}, time::{Duration, Instant}, }; @@ -10,8 +11,8 @@ use hintfile::Hints; use kernel::{ChainstateManager, ChainstateManagerOptions, ContextBuilder}; use node::{ - bootstrap_dns, elapsed_time, get_blocks_for_range, hashes_from_chain, sync_block_headers, - AccumulatorState, ChainExt, + bootstrap_dns, elapsed_time, emit_hashes_in_order, get_blocks_for_range, hashes_from_chain, + migrate_blocks, setup_validation_interface, sync_block_headers, AccumulatorState, ChainExt, }; use p2p::net::TimeoutParams; @@ -20,9 +21,14 @@ const PING_INTERVAL: Duration = Duration::from_secs(10 * 60); configure_me::include_config!(); fn main() { + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber).unwrap(); let (config, _) = Config::including_optional_config_files::<&[&str]>(&[]).unwrap_or_exit(); let hint_path = config.hintfile; let blocks_dir = config.blocks_dir; + let home_var = std::env::var("HOME").unwrap(); + let home_dir = PathBuf::from_str(&home_var).unwrap(); + let bitcoind_dir = home_dir.join(".bitcoin"); let network = config .network .parse::<Network>() .expect @@ -38,8 +44,6 @@ fn main() { timeout_conf.write_timeout(write_timeout); timeout_conf.tcp_handshake_timeout(tcp_timeout); 
timeout_conf.ping_interval(PING_INTERVAL); - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber).unwrap(); let hintfile_start_time = Instant::now(); tracing::info!("Reading in {hint_path}"); let mut hintfile = File::open(hint_path).expect("invalid hintfile path"); @@ -58,9 +62,16 @@ fn main() { let kernel_start_time = Instant::now(); let ctx = ContextBuilder::new() .chain_type(network.chain_type()) + .validation_interface(setup_validation_interface()) .build() .unwrap(); - let options = ChainstateManagerOptions::new(&ctx, ".", "./blocks").unwrap(); + let bitcoind_block_dir = bitcoind_dir.join("blocks"); + let options = ChainstateManagerOptions::new( + &ctx, + bitcoind_dir.to_str().unwrap(), + bitcoind_block_dir.to_str().unwrap(), + ) + .unwrap(); let chainman = ChainstateManager::new(options).unwrap(); elapsed_time(kernel_start_time); let tip = chainman.best_header().height(); @@ -74,7 +85,12 @@ fn main() { let acc_task = std::thread::spawn(move || accumulator_state.verify()); let peers = Arc::new(Mutex::new(peers)); let mut tasks = Vec::new(); - let hashes = hashes_from_chain(Arc::clone(&chain), task_num); + let hashes = if matches!(network, Network::Bitcoin) { + hashes_from_chain(Arc::clone(&chain), task_num) + } else { + let hashes = emit_hashes_in_order(Arc::clone(&chain)).collect::<Vec<_>>(); + hashes.chunks(10_000).map(|slice| slice.to_vec()).collect() + }; for (task_id, chunk) in hashes.into_iter().enumerate() { let chain = Arc::clone(&chain); let tx = tx.clone(); @@ -107,4 +123,6 @@ fn main() { let acc_result = acc_task.join().unwrap(); tracing::info!("Verified: {acc_result}"); elapsed_time(main_routine_time); + tracing::info!("Migrating blocks to Bitcoin Core"); + migrate_blocks(chain, &block_file_path); } diff --git a/node/src/lib.rs b/node/src/lib.rs index 58c0c3e..20d0659 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -1,7 +1,7 @@ use std::{ collections::HashSet, fs::File, - io::Write, + 
io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, sync::{ @@ -198,7 +198,12 @@ pub fn get_blocks_for_range( continue; }; // tracing::info!("Connection successful"); - let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect()); + let payload = InventoryPayload( + batch + .iter() + .map(|hash| Inventory::WitnessBlock(*hash)) + .collect(), + ); // tracing::info!("Requesting {} blocks", payload.0.len()); let getdata = NetworkMessage::GetData(payload); if writer.send_message(getdata).is_err() { @@ -232,9 +237,12 @@ pub fn get_blocks_for_range( panic!("files cannot conflict"); } }; - let block_bytes = consensus::serialize(&block); + let block_bytes = consensus::encode::serialize(&block); file.write_all(&block_bytes) .expect("failed to write block file"); + file.flush().expect("failed to flush entire block to disk"); + file.sync_all().expect("file failed to sync with the OS"); + drop(file); // tracing::info!("Wrote {hash} to file"); let (_, transactions) = block.into_parts(); let mut output_index = 0; @@ -361,6 +369,73 @@ pub fn hashes_from_chain(chain: Arc<ChainstateManager>, jobs: usize) -> Vec<Vec<BlockHash>> +pub fn emit_hashes_in_order(chain: Arc<ChainstateManager>) -> impl Iterator<Item = BlockHash> { + let height = chain.best_header().height(); + let mut hashes = Vec::with_capacity(height as usize); + let mut curr = chain.best_header(); + let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash); + hashes.push(tip_hash); + while let Ok(next) = curr.prev() { + if next.height() == 0 { + break; + } + let hash = BlockHash::from_byte_array(next.block_hash().hash); + hashes.push(hash); + curr = next; + } + hashes.into_iter().rev() +} + +pub fn migrate_blocks(chain: Arc<ChainstateManager>, block_dir: &Path) { + let start = Instant::now(); + for (i, hash) in emit_hashes_in_order(Arc::clone(&chain)).enumerate() { + let file_path = block_dir.join(format!("{hash}.block")); + let mut file = + File::open(&file_path).expect("block file not present. 
did IBD complete successfully?"); + let mut block_bytes = Vec::new(); + file.read_to_end(&mut block_bytes) .expect("unexpected error parsing block file"); + let block = + kernel::Block::try_from(block_bytes.as_slice()).expect("invalid block serialization"); + let (accepted, _) = chain.process_block(&block); + if !accepted { + tracing::warn!("{hash} was rejected"); + panic!("could not migrate blocks"); + } + drop(file); + if let Err(e) = std::fs::remove_file(file_path) { + tracing::warn!("Could not remove block file {e}"); + } + if i % 100 == 0 { + tracing::info!("{i}th block migrated"); + elapsed_time(start); + } + } +} + +pub fn setup_validation_interface() -> Box<kernel::ValidationInterfaceCallbacks> { + Box::new(kernel::ValidationInterfaceCallbacks { + block_checked: Box::new(move |_block, _mode, result| match result { + kernel::BlockValidationResult::MUTATED => tracing::warn!("Received mutated block"), + kernel::BlockValidationResult::CONSENSUS => tracing::warn!("Invalid consensus"), + kernel::BlockValidationResult::CACHED_INVALID => tracing::warn!("Cached as invalid"), + kernel::BlockValidationResult::INVALID_HEADER => { + tracing::warn!("Block header is malformed") + } + kernel::BlockValidationResult::TIME_FUTURE => tracing::warn!("Invalid timestamp"), + kernel::BlockValidationResult::MISSING_PREV => tracing::warn!("Missing previous block"), + kernel::BlockValidationResult::HEADER_LOW_WORK => { + tracing::warn!("Header has too-low work") + } + kernel::BlockValidationResult::INVALID_PREV => { + tracing::warn!("Invalid previous block hash") + } + // Result is unset (not rejected) + _ => (), + }), + }) +} + pub trait ChainExt { fn chain_type(&self) -> ChainType; }