2 changes: 1 addition & 1 deletion node/config_spec.toml
@@ -8,7 +8,7 @@ doc = "The path to your `bitcoin.hints` file that will be used for IBD. Default
name = "blocks_dir"
type = "String"
default = "\"./blockfiles\".into()"
doc = "Directory where you would like to store the bitcoin blocks. Default `./blockfiles`"
doc = "Temporary directory where you would like to store the bitcoin blocks. Default `./blockfiles`"

[[param]]
name = "network"
30 changes: 24 additions & 6 deletions node/src/bin/ibd.rs
@@ -1,6 +1,7 @@
use std::{
fs::File,
path::PathBuf,
str::FromStr,
sync::{mpsc::channel, Arc, Mutex},
time::{Duration, Instant},
};
@@ -10,8 +11,8 @@ use hintfile::Hints;
use kernel::{ChainstateManager, ChainstateManagerOptions, ContextBuilder};

use node::{
bootstrap_dns, elapsed_time, get_blocks_for_range, hashes_from_chain, sync_block_headers,
AccumulatorState, ChainExt,
bootstrap_dns, elapsed_time, emit_hashes_in_order, get_blocks_for_range, hashes_from_chain,
migrate_blocks, setup_validation_interface, sync_block_headers, AccumulatorState, ChainExt,
};
use p2p::net::TimeoutParams;

@@ -20,9 +21,14 @@ const PING_INTERVAL: Duration = Duration::from_secs(10 * 60);
configure_me::include_config!();

fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
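// Installing the subscriber first means the config and hintfile loading below
// is already covered by tracing output (it previously ran after the timeout setup).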
let (config, _) = Config::including_optional_config_files::<&[&str]>(&[]).unwrap_or_exit();
let hint_path = config.hintfile;
let blocks_dir = config.blocks_dir;
let home_var = std::env::var("HOME").unwrap();
let home_dir = PathBuf::from_str(&home_var).unwrap();
let bitcoind_dir = home_dir.join(".bitcoin");
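// Assumption: Bitcoin Core uses its default datadir at `$HOME/.bitcoin`; the
// `HOME` lookup panics on platforms where the variable is unset.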
let network = config
.network
.parse::<Network>()
@@ -38,8 +44,6 @@ fn main() {
timeout_conf.write_timeout(write_timeout);
timeout_conf.tcp_handshake_timeout(tcp_timeout);
timeout_conf.ping_interval(PING_INTERVAL);
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
let hintfile_start_time = Instant::now();
tracing::info!("Reading in {hint_path}");
let mut hintfile = File::open(hint_path).expect("invalid hintfile path");
@@ -58,9 +62,16 @@ let kernel_start_time = Instant::now();
let kernel_start_time = Instant::now();
let ctx = ContextBuilder::new()
.chain_type(network.chain_type())
.validation_interface(setup_validation_interface())
.build()
.unwrap();
let options = ChainstateManagerOptions::new(&ctx, ".", "./blocks").unwrap();
let bitcoind_block_dir = bitcoind_dir.join("blocks");
let options = ChainstateManagerOptions::new(
&ctx,
bitcoind_dir.to_str().unwrap(),
bitcoind_block_dir.to_str().unwrap(),
)
.unwrap();
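// Pointing the kernel at `~/.bitcoin` and `~/.bitcoin/blocks` (instead of the
// previous "." and "./blocks") means the chainstate it builds should be directly
// reusable by Bitcoin Core after the migration step at the end of main().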
let chainman = ChainstateManager::new(options).unwrap();
elapsed_time(kernel_start_time);
let tip = chainman.best_header().height();
@@ -74,7 +85,12 @@ fn main() {
let acc_task = std::thread::spawn(move || accumulator_state.verify());
let peers = Arc::new(Mutex::new(peers));
let mut tasks = Vec::new();
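// Build the per-task download batches: on mainnet, `hashes_from_chain` splits the
// header chain into `task_num` ranges; on other networks, `emit_hashes_in_order`
// yields every hash in height order and the result is chunked into batches of 10_000.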
let hashes = hashes_from_chain(Arc::clone(&chain), task_num);
let hashes = if matches!(network, Network::Bitcoin) {
hashes_from_chain(Arc::clone(&chain), task_num)
} else {
let hashes = emit_hashes_in_order(Arc::clone(&chain)).collect::<Vec<BlockHash>>();
hashes.chunks(10_000).map(|slice| slice.to_vec()).collect()
};
for (task_id, chunk) in hashes.into_iter().enumerate() {
let chain = Arc::clone(&chain);
let tx = tx.clone();
@@ -107,4 +123,6 @@ fn main() {
let acc_result = acc_task.join().unwrap();
tracing::info!("Verified: {acc_result}");
elapsed_time(main_routine_time);
tracing::info!("Migrating blocks to Bitcoin Core");
migrate_blocks(chain, &block_file_path);
}
81 changes: 78 additions & 3 deletions node/src/lib.rs
@@ -1,7 +1,7 @@
use std::{
collections::HashSet,
fs::File,
io::Write,
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
sync::{
@@ -198,7 +212,12 @@ pub fn get_blocks_for_range(
continue;
};
// tracing::info!("Connection successful");
let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect());
let payload = InventoryPayload(
batch
.iter()
.map(|hash| Inventory::WitnessBlock(*hash))
.collect(),
);
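// `Inventory::WitnessBlock` (BIP 144) requests blocks with their witness data,
// which is needed to check segwit blocks against the witness commitment; a plain
// `Inventory::Block` request would typically return them with witnesses stripped.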
// tracing::info!("Requesting {} blocks", payload.0.len());
let getdata = NetworkMessage::GetData(payload);
if writer.send_message(getdata).is_err() {
@@ -232,9 +237,12 @@ pub fn get_blocks_for_range(
panic!("files cannot conflict");
}
};
let block_bytes = consensus::serialize(&block);
let block_bytes = consensus::encode::serialize(&block);
file.write_all(&block_bytes)
.expect("failed to write block file");
file.flush().expect("failed to flush entire block to disk");
file.sync_all().expect("file failed to sync with the OS");
drop(file);
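// The explicit flush + fsync trades write throughput for durability: the
// migration step later re-reads these files, so a block that only reached the
// page cache before a crash could otherwise surface as a truncated file.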
// tracing::info!("Wrote {hash} to file");
let (_, transactions) = block.into_parts();
let mut output_index = 0;
@@ -361,6 +369,73 @@ pub fn hashes_from_chain(chain: Arc<ChainstateManager>, jobs: usize) -> Vec<Vec<
out
}

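/// Walks the header chain back from the best header, stopping before genesis,
/// and yields the collected block hashes in ascending height order.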
pub fn emit_hashes_in_order(chain: Arc<ChainstateManager>) -> impl Iterator<Item = BlockHash> {
let height = chain.best_header().height();
let mut hashes = Vec::with_capacity(height as usize);
let mut curr = chain.best_header();
let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash);
hashes.push(tip_hash);
while let Ok(next) = curr.prev() {
if next.height() == 0 {
break;
}
let hash = BlockHash::from_byte_array(next.block_hash().hash);
hashes.push(hash);
curr = next;
}
hashes.into_iter().rev()
}

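/// Replays every staged `<hash>.block` file from `block_dir` into the kernel's
/// `ChainstateManager` in height order, deleting each file once it has been
/// processed, and panics if any block is missing or rejected.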
pub fn migrate_blocks(chain: Arc<ChainstateManager>, block_dir: &Path) {
let start = Instant::now();
for (i, hash) in emit_hashes_in_order(Arc::clone(&chain)).enumerate() {
let file_path = block_dir.join(format!("{hash}.block"));
let mut file =
File::open(&file_path).expect("block file not present. did IBD complete successfully?");
let mut block_bytes = Vec::new();
file.read_to_end(&mut block_bytes)
.expect("unexpected error parsing block file");
let block =
kernel::Block::try_from(block_bytes.as_slice()).expect("invalid block serialization");
let (accepted, _) = chain.process_block(&block);
if !accepted {
tracing::warn!("{hash} was rejected");
panic!("could not migrate blocks");
}
drop(file);
if let Err(e) = std::fs::remove_file(file_path) {
tracing::warn!("Could not remove block file {e}");
}
if i % 100 == 0 {
tracing::info!("{i}th block migrated");
elapsed_time(start);
}
}
}

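/// Kernel validation callbacks: each rejection reason reported through
/// `block_checked` is logged as a warning; an unset result (block not rejected)
/// is ignored.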
pub fn setup_validation_interface() -> Box<kernel::ValidationInterfaceCallbacks> {
Box::new(kernel::ValidationInterfaceCallbacks {
block_checked: Box::new(move |_block, _mode, result| match result {
kernel::BlockValidationResult::MUTATED => tracing::warn!("Received mutated block"),
kernel::BlockValidationResult::CONSENSUS => tracing::warn!("Invalid consensus"),
kernel::BlockValidationResult::CACHED_INVALID => tracing::warn!("Cached as invalid"),
kernel::BlockValidationResult::INVALID_HEADER => {
tracing::warn!("Block header is malformed")
}
kernel::BlockValidationResult::TIME_FUTURE => tracing::warn!("Invalid timestamp"),
kernel::BlockValidationResult::MISSING_PREV => tracing::warn!("Missing previous block"),
kernel::BlockValidationResult::HEADER_LOW_WORK => {
tracing::warn!("Header has too-low work")
}
kernel::BlockValidationResult::INVALID_PREV => {
tracing::warn!("Invalid previous block hash")
}
// Result is unset (not rejected)
_ => (),
}),
})
}

pub trait ChainExt {
fn chain_type(&self) -> ChainType;
}