Skip to content

Commit 51a7bc7

Browse files
committed
add BatchHeaderFieldsProvider
1 parent 77d9b42 commit 51a7bc7

File tree

4 files changed

+185
-20
lines changed

4 files changed

+185
-20
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

store/src/transaction.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,11 @@ impl StoreTransaction {
183183
// Also insert into COLUMN_HEADER_INDEX for fast number→hash lookup during IBD
184184
// This allows get_ancestor to use O(1) lookups instead of walking parent chain
185185
let block_number: packed::Uint64 = header.number().into();
186-
self.insert_raw(COLUMN_HEADER_INDEX, block_number.as_slice(), hash.as_slice())
186+
self.insert_raw(
187+
COLUMN_HEADER_INDEX,
188+
block_number.as_slice(),
189+
hash.as_slice(),
190+
)
187191
}
188192

189193
/// TODO(doc): @quake

sync/src/synchronizer/headers_process.rs

Lines changed: 111 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,62 @@ use ckb_error::Error;
66
use ckb_logger::{Level, debug, log_enabled, warn};
77
use ckb_network::{CKBProtocolContext, PeerIndex};
88
use ckb_shared::block_status::BlockStatus;
9-
use ckb_traits::HeaderFieldsProvider;
9+
use ckb_traits::{HeaderFields, HeaderFieldsProvider};
10+
use ckb_types::core::HeaderView;
1011
use ckb_types::{core, packed, prelude::*};
11-
use ckb_verification::{HeaderError, HeaderVerifier};
12+
use ckb_verification::{HeaderError, HeaderVerifier, UnknownParentError};
1213
use ckb_verification_traits::Verifier;
14+
use std::collections::HashMap;
1315
use std::sync::Arc;
1416

17+
/// A `HeaderFieldsProvider` that checks an in-memory batch of headers first, then falls back to the database.
18+
/// This allows a batch of continuous headers to be verified without inserting them into the DB first.
19+
pub struct BatchHeaderFieldsProvider<'a, DL> {
20+
data_loader: &'a DL,
21+
headers_map: HashMap<packed::Byte32, HeaderFields>,
22+
}
23+
24+
impl<'a, DL: HeaderFieldsProvider> BatchHeaderFieldsProvider<'a, DL> {
25+
pub fn new(data_loader: &'a DL, headers: &[HeaderView]) -> Self {
26+
let mut headers_map = HashMap::new();
27+
for header in headers {
28+
let fields = HeaderFields {
29+
hash: header.hash(),
30+
number: header.number(),
31+
epoch: header.epoch(),
32+
timestamp: header.timestamp(),
33+
parent_hash: header.parent_hash(),
34+
};
35+
headers_map.insert(header.hash(), fields);
36+
}
37+
BatchHeaderFieldsProvider {
38+
data_loader,
39+
headers_map,
40+
}
41+
}
42+
}
43+
44+
impl<'a, DL: HeaderFieldsProvider> HeaderFieldsProvider for BatchHeaderFieldsProvider<'a, DL> {
45+
fn get_header_fields(&self, hash: &packed::Byte32) -> Option<HeaderFields> {
46+
// Check in-memory batch first
47+
if let Some(fields) = self.headers_map.get(hash) {
48+
return Some(HeaderFields {
49+
hash: fields.hash.clone(),
50+
number: fields.number,
51+
epoch: fields.epoch,
52+
timestamp: fields.timestamp,
53+
parent_hash: fields.parent_hash.clone(),
54+
});
55+
}
56+
// Fall back to database
57+
self.data_loader.get_header_fields(hash)
58+
}
59+
60+
// Note: block_median_time() uses the default trait implementation,
61+
// which internally calls get_header_fields() above, so it automatically
62+
// checks the in-memory batch first, then falls back to database.
63+
}
64+
1565
pub struct HeadersProcess<'a> {
1666
message: packed::SendHeadersReader<'a>,
1767
synchronizer: &'a Synchronizer,
@@ -156,33 +206,60 @@ impl<'a> HeadersProcess<'a> {
156206
}
157207
};
158208

159-
for header in headers.iter().skip(1) {
160-
let verifier = HeaderVerifier::new(shared, consensus);
209+
// Batch verify all headers using BatchHeaderFieldsProvider
210+
// This allows headers to reference each other without being in DB yet
211+
let batch_provider = BatchHeaderFieldsProvider::new(shared, &headers[1..]);
212+
let mut headers_to_insert = Vec::new();
213+
214+
for (idx, header) in headers.iter().enumerate().skip(1) {
215+
// Check if already valid (skip re-validation)
216+
let status = self.active_chain.get_block_status(&header.hash());
217+
if status.contains(BlockStatus::HEADER_VALID) {
218+
continue;
219+
}
220+
221+
let verifier = HeaderVerifier::new(&batch_provider, consensus);
161222
let acceptor =
162223
HeaderAcceptor::new(header, self.peer, verifier, self.active_chain.clone());
163-
let result = acceptor.accept();
224+
let (result, already_valid) = acceptor.validate();
225+
164226
match result.state {
165227
ValidationState::Invalid => {
228+
// Mark all remaining headers as invalid (invalid parent)
229+
for remaining_header in headers.iter().skip(idx + 1) {
230+
shared.shared().insert_block_status(
231+
remaining_header.hash(),
232+
BlockStatus::BLOCK_INVALID,
233+
);
234+
}
166235
debug!(
167-
"HeadersProcess accept result is invalid, error = {:?}, header = {:?}",
168-
result.error, headers,
236+
"HeadersProcess validation failed, error = {:?}, header = {:?}",
237+
result.error, header,
169238
);
170239
return StatusCode::HeadersIsInvalid
171-
.with_context(format!("accept header {header:?}"));
240+
.with_context(format!("validate header {header:?}"));
172241
}
173242
ValidationState::TemporaryInvalid => {
174243
debug!(
175-
"HeadersProcess accept result is temporarily invalid, header = {:?}",
244+
"HeadersProcess validation temporarily invalid, header = {:?}",
176245
header
177246
);
178247
return Status::ok();
179248
}
180249
ValidationState::Valid => {
181-
// Valid, do nothing
250+
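// Collect valid headers for batch insertion. NOTE(review): if a later header returns early (Invalid / TemporaryInvalid), the batch insert below is skipped and the headers collected here are not inserted - confirm this is intended.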
251+
if !already_valid {
252+
headers_to_insert.push(header.clone());
253+
}
182254
}
183255
};
184256
}
185257

258+
// Batch insert all valid headers in ONE transaction
259+
if !headers_to_insert.is_empty() {
260+
shared.insert_valid_headers_batch(self.peer, &headers_to_insert);
261+
}
262+
186263
self.debug();
187264

188265
if headers.len() == MAX_HEADERS_LEN {
@@ -273,6 +350,10 @@ impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
273350
state.invalid(Some(ValidationError::Verify(error)));
274351
true
275352
}
353+
} else if error.downcast_ref::<UnknownParentError>().is_some() {
354+
// UnknownParent is temporary - we just don't have the parent header yet
355+
state.temporary_invalid(Some(ValidationError::Verify(error)));
356+
false
276357
} else {
277358
state.invalid(Some(ValidationError::Verify(error)));
278359
true
@@ -289,14 +370,15 @@ impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
289370
}
290371
}
291372

292-
pub fn accept(&self) -> ValidationResult {
373+
/// Validates the header without inserting it into the database.
374+
/// Returns the `ValidationResult` and a flag that is `true` when the header already had `HEADER_VALID` status.
375+
pub fn validate(&self) -> (ValidationResult, bool) {
293376
let mut result = ValidationResult::default();
294377
let sync_shared = self.active_chain.sync_shared();
295378
let state = self.active_chain.state();
296379
let shared = sync_shared.shared();
297380

298-
// FIXME If status == BLOCK_INVALID then return early. But which error
299-
// type should we return?
381+
// Check if already valid
300382
let status = self.active_chain.get_block_status(&self.header.hash());
301383
if status.contains(BlockStatus::HEADER_VALID) {
302384
let header_index = sync_shared
@@ -311,7 +393,7 @@ impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
311393
state
312394
.peers()
313395
.may_set_best_known_header(self.peer, header_index);
314-
return result;
396+
return (result, true);
315397
}
316398

317399
if self.prev_block_check(&mut result).is_err() {
@@ -321,7 +403,7 @@ impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
321403
self.header.hash(),
322404
);
323405
shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
324-
return result;
406+
return (result, false);
325407
}
326408

327409
if let Some(is_invalid) = self.non_contextual_check(&mut result).err() {
@@ -333,7 +415,7 @@ impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
333415
if is_invalid {
334416
shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
335417
}
336-
return result;
418+
return (result, false);
337419
}
338420

339421
if self.version_check(&mut result).is_err() {
@@ -343,10 +425,21 @@ impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
343425
self.header.hash(),
344426
);
345427
shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
346-
return result;
428+
return (result, false);
429+
}
430+
431+
(result, false)
432+
}
433+
434+
pub fn accept(&self) -> ValidationResult {
435+
let (result, already_valid) = self.validate();
436+
437+
// If validation passed and header wasn't already valid, insert it
438+
if result.state == ValidationState::Valid && !already_valid {
439+
let sync_shared = self.active_chain.sync_shared();
440+
sync_shared.insert_valid_header(self.peer, self.header);
347441
}
348442

349-
sync_shared.insert_valid_header(self.peer, self.header);
350443
result
351444
}
352445
}

sync/src/types/mod.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,6 +1147,74 @@ impl SyncShared {
11471147
self.state.may_set_shared_best_header(header_index);
11481148
}
11491149

1150+
/// Batch insert multiple valid headers in a single transaction
1151+
/// Headers must be continuous (a parent-child chain), because total difficulty is accumulated in order.
1152+
/// Returns the last header's `HeaderIndex`, or `None` if `headers` is empty.
1153+
pub fn insert_valid_headers_batch(
1154+
&self,
1155+
peer: PeerIndex,
1156+
headers: &[core::HeaderView],
1157+
) -> Option<HeaderIndex> {
1158+
if headers.is_empty() {
1159+
return None;
1160+
}
1161+
1162+
let store = self.store();
1163+
1164+
// Start from the total_difficulty of the first header's parent
1165+
let first_parent_hash = headers[0].parent_hash();
1166+
let mut current_td = {
1167+
let shared_best = self.state.shared_best_header_ref();
1168+
if shared_best.hash() == first_parent_hash {
1169+
shared_best.total_difficulty().clone()
1170+
} else {
1171+
drop(shared_best);
1172+
let parent_header = store
1173+
.get_block_header(&first_parent_hash)
1174+
.expect("parent should exist");
1175+
self.state
1176+
.get_header_total_difficulty(store, &first_parent_hash, &parent_header)
1177+
.expect("parent total_difficulty should be available")
1178+
}
1179+
};
1180+
1181+
// Create ONE transaction for all headers
1182+
let db_txn = store.begin_transaction();
1183+
1184+
let mut last_header_index = None;
1185+
1186+
// Insert all headers in sequence
1187+
for header in headers {
1188+
// Compute total_difficulty incrementally
1189+
current_td = current_td + header.difficulty();
1190+
1191+
// Insert header to DB
1192+
db_txn
1193+
.insert_header(header)
1194+
.expect("insert header should be ok");
1195+
1196+
// Cache the total_difficulty
1197+
let hash = header.hash();
1198+
self.state
1199+
.cache_header_difficulty(hash.clone(), current_td.clone());
1200+
1201+
// Track the last header index
1202+
let header_index = HeaderIndex::new(header.number(), hash, current_td.clone());
1203+
last_header_index = Some(header_index.clone());
1204+
1205+
// Update peer's best known header
1206+
self.state
1207+
.peers()
1208+
.may_set_best_known_header(peer, header_index.clone());
1209+
self.state.may_set_shared_best_header(header_index);
1210+
}
1211+
1212+
// Commit ONCE - single fsync for all headers
1213+
db_txn.commit().expect("commit should be ok");
1214+
1215+
last_header_index
1216+
}
1217+
11501218
/// Get HeaderIndex for a given hash
11511219
/// Returns HeaderIndex with number, hash, and total_difficulty
11521220
pub(crate) fn get_header_index(&self, hash: &Byte32) -> Option<HeaderIndex> {

0 commit comments

Comments
 (0)