diff --git a/.gitignore b/.gitignore index be2922b..faffbd6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ Cargo.lock !justfile *.db *.hints +/node/chainstate +/node/blocks +/node/blockfiles diff --git a/Cargo.toml b/Cargo.toml index dd30c53..f2cb4df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,9 @@ [workspace] -members = ["accumulator", "hintfile", "network"] +members = ["accumulator", "hintfile", "network", "node"] default-members = ["accumulator", "network"] resolver = "2" [workspace.dependencies] bitcoin = { git = "https://github.com/rust-bitcoin/rust-bitcoin", default-features = false, rev = "16cc257c3695dea0e7301a5fa9cab44b8ed60598" } +kernel = { package = "bitcoinkernel", git = "https://github.com/alexanderwiederin/rust-bitcoinkernel.git", rev = "353533221e3ba91d672418eab1ae7b83a61214f9" } +p2p = { package = "bitcoin-p2p", git = "https://github.com/2140-dev/bitcoin-p2p.git", rev = "4f67fbeced98e3ddcc65a1333c46823a2b56332a" } diff --git a/accumulator/src/lib.rs b/accumulator/src/lib.rs index d43eb92..7f34478 100644 --- a/accumulator/src/lib.rs +++ b/accumulator/src/lib.rs @@ -34,6 +34,13 @@ fn split_in_half(a: [u8; 32]) -> ([u8; 16], [u8; 16]) { (high, low) } +/// Update an accumulator by adding or spending a pre-hashed outpoint +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AccumulatorUpdate { + Add([u8; 32]), + Spent([u8; 32]), +} + impl Accumulator { /// The zero accumulator pub const ZERO: Accumulator = Accumulator { high: 0, low: 0 }; @@ -59,6 +66,14 @@ impl Accumulator { *self = Self { high, low }; } + /// Update the accumulator + pub fn update(&mut self, update: AccumulatorUpdate) { + match update { + AccumulatorUpdate::Add(added) => self.add_hashed_outpoint(added), + AccumulatorUpdate::Spent(spent) => self.spend_hashed_outpoint(spent), + } + } + /// Spend the inputs in a block by subtracing them from the accumulator. pub fn spend(&mut self, outpoint: OutPoint) { let hash = hash_outpoint(outpoint); @@ -226,4 +241,27 @@ mod tests { acc.spend_hashed_outpoint(hash_two); acc.spend_hashed_outpoint(hash_three); } + + #[test] + fn test_update_method() { + let [outpoint_one, outpoint_two, outpoint_three, outpoint_four, outpoint_five] = + make_five_outpoint(); + let hash_one = hash_outpoint(outpoint_one); + let hash_two = hash_outpoint(outpoint_two); + let hash_three = hash_outpoint(outpoint_three); + let hash_four = hash_outpoint(outpoint_four); + let hash_five = hash_outpoint(outpoint_five); + let mut acc = Accumulator::default(); + acc.update(AccumulatorUpdate::Add(hash_one)); + acc.update(AccumulatorUpdate::Add(hash_two)); + acc.update(AccumulatorUpdate::Add(hash_three)); + acc.update(AccumulatorUpdate::Add(hash_four)); + acc.update(AccumulatorUpdate::Add(hash_five)); + acc.update(AccumulatorUpdate::Spent(hash_five)); + acc.update(AccumulatorUpdate::Spent(hash_four)); + acc.update(AccumulatorUpdate::Spent(hash_three)); + acc.update(AccumulatorUpdate::Spent(hash_two)); + acc.update(AccumulatorUpdate::Spent(hash_one)); + assert!(acc.is_zero()); + } } diff --git a/contrib/data/bitcoin_headers.sqlite b/contrib/data/bitcoin_headers.sqlite deleted file mode 100644 index 39d6d9b..0000000 Binary files a/contrib/data/bitcoin_headers.sqlite and /dev/null differ diff --git a/contrib/data/signet_headers.sqlite b/contrib/data/signet_headers.sqlite deleted file mode 100644 index 2d58ae5..0000000 Binary files a/contrib/data/signet_headers.sqlite and /dev/null differ diff --git a/hintfile/Cargo.toml b/hintfile/Cargo.toml index 5f0f098..639c434 100644 --- a/hintfile/Cargo.toml +++ b/hintfile/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] bitcoin = { workspace = true } -kernel = { package = "bitcoinkernel", git = "https://github.com/alexanderwiederin/rust-bitcoinkernel.git", rev = "b7b42d65a70f2c0e5fc8d1d024549bf857211046" } +kernel = { workspace = true } [[bin]] name = "construct" diff --git a/hintfile/src/bin/construct.rs b/hintfile/src/bin/construct.rs index b4e4457..3e47fe0 100644 --- a/hintfile/src/bin/construct.rs +++ b/hintfile/src/bin/construct.rs @@ -3,7 +3,7 @@ use std::{fs::File, io::Write, sync::Arc}; use hintfile::write_compact_size; use kernel::{ChainType, ChainstateManager, ChainstateManagerOptions, ContextBuilder, KernelError}; -const CHAIN_TYPE: ChainType = ChainType::MAINNET; +const CHAIN_TYPE: ChainType = ChainType::SIGNET; fn main() { let mut file = File::create("./bitcoin.hints").unwrap(); diff --git a/network/Cargo.toml b/network/Cargo.toml index e100f2c..5938847 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "peers" +name = "network" version = "0.1.0" edition = "2021" diff --git a/network/tests/test.rs b/network/tests/test.rs index 887e6dd..1cba257 100644 --- a/network/tests/test.rs +++ b/network/tests/test.rs @@ -1,6 +1,6 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use peers::dns::DnsQuery; +use network::dns::DnsQuery; #[tokio::test] #[cfg(feature = "tokio")] diff --git a/node/Cargo.toml b/node/Cargo.toml new file mode 100644 index 0000000..d86b47f --- /dev/null +++ b/node/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "node" +version = "0.1.0" +edition = "2021" + +[dependencies] +accumulator = { path = "../accumulator/" } +bitcoin = { workspace = true } +kernel = { workspace = true } +hintfile = { path = "../hintfile/" } +network = { path = "../network/" } +p2p = { workspace = true } + +tracing = "0.1" +tracing-subscriber = "0.3" + +[[bin]] +name = "ibd" diff --git a/node/justfile b/node/justfile new file mode 100644 index 0000000..0ce66a8 --- /dev/null +++ b/node/justfile @@ -0,0 +1,5 @@ +ibd: + cargo run --bin ibd ./bitcoin.hints --release + +delete: + rm -rf blocks chainstate blockfiles diff --git a/node/src/bin/ibd.rs b/node/src/bin/ibd.rs new file mode 100644 index 0000000..5d0ece6 --- /dev/null +++ b/node/src/bin/ibd.rs @@ -0,0 +1,92 @@ +use std::{ + fs::File, + path::Path, + sync::{mpsc::channel, Arc, Mutex}, + time::Instant, +}; + +use bitcoin::Network; +use hintfile::Hints; +use kernel::{ChainType, ChainstateManager, ChainstateManagerOptions, ContextBuilder}; + +use node::{ + bootstrap_dns, elapsed_time, get_blocks_for_range, hashes_from_chain, sync_block_headers, + AccumulatorState, +}; + +const CHAIN_TYPE: ChainType = ChainType::SIGNET; +const NETWORK: Network = Network::Signet; +const TASKS: usize = 256; +const BLOCK_FILE_PATH: &str = "./blockfiles"; + +fn main() { + let mut args = std::env::args(); + let _ = args.next(); + let hint_path = args.next().expect("Usage: "); + // Logging + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber).unwrap(); + let hintfile_start_time = Instant::now(); + tracing::info!("Reading in {hint_path}"); + let mut hintfile = File::open(hint_path).expect("invalid hintfile path"); + let hints = Arc::new(Hints::from_file(&mut hintfile)); + elapsed_time(hintfile_start_time); + let block_file_path = Path::new(BLOCK_FILE_PATH); + std::fs::create_dir(block_file_path).expect("could not create block file directory"); + let stop_hash = hints.stop_hash(); + tracing::info!("Assume valid hash: {stop_hash}"); + tracing::info!("Finding peers with DNS"); + let dns_start_time = Instant::now(); + let peers = bootstrap_dns(NETWORK); + elapsed_time(dns_start_time); + tracing::info!("Initializing bitcoin kernel"); + let kernel_start_time = Instant::now(); + let ctx = ContextBuilder::new() + .chain_type(CHAIN_TYPE) + .build() + .unwrap(); + let options = ChainstateManagerOptions::new(&ctx, ".", "./blocks").unwrap(); + let chainman = ChainstateManager::new(options).unwrap(); + elapsed_time(kernel_start_time); + let tip = chainman.best_header().height(); + tracing::info!("Kernel best header: {tip}"); + let chain = Arc::new(chainman); + sync_block_headers(stop_hash, &peers, Arc::clone(&chain), NETWORK); + tracing::info!("Assume valid height: {}", chain.best_header().height()); + let (tx, rx) = channel(); + let main_routine_time = Instant::now(); + let mut accumulator_state = AccumulatorState::new(rx); + let acc_task = std::thread::spawn(move || accumulator_state.verify()); + let peers = Arc::new(Mutex::new(peers)); + let mut tasks = Vec::new(); + let chunk_size = chain.best_header().height() as usize / TASKS; + let hashes = hashes_from_chain(Arc::clone(&chain), chunk_size); + for (task_id, chunk) in hashes.into_iter().enumerate() { + let chain = Arc::clone(&chain); + let tx = tx.clone(); + let peers = Arc::clone(&peers); + let hints = Arc::clone(&hints); + let block_task = std::thread::spawn(move || { + get_blocks_for_range( + task_id as u32, + NETWORK, + block_file_path, + chain, + &hints, + peers, + tx, + chunk, + ) + }); + tasks.push(block_task); + } + for task in tasks { + if let Err(e) = task.join() { + tracing::warn!("{:?}", e.downcast::()); + } + } + drop(tx); + let acc_result = acc_task.join().unwrap(); + tracing::info!("Verified: {acc_result}"); + elapsed_time(main_routine_time); +} diff --git a/node/src/lib.rs b/node/src/lib.rs new file mode 100644 index 0000000..20bee5b --- /dev/null +++ b/node/src/lib.rs @@ -0,0 +1,306 @@ +use std::{ + collections::HashSet, + fs::File, + io::Write, + net::SocketAddr, + path::Path, + sync::{ + mpsc::{Receiver, Sender}, + Arc, Mutex, + }, + time::{Duration, Instant}, +}; + +use accumulator::{Accumulator, AccumulatorUpdate}; +use bitcoin::{ + consensus, + key::rand::{seq::SliceRandom, thread_rng}, + script::ScriptExt, + transaction::TransactionExt, + BlockHash, BlockHeight, Network, OutPoint, +}; +use hintfile::Hints; +use kernel::ChainstateManager; +use network::dns::DnsQuery; +use p2p::{ + handshake::ConnectionConfig, + net::{ConnectionExt, TimeoutParams}, + p2p::{ + message::{InventoryPayload, NetworkMessage}, + message_blockdata::{GetHeadersMessage, Inventory}, + NetworkExt, ProtocolVersion, ServiceFlags, + }, + SeedsExt, TimedMessage, +}; + +const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION; + +pub fn elapsed_time(then: Instant) { + let duration_sec = then.elapsed().as_secs_f64(); + tracing::info!("Elapsed time {duration_sec} seconds"); +} + +#[derive(Debug)] +pub struct AccumulatorState { + acc: Accumulator, + update_rx: Receiver, +} + +impl AccumulatorState { + pub fn new(rx: Receiver) -> Self { + Self { + acc: Accumulator::new(), + update_rx: rx, + } + } + + pub fn verify(&mut self) -> bool { + while let Ok(update) = self.update_rx.recv() { + self.acc.update(update); + } + self.acc.is_zero() + } +} + +pub fn bootstrap_dns(network: Network) -> Vec { + let mut all_hosts = Vec::new(); + for seed in network.seeds() { + let hosts = DnsQuery::new_cloudflare(seed).lookup().unwrap_or_default(); + all_hosts.extend_from_slice(&hosts); + } + all_hosts + .into_iter() + .map(|host| SocketAddr::new(host, network.default_p2p_port())) + .collect() +} + +pub fn sync_block_headers( + stop_hash: BlockHash, + hosts: &[SocketAddr], + chainman: Arc, + network: Network, +) { + let mut rng = thread_rng(); + let then = Instant::now(); + tracing::info!("Syncing block headers to assume valid hash"); + loop { + let random = hosts + .choose(&mut rng) + .copied() + .expect("dns must return at least one peer"); + tracing::info!("Attempting connection to {random}"); + let mut timeout_conf = TimeoutParams::new(); + timeout_conf.read_timeout(Duration::from_secs(2)); + timeout_conf.write_timeout(Duration::from_secs(2)); + let conn = ConnectionConfig::new() + .change_network(network) + .decrease_version_requirement(ProtocolVersion::BIP0031_VERSION) + .open_connection(random, timeout_conf); + let (writer, mut reader, metrics) = match conn { + Ok((writer, reader, metrics)) => (writer, reader, metrics), + Err(_) => continue, + }; + tracing::info!("Connection established"); + loop { + let curr = chainman.best_header().block_hash().hash; + let locator = BlockHash::from_byte_array(curr); + let getheaders = GetHeadersMessage { + version: PROTOCOL_VERSION, + locator_hashes: vec![locator], + stop_hash: BlockHash::GENESIS_PREVIOUS_BLOCK_HASH, + }; + tracing::info!("Requesting {locator}"); + if writer + .send_message(NetworkMessage::GetHeaders(getheaders)) + .is_err() + { + break; + } + while let Ok(Some(message)) = reader.read_message() { + match message { + NetworkMessage::Headers(message) => { + for header in message.0 { + chainman + .process_new_block_headers(&consensus::serialize(&header), true) + .expect("process headers failed"); + if header.block_hash().eq(&stop_hash) { + tracing::info!("Done syncing block headers"); + if let Some(message_rate) = + metrics.message_rate(p2p::TimedMessage::BlockHeaders) + { + let mps = message_rate + .messages_per_secs(Instant::now()) + .unwrap_or(0.); + tracing::info!("Peer responses per second: {mps}"); + } + elapsed_time(then); + return; + } + } + tracing::info!("Update chain tip: {}", chainman.best_header().height()); + break; + } + NetworkMessage::Ping(nonce) => { + let _ = writer.send_message(NetworkMessage::Pong(nonce)); + } + e => tracing::info!("Ignoring message {}", e.command()), + } + } + } + } +} + +#[allow(clippy::too_many_arguments)] +pub fn get_blocks_for_range( + task_id: u32, + network: Network, + block_dir: &Path, + chain: Arc, + hints: &Hints, + peers: Arc>>, + updater: Sender, + mut batch: Vec, +) { + let mut timeout = TimeoutParams::new(); + timeout.read_timeout(Duration::from_secs(2)); + timeout.tcp_handshake_timeout(Duration::from_secs(2)); + timeout.ping_interval(Duration::from_secs(15)); + let mut rng = thread_rng(); + loop { + let peer = { + let lock_opt = peers.lock().ok(); + let socket_addr = lock_opt.and_then(|lock| lock.choose(&mut rng).copied()); + socket_addr + }; + let Some(peer) = peer else { continue }; + // tracing::info!("Connecting to {peer}"); + let conn = ConnectionConfig::new() + .change_network(network) + .request_addr() + .set_service_requirement(ServiceFlags::NETWORK) + .decrease_version_requirement(ProtocolVersion::BIP0031_VERSION) + .open_connection(peer, timeout); + let Ok((writer, mut reader, metrics)) = conn else { + // tracing::warn!("Connection failed"); + continue; + }; + // tracing::info!("Connection successful"); + let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect()); + // tracing::info!("Requesting {} blocks", payload.0.len()); + let getdata = NetworkMessage::GetData(payload); + if writer.send_message(getdata).is_err() { + continue; + } + while let Ok(Some(message)) = reader.read_message() { + match message { + NetworkMessage::Ping(nonce) => { + let _ = writer.send_message(NetworkMessage::Pong(nonce)); + } + NetworkMessage::Block(block) => { + let hash = block.block_hash(); + batch.retain(|b| hash.ne(b)); + let kernal_hash: kernel::BlockHash = kernel::BlockHash { + hash: hash.to_byte_array(), + }; + let height = chain + .block_index_by_hash(kernal_hash) + .expect("header is in best chain."); + let block_height = BlockHeight::from_u32(height.height().unsigned_abs()); + let unspent_indexes: HashSet = + hints.get_block_offsets(block_height).into_iter().collect(); + // tracing::info!("{task_id} -> {block_height}:{hash}"); + let file_path = block_dir.join(format!("{hash}.block")); + let mut file = File::create_new(file_path).expect("duplicate block file"); + let block_bytes = consensus::serialize(&block); + file.write_all(&block_bytes) + .expect("failed to write block file"); + // tracing::info!("Wrote {hash} to file"); + let (_, transactions) = block.into_parts(); + let mut output_index = 0; + for transaction in transactions { + let tx_hash = transaction.compute_txid(); + if !transaction.is_coinbase() { + for input in transaction.inputs { + let input_hash = accumulator::hash_outpoint(input.previous_output); + let update = AccumulatorUpdate::Spent(input_hash); + updater + .send(update) + .expect("accumulator task must not panic"); + } + } + for (vout, txout) in transaction.outputs.iter().enumerate() { + if !txout.script_pubkey.is_op_return() + && !txout.script_pubkey.len() > 10_000 + && !unspent_indexes.contains(&output_index) + { + let outpoint = OutPoint { + txid: tx_hash, + vout: vout as u32, + }; + let input_hash = accumulator::hash_outpoint(outpoint); + let update = AccumulatorUpdate::Add(input_hash); + updater + .send(update) + .expect("accumulator task must not panic"); + } + output_index += 1 + } + } + + if batch.is_empty() { + tracing::info!("All block ranges fetched: {task_id}"); + return; + } + } + NetworkMessage::AddrV2(payload) => { + if let Ok(mut lock) = peers.lock() { + let addrs: Vec = payload + .0 + .into_iter() + .filter_map(|addr| { + addr.socket_addr().ok().map(|sock| (addr.port, sock)) + }) + .map(|(_, addr)| addr) + .collect(); + // tracing::info!("Adding {} peers", addrs.len()); + lock.extend(addrs); + } + } + _ => (), + } + if let Some(message_rate) = metrics.message_rate(TimedMessage::Block) { + if message_rate.total_count() < 2 { + continue; + } + let Some(rate) = message_rate.messages_per_secs(Instant::now()) else { + continue; + }; + if rate < 2. { + tracing::warn!("Disconnecting from {task_id} for stalling"); + break; + } + } + } + if batch.is_empty() { + break; + } + } + tracing::info!("All block ranges fetched: {task_id}"); +} + +pub fn hashes_from_chain(chain: Arc, chunks: usize) -> Vec> { + let height = chain.best_header().height(); + let mut hashes = Vec::with_capacity(height as usize); + let mut curr = chain.best_header(); + let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash); + hashes.push(tip_hash); + while let Ok(next) = curr.prev() { + if next.height() == 0 { + return hashes.chunks(chunks).map(|slice| slice.to_vec()).collect(); + } + let hash = BlockHash::from_byte_array(next.block_hash().hash); + hashes.push(hash); + curr = next; + } + hashes.chunks(chunks).map(|slice| slice.to_vec()).collect() +}