17 changes: 17 additions & 0 deletions node/README.md
@@ -12,7 +12,18 @@ sudo apt-get install unzip
unzip bitcoin.hints.zip
```

To build the Bitcoin kernel, you will need the following on Ubuntu:

```
sudo apt-get install build-essential cmake pkgconf python3 libevent-dev libboost-dev
```

For other systems, follow the Bitcoin Core documentation on installing the build requirements [here](https://github.com/bitcoin/bitcoin/tree/master/doc).

Finally, you will need Rust and Cargo installed; you can download them from [here](https://www.rust-lang.org/tools/install).

To start fast IBD:

```
cargo run --bin ibd --release -- <args>
```
@@ -39,3 +50,9 @@ Arguments:
--write-timeout The maximum time (seconds) to write to a TCP
stream until the connection is killed.
```
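
For example, a signet run might look like the following sketch. The flag names other than `--write-timeout` are assumptions derived from the parameter names in `node/config_spec.toml`; consult the full argument listing (excerpted above) for the authoritative flags.

```
cargo run --bin ibd --release -- --network signet --tasks 16 --write-timeout 30
```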

Once IBD completes, or if you run into a bug, you will need to remove the kernel directories from this repository before running the binary again:

```
rm -rf blocks chainstate
```
8 changes: 1 addition & 7 deletions node/config_spec.toml
@@ -19,7 +19,7 @@ doc = "The bitcoin network to operate on. Default `bitcoin`. Options are `bitcoi
[[param]]
name = "min_blocks_per_sec"
type = "f64"
default = "3."
default = "1."
doc = "The minimum rate a peer has to respond to block requests."

[[param]]
@@ -28,12 +28,6 @@ type = "usize"
default = "32"
doc = "The number of tasks to download blocks. Default is 64. Each task uses two OS threads."

[[param]]
name = "ping_timeout"
type = "u64"
default = "60"
doc = "The time (seconds) a peer has to respond to a `ping` message. Pings are sent aggressively throughout IBD to find slow peers."

[[param]]
name = "tcp_timeout"
type = "u64"
10 changes: 5 additions & 5 deletions node/src/bin/ibd.rs
@@ -27,7 +27,6 @@ fn main() {
.network
.parse::<Network>()
.expect("invalid network string");
let ping_timeout = Duration::from_secs(config.ping_timeout);
let block_per_sec = config.min_blocks_per_sec;
let task_num = config.tasks;
let tcp_timeout = Duration::from_secs(config.tcp_timeout);
@@ -76,26 +75,27 @@ fn main() {
let acc_task = std::thread::spawn(move || accumulator_state.verify());
let peers = Arc::new(Mutex::new(peers));
let mut tasks = Vec::new();
let hashes = hashes_from_chain(Arc::clone(&chain), network, task_num);
for (task_id, chunk) in hashes.into_iter().enumerate() {
let hashes = hashes_from_chain(Arc::clone(&chain));
let arc_hashes = Arc::new(Mutex::new(hashes));
for task_id in 0..task_num {
let chain = Arc::clone(&chain);
let tx = tx.clone();
let peers = Arc::clone(&peers);
let hashes = Arc::clone(&arc_hashes);
let hints = Arc::clone(&hints);
let block_file_path = block_file_path.clone();
let block_task = std::thread::spawn(move || {
get_blocks_for_range(
task_id as u32,
timeout_conf,
block_per_sec,
ping_timeout,
network,
block_file_path,
chain,
&hints,
peers,
tx,
chunk,
hashes,
)
});
tasks.push(block_task);
112 changes: 48 additions & 64 deletions node/src/lib.rs
@@ -34,7 +34,8 @@ use p2p::{
};

const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION;
const MAX_GETDATA: usize = 50_000;
const CHUNK_SIZE: usize = 100;
const CONSIDERED_DEAD: f64 = 0.1;
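// `CHUNK_SIZE` is the number of block hashes handed to a worker per batch; a peer delivering
// fewer than `CONSIDERED_DEAD` blocks per second is treated as unresponsive and dropped.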

pub fn elapsed_time(then: Instant) {
let duration_sec = then.elapsed().as_secs_f64();
@@ -168,23 +169,30 @@ pub fn get_blocks_for_range(
task_id: u32,
timeout_params: TimeoutParams,
blocks_per_sec: f64,
_ping_timeout: Duration,
network: Network,
block_dir: Option<PathBuf>,
chain: Arc<ChainstateManager>,
hints: &Hints,
peers: Arc<Mutex<Vec<SocketAddr>>>,
updater: Sender<AccumulatorUpdate>,
mut batch: Vec<BlockHash>,
hashes: Arc<Mutex<Vec<Vec<BlockHash>>>>,
) {
tracing::info!("{task_id} assigned {} blocks", batch.len());
let mut batch = Vec::new();
let mut rng = thread_rng();
loop {
let peer = {
let lock_opt = peers.lock().ok();
let socket_addr = lock_opt.and_then(|lock| lock.choose(&mut rng).copied());
socket_addr
};
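// Refill the local batch from the shared job queue whenever it runs dry; an empty
// queue means there is no work left for this task.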
if batch.is_empty() {
let mut jobs_lock = hashes.lock().expect("could not take lock on hashes");
let Some(next) = jobs_lock.pop() else {
return;
};
tracing::info!("[thread {task_id:2}]: requesting next batch");
batch = next;
}
let Some(peer) = peer else { continue };
// tracing::info!("Connecting to {peer}");
let conn = ConnectionConfig::new()
@@ -197,9 +205,9 @@ pub fn get_blocks_for_range(
// tracing::warn!("Connection failed");
continue;
};
// tracing::info!("Connection successful");
let mut completed_batches = 0;
tracing::info!("[thread {task_id:2}]: established connection {peer}");
let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect());
// tracing::info!("Requesting {} blocks", payload.0.len());
let getdata = NetworkMessage::GetData(payload);
if writer.send_message(getdata).is_err() {
continue;
@@ -211,6 +219,7 @@ pub fn get_blocks_for_range(
}
NetworkMessage::Block(block) => {
let hash = block.block_hash();
// tracing::info!("[thread {task_id:2}]: {hash}");
batch.retain(|b| hash.ne(b));
let kernal_hash: kernel::BlockHash = kernel::BlockHash {
hash: hash.to_byte_array(),
@@ -221,7 +230,6 @@ pub fn get_blocks_for_range(
let block_height = block_index.height().unsigned_abs();
let unspent_indexes: HashSet<u64> =
hints.get_indexes(block_height).into_iter().collect();
// tracing::info!("{task_id} -> {block_height}:{hash}");
if let Some(block_dir) = block_dir.as_ref() {
let file_path = block_dir.join(format!("{hash}.block"));
let file = File::create_new(file_path);
@@ -238,7 +246,6 @@ pub fn get_blocks_for_range(
.expect("failed to write block file");
file.sync_data().expect("could not sync file with OS");
}
// tracing::info!("Wrote {hash} to file");
let (_, transactions) = block.into_parts();
let mut output_index = 0;
for transaction in transactions {
@@ -270,12 +277,29 @@ pub fn get_blocks_for_range(
output_index += 1
}
}
if batch.len() % 100 == 0 {
tracing::info!("{task_id} has {} remaining blocks", batch.len());
}
if batch.is_empty() {
tracing::info!("All block ranges fetched: {task_id}");
return;
let mut jobs_lock = hashes.lock().expect("could not take lock on hashes");
let Some(next) = jobs_lock.pop() else {
tracing::info!("[thread {task_id:2}]: no jobs remaining, please wait for other threads");
return;
};
batch = next;
completed_batches += 1;
tracing::info!(
"[thread {task_id:2}]: requesting next batch. blocks downloaded: {}",
CHUNK_SIZE * completed_batches
);
tracing::info!(
"[thread m]: blocks remaining {}",
CHUNK_SIZE * jobs_lock.len()
);
let payload = InventoryPayload(
batch.iter().map(|hash| Inventory::Block(*hash)).collect(),
);
let getdata = NetworkMessage::GetData(payload);
if writer.send_message(getdata).is_err() {
break;
}
}
}
NetworkMessage::AddrV2(payload) => {
@@ -288,47 +312,34 @@ pub fn get_blocks_for_range(
})
.map(|(_, addr)| addr)
.collect();
// tracing::info!("Adding {} peers", addrs.len());
lock.extend(addrs);
}
}
_ => (),
}
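// Drop peers whose block delivery rate is effectively zero, or that fall below the
// configured minimum once enough blocks have been observed.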
if let Some(message_rate) = metrics.message_rate(TimedMessage::Block) {
if message_rate.total_count() < 100 {
continue;
}
let Some(rate) = message_rate.messages_per_secs(Instant::now()) else {
continue;
};
if rate < blocks_per_sec {
tracing::warn!("Disconnecting from {task_id} for stalling");
if rate < CONSIDERED_DEAD {
tracing::warn!("[thread {task_id:2}]: block rate considered dead");
break;
}
if rate < blocks_per_sec && message_rate.total_count() > 20 {
tracing::warn!("[thread {task_id:2}]: insufficient blocks/second rate");
break;
}
}
// if metrics.ping_timed_out(ping_timeout) {
// tracing::warn!("{task_id} failed to respond to a ping");
// break;
// }
}
if batch.is_empty() {
break;
}
}
tracing::info!("All block ranges fetched: {task_id}");
}

pub fn hashes_from_chain(
chain: Arc<ChainstateManager>,
network: Network,
jobs: usize,
) -> Vec<Vec<BlockHash>> {
pub fn hashes_from_chain(chain: Arc<ChainstateManager>) -> Vec<Vec<BlockHash>> {
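// Walk the header chain from the best header back toward genesis (the genesis block itself
// is skipped), then split the collected hashes into fixed-size batches for the worker threads.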
let height = chain.best_header().height();
let mut hashes = Vec::with_capacity(height as usize);
let mut curr = chain.best_header();
let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash);
hashes.push(tip_hash);
let mut out = Vec::new();
while let Ok(next) = curr.prev() {
if next.height() == 0 {
break;
@@ -337,38 +348,11 @@ pub fn hashes_from_chain(
hashes.push(hash);
curr = next;
}
if matches!(network, Network::Signet) {
return hashes.chunks(20_000).map(|slice| slice.to_vec()).collect();
}
// These blocks are empty. Fetch the maximum amount of blocks.
let first_epoch = hashes.split_off(hashes.len() - 200_000);
let first_chunks: Vec<Vec<BlockHash>> = first_epoch
.chunks(MAX_GETDATA)
hashes
.chunks(CHUNK_SIZE)
.map(|slice| slice.to_vec())
.collect();
out.extend(first_chunks);
// These start to get larger, but are still small
let next_epoch = hashes.split_off(hashes.len() - 100_000);
let next_chunks: Vec<Vec<BlockHash>> = next_epoch
.chunks(MAX_GETDATA / 2)
.map(|slice| slice.to_vec())
.collect();
out.extend(next_chunks);
// Still not entirely full, but almost there
let to_segwit = hashes.split_off(hashes.len() - 100_000);
let to_segwit_chunks: Vec<Vec<BlockHash>> = to_segwit
.chunks(MAX_GETDATA / 4)
.map(|slice| slice.to_vec())
.collect();
out.extend(to_segwit_chunks);
// Now divide the rest among jobs
let chunk_size = hashes.len() / jobs;
let rest: Vec<Vec<BlockHash>> = hashes
.chunks(chunk_size)
.map(|slice| slice.to_vec())
.collect();
out.extend(rest);
out
.rev()
.collect()
}

pub trait ChainExt {