Skip to content

Commit 4798c2b

Browse files
committed
node: All threads remain active during IBD
In the previous iteration, when threads completed their jobs they returned to the main loop. We can keep good peers running by giving them more blocks to fetch. This implements a simple work queue from which threads can steal additional jobs.
1 parent 2557016 commit 4798c2b

File tree

4 files changed

+71
-76
lines changed

4 files changed

+71
-76
lines changed

node/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,18 @@ sudo apt-get install unzip
1212
unzip bitcoin.hints.zip
1313
```
1414

15+
To build the Bitcoin kernel, you will need the following on Ubuntu:
16+
17+
```
18+
sudo apt-get install build-essential cmake pkgconf python3 libevent-dev libboost-dev
19+
```
20+
21+
For other systems, follow the Bitcoin Core documentation on how to install the requirements [here](https://github.com/bitcoin/bitcoin/tree/master/doc).
22+
23+
Finally, you will need Rust and cargo installed, you may download them from [here](https://www.rust-lang.org/tools/install).
24+
1525
To start fast IBD:
26+
1627
```
1728
cargo run --bin ibd --release -- <args>
1829
```
@@ -39,3 +50,9 @@ Arguments:
3950
--write-timeout The maximum time (seconds) to write to a TCP
4051
stream until the connection is killed.
4152
```
53+
54+
If IBD completes, or you experience a bug, you will need to remove the kernel directories from this repository to run the binary again:
55+
56+
```
57+
rm -rf blocks chainstate
58+
```

node/config_spec.toml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ doc = "The bitcoin network to operate on. Default `bitcoin`. Options are `bitcoi
1919
[[param]]
2020
name = "min_blocks_per_sec"
2121
type = "f64"
22-
default = "3."
22+
default = "1."
2323
doc = "The minimum rate a peer has to respond to block requests."
2424

2525
[[param]]
@@ -28,12 +28,6 @@ type = "usize"
2828
default = "32"
2929
doc = "The number of tasks to download blocks. Default is 32. Each task uses two OS threads."
3030

31-
[[param]]
32-
name = "ping_timeout"
33-
type = "u64"
34-
default = "60"
35-
doc = "The time (seconds) a peer has to respond to a `ping` message. Pings are sent aggressively throughout IBD to find slow peers."
36-
3731
[[param]]
3832
name = "tcp_timeout"
3933
type = "u64"

node/src/bin/ibd.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ fn main() {
2727
.network
2828
.parse::<Network>()
2929
.expect("invalid network string");
30-
let ping_timeout = Duration::from_secs(config.ping_timeout);
3130
let block_per_sec = config.min_blocks_per_sec;
3231
let task_num = config.tasks;
3332
let tcp_timeout = Duration::from_secs(config.tcp_timeout);
@@ -76,26 +75,27 @@ fn main() {
7675
let acc_task = std::thread::spawn(move || accumulator_state.verify());
7776
let peers = Arc::new(Mutex::new(peers));
7877
let mut tasks = Vec::new();
79-
let hashes = hashes_from_chain(Arc::clone(&chain), network, task_num);
80-
for (task_id, chunk) in hashes.into_iter().enumerate() {
78+
let hashes = hashes_from_chain(Arc::clone(&chain));
79+
let arc_hashes = Arc::new(Mutex::new(hashes));
80+
for task_id in 0..task_num {
8181
let chain = Arc::clone(&chain);
8282
let tx = tx.clone();
8383
let peers = Arc::clone(&peers);
84+
let hashes = Arc::clone(&arc_hashes);
8485
let hints = Arc::clone(&hints);
8586
let block_file_path = block_file_path.clone();
8687
let block_task = std::thread::spawn(move || {
8788
get_blocks_for_range(
8889
task_id as u32,
8990
timeout_conf,
9091
block_per_sec,
91-
ping_timeout,
9292
network,
9393
block_file_path,
9494
chain,
9595
&hints,
9696
peers,
9797
tx,
98-
chunk,
98+
hashes,
9999
)
100100
});
101101
tasks.push(block_task);

node/src/lib.rs

Lines changed: 48 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ use p2p::{
3434
};
3535

3636
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION;
37-
const MAX_GETDATA: usize = 50_000;
37+
const CHUNK_SIZE: usize = 100;
38+
const CONSIDERED_DEAD: f64 = 0.1;
3839

3940
pub fn elapsed_time(then: Instant) {
4041
let duration_sec = then.elapsed().as_secs_f64();
@@ -168,23 +169,30 @@ pub fn get_blocks_for_range(
168169
task_id: u32,
169170
timeout_params: TimeoutParams,
170171
blocks_per_sec: f64,
171-
_ping_timeout: Duration,
172172
network: Network,
173173
block_dir: Option<PathBuf>,
174174
chain: Arc<ChainstateManager>,
175175
hints: &Hints,
176176
peers: Arc<Mutex<Vec<SocketAddr>>>,
177177
updater: Sender<AccumulatorUpdate>,
178-
mut batch: Vec<BlockHash>,
178+
hashes: Arc<Mutex<Vec<Vec<BlockHash>>>>,
179179
) {
180-
tracing::info!("{task_id} assigned {} blocks", batch.len());
180+
let mut batch = Vec::new();
181181
let mut rng = thread_rng();
182182
loop {
183183
let peer = {
184184
let lock_opt = peers.lock().ok();
185185
let socket_addr = lock_opt.and_then(|lock| lock.choose(&mut rng).copied());
186186
socket_addr
187187
};
188+
if batch.is_empty() {
189+
let mut jobs_lock = hashes.lock().expect("could not take lock on hashes");
190+
let Some(next) = jobs_lock.pop() else {
191+
return;
192+
};
193+
tracing::info!("[thread {task_id:2}]: requesting next batch");
194+
batch = next;
195+
}
188196
let Some(peer) = peer else { continue };
189197
// tracing::info!("Connecting to {peer}");
190198
let conn = ConnectionConfig::new()
@@ -197,9 +205,9 @@ pub fn get_blocks_for_range(
197205
// tracing::warn!("Connection failed");
198206
continue;
199207
};
200-
// tracing::info!("Connection successful");
208+
let mut completed_batches = 0;
209+
tracing::info!("[thread {task_id:2}]: established connection {peer}");
201210
let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect());
202-
// tracing::info!("Requesting {} blocks", payload.0.len());
203211
let getdata = NetworkMessage::GetData(payload);
204212
if writer.send_message(getdata).is_err() {
205213
continue;
@@ -211,6 +219,7 @@ pub fn get_blocks_for_range(
211219
}
212220
NetworkMessage::Block(block) => {
213221
let hash = block.block_hash();
222+
// tracing::info!("[thread {task_id:2}]: {hash}");
214223
batch.retain(|b| hash.ne(b));
215224
let kernal_hash: kernel::BlockHash = kernel::BlockHash {
216225
hash: hash.to_byte_array(),
@@ -221,7 +230,6 @@ pub fn get_blocks_for_range(
221230
let block_height = block_index.height().unsigned_abs();
222231
let unspent_indexes: HashSet<u64> =
223232
hints.get_indexes(block_height).into_iter().collect();
224-
// tracing::info!("{task_id} -> {block_height}:{hash}");
225233
if let Some(block_dir) = block_dir.as_ref() {
226234
let file_path = block_dir.join(format!("{hash}.block"));
227235
let file = File::create_new(file_path);
@@ -238,7 +246,6 @@ pub fn get_blocks_for_range(
238246
.expect("failed to write block file");
239247
file.sync_data().expect("could not sync file with OS");
240248
}
241-
// tracing::info!("Wrote {hash} to file");
242249
let (_, transactions) = block.into_parts();
243250
let mut output_index = 0;
244251
for transaction in transactions {
@@ -270,12 +277,29 @@ pub fn get_blocks_for_range(
270277
output_index += 1
271278
}
272279
}
273-
if batch.len() % 100 == 0 {
274-
tracing::info!("{task_id} has {} remaining blocks", batch.len());
275-
}
276280
if batch.is_empty() {
277-
tracing::info!("All block ranges fetched: {task_id}");
278-
return;
281+
let mut jobs_lock = hashes.lock().expect("could not take lock on hashes");
282+
let Some(next) = jobs_lock.pop() else {
283+
tracing::info!("[thread {task_id:2}]: no jobs remaining, please wait for other threads");
284+
return;
285+
};
286+
batch = next;
287+
completed_batches += 1;
288+
tracing::info!(
289+
"[thread {task_id:2}]: requesting next batch. blocks downloaded: {}",
290+
CHUNK_SIZE * completed_batches
291+
);
292+
tracing::info!(
293+
"[thread m]: blocks remaining {}",
294+
CHUNK_SIZE * jobs_lock.len()
295+
);
296+
let payload = InventoryPayload(
297+
batch.iter().map(|hash| Inventory::Block(*hash)).collect(),
298+
);
299+
let getdata = NetworkMessage::GetData(payload);
300+
if writer.send_message(getdata).is_err() {
301+
break;
302+
}
279303
}
280304
}
281305
NetworkMessage::AddrV2(payload) => {
@@ -288,47 +312,34 @@ pub fn get_blocks_for_range(
288312
})
289313
.map(|(_, addr)| addr)
290314
.collect();
291-
// tracing::info!("Adding {} peers", addrs.len());
292315
lock.extend(addrs);
293316
}
294317
}
295318
_ => (),
296319
}
297320
if let Some(message_rate) = metrics.message_rate(TimedMessage::Block) {
298-
if message_rate.total_count() < 100 {
299-
continue;
300-
}
301321
let Some(rate) = message_rate.messages_per_secs(Instant::now()) else {
302322
continue;
303323
};
304-
if rate < blocks_per_sec {
305-
tracing::warn!("Disconnecting from {task_id} for stalling");
324+
if rate < CONSIDERED_DEAD {
325+
tracing::warn!("[thread {task_id:2}]: block rate considered dead");
326+
break;
327+
}
328+
if rate < blocks_per_sec && message_rate.total_count() > 20 {
329+
tracing::warn!("[thread {task_id:2}]: insufficient blocks/second rate");
306330
break;
307331
}
308332
}
309-
// if metrics.ping_timed_out(ping_timeout) {
310-
// tracing::warn!("{task_id} failed to respond to a ping");
311-
// break;
312-
// }
313-
}
314-
if batch.is_empty() {
315-
break;
316333
}
317334
}
318-
tracing::info!("All block ranges fetched: {task_id}");
319335
}
320336

321-
pub fn hashes_from_chain(
322-
chain: Arc<ChainstateManager>,
323-
network: Network,
324-
jobs: usize,
325-
) -> Vec<Vec<BlockHash>> {
337+
pub fn hashes_from_chain(chain: Arc<ChainstateManager>) -> Vec<Vec<BlockHash>> {
326338
let height = chain.best_header().height();
327339
let mut hashes = Vec::with_capacity(height as usize);
328340
let mut curr = chain.best_header();
329341
let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash);
330342
hashes.push(tip_hash);
331-
let mut out = Vec::new();
332343
while let Ok(next) = curr.prev() {
333344
if next.height() == 0 {
334345
break;
@@ -337,38 +348,11 @@ pub fn hashes_from_chain(
337348
hashes.push(hash);
338349
curr = next;
339350
}
340-
if matches!(network, Network::Signet) {
341-
return hashes.chunks(20_000).map(|slice| slice.to_vec()).collect();
342-
}
343-
// These blocks are empty. Fetch the maximum amount of blocks.
344-
let first_epoch = hashes.split_off(hashes.len() - 200_000);
345-
let first_chunks: Vec<Vec<BlockHash>> = first_epoch
346-
.chunks(MAX_GETDATA)
351+
hashes
352+
.chunks(CHUNK_SIZE)
347353
.map(|slice| slice.to_vec())
348-
.collect();
349-
out.extend(first_chunks);
350-
// These start to get larger, but are still small
351-
let next_epoch = hashes.split_off(hashes.len() - 100_000);
352-
let next_chunks: Vec<Vec<BlockHash>> = next_epoch
353-
.chunks(MAX_GETDATA / 2)
354-
.map(|slice| slice.to_vec())
355-
.collect();
356-
out.extend(next_chunks);
357-
// Still not entirely full, but almost there
358-
let to_segwit = hashes.split_off(hashes.len() - 100_000);
359-
let to_segwit_chunks: Vec<Vec<BlockHash>> = to_segwit
360-
.chunks(MAX_GETDATA / 4)
361-
.map(|slice| slice.to_vec())
362-
.collect();
363-
out.extend(to_segwit_chunks);
364-
// Now divide the rest among jobs
365-
let chunk_size = hashes.len() / jobs;
366-
let rest: Vec<Vec<BlockHash>> = hashes
367-
.chunks(chunk_size)
368-
.map(|slice| slice.to_vec())
369-
.collect();
370-
out.extend(rest);
371-
out
354+
.rev()
355+
.collect()
372356
}
373357

374358
pub trait ChainExt {

0 commit comments

Comments
 (0)