Skip to content

Commit adca36c

Browse files
committed
feat(sync): implement bidirectional block request protocol and orphan cache
Block Request Protocol: - Add BlockRequest/BlockResponse handling in NetworkActor for serving historical blocks to requesting peers (GetBlocks, GetChainStatus) - Forward received blocks from peers to SyncActor for processing - Wire StorageActor to NetworkActor via SetStorageActor message for block range queries when serving requests Orphan Block Cache: - Add OrphanBlockCache for handling out-of-order block reception - Track observed_height from orphaned blocks to detect network height - Update ChainStatus with observed_height and orphan_count fields - Trigger historical sync when observed height exceeds local height Sync Lifecycle Logging: - Add detailed box-format logging for sync state transitions - Log StartSync, StopSync, RequestBlocks, HandleBlockResponse events - Add peer discovery and height reporting logs - Include sync completion summary with blocks synced count Also updates test fixtures with new ChainStatus fields and orphan_cache initialization in ChainActor test helpers.
1 parent 210c4b9 commit adca36c

File tree

12 files changed

+1345
-86
lines changed

12 files changed

+1345
-86
lines changed

app/src/actors_v2/chain/actor.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use uuid::Uuid;
1717

1818
use super::{
1919
messages::{BlockSource, ChainMessage},
20+
orphan_cache::OrphanBlockCache,
2021
state::SyncStatus,
2122
ChainConfig, ChainError, ChainMetrics, ChainState,
2223
};
@@ -102,6 +103,10 @@ pub struct ChainActor {
102103

103104
/// Phase 3: Active gap fill requests (start_height -> GapFillRequest)
104105
pub(crate) gap_fill_requests: Arc<RwLock<HashMap<u64, GapFillRequest>>>,
106+
107+
/// Orphan block cache: stores blocks whose parents haven't been imported yet
108+
/// Used for out-of-order block reception and tracking observed network height
109+
pub(crate) orphan_cache: Arc<RwLock<OrphanBlockCache>>,
105110
}
106111

107112
impl ChainActor {
@@ -130,6 +135,8 @@ impl ChainActor {
130135
// Phase 3: Initialize gap detection queue
131136
queued_blocks: Arc::new(RwLock::new(HashMap::new())),
132137
gap_fill_requests: Arc::new(RwLock::new(HashMap::new())),
138+
// Orphan block cache for out-of-order block reception
139+
orphan_cache: Arc::new(RwLock::new(OrphanBlockCache::new())),
133140
}
134141
}
135142

app/src/actors_v2/chain/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ pub enum ChainError {
8787
#[error("Invalid parent relationship: {0}")]
8888
InvalidParent(String),
8989

90+
#[error("Orphan block: parent not found (parent_hash={parent_hash}, block_height={block_height})")]
91+
OrphanBlock {
92+
parent_hash: ethereum_types::H256,
93+
block_height: u64,
94+
},
95+
9096
#[error("Internal error: {0}")]
9197
Internal(String),
9298
}

app/src/actors_v2/chain/handlers.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,17 @@ impl Handler<ChainMessage> for ChainActor {
3838

3939
match msg {
4040
ChainMessage::GetChainStatus => {
41+
// Get orphan cache stats (sync access via try_read to avoid blocking)
42+
let (observed_height, orphan_count) = {
43+
match self.orphan_cache.try_read() {
44+
Ok(cache) => (cache.observed_height(), cache.len()),
45+
Err(_) => {
46+
// If we can't get the lock, use current height as observed
47+
(self.state.get_height(), 0)
48+
}
49+
}
50+
};
51+
4152
let status = super::messages::ChainStatus {
4253
height: self.state.get_height(),
4354
head_hash: self.state.get_head_hash(),
@@ -52,6 +63,8 @@ impl Handler<ChainMessage> for ChainActor {
5263
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()),
5364
auxpow_enabled: self.config.enable_auxpow,
5465
blocks_without_pow: self.state.blocks_without_pow,
66+
observed_height,
67+
orphan_count,
5568
};
5669
Box::pin(async move { Ok(ChainResponse::ChainStatus(status)) })
5770
}
@@ -734,6 +747,65 @@ impl Handler<ChainMessage> for ChainActor {
734747
// Step 1.7: Parent hash validation (Phase 3)
735748
if let Some(ref storage_actor) = storage_actor {
736749
if let Err(parent_error) = crate::actors_v2::common::validation::validate_parent_relationship(&block, storage_actor).await {
750+
// Check if this is an orphan block (parent not found)
751+
if let ChainError::OrphanBlock { parent_hash: orphan_parent_hash, block_height: orphan_height } = &parent_error {
752+
// Cache as orphan instead of rejecting
753+
info!(
754+
correlation_id = %correlation_id,
755+
block_hash = %block_hash,
756+
parent_hash = %orphan_parent_hash,
757+
block_height = orphan_height,
758+
"Block is orphan (parent not found) - caching for later processing"
759+
);
760+
761+
// Add to orphan cache
762+
let cache_result = {
763+
let mut cache = self_clone.orphan_cache.write().await;
764+
let parent_hash_h256 = *orphan_parent_hash;
765+
cache.add(
766+
block.clone(),
767+
*orphan_height,
768+
block_hash,
769+
parent_hash_h256,
770+
current_height,
771+
peer_id.clone(),
772+
)
773+
};
774+
775+
match cache_result {
776+
Ok(true) => {
777+
info!(
778+
correlation_id = %correlation_id,
779+
block_hash = %block_hash,
780+
"Orphan block cached successfully"
781+
);
782+
// Return success - block is cached, not rejected
783+
return Ok(ChainResponse::BlockRejected {
784+
reason: format!("Orphan block cached: parent {} not found", orphan_parent_hash),
785+
});
786+
}
787+
Ok(false) => {
788+
debug!(
789+
correlation_id = %correlation_id,
790+
block_hash = %block_hash,
791+
"Orphan block not cached (duplicate or too far ahead)"
792+
);
793+
return Ok(ChainResponse::BlockRejected {
794+
reason: "Orphan block rejected: duplicate or too far ahead".to_string(),
795+
});
796+
}
797+
Err(e) => {
798+
warn!(
799+
correlation_id = %correlation_id,
800+
error = %e,
801+
"Failed to cache orphan block"
802+
);
803+
return Err(parent_error);
804+
}
805+
}
806+
}
807+
808+
// Not an orphan error - propagate the error
737809
error!(
738810
correlation_id = %correlation_id,
739811
block_hash = %block_hash,
@@ -1243,6 +1315,50 @@ impl Handler<ChainMessage> for ChainActor {
12431315
}
12441316
}
12451317

1318+
// Step 8: Process orphan children that were waiting for this block
1319+
// Check if any blocks in the orphan cache were waiting for this parent
1320+
let orphan_children = {
1321+
let mut cache = self_clone.orphan_cache.write().await;
1322+
cache.remove_by_parent(&block_hash)
1323+
};
1324+
1325+
if !orphan_children.is_empty() {
1326+
info!(
1327+
correlation_id = %correlation_id,
1328+
parent_hash = %block_hash,
1329+
orphan_count = orphan_children.len(),
1330+
"Found orphan children waiting for this block - processing recursively"
1331+
);
1332+
1333+
// Process each orphan child as a new import
1334+
for orphan_entry in orphan_children {
1335+
info!(
1336+
correlation_id = %correlation_id,
1337+
orphan_hash = %orphan_entry.hash,
1338+
orphan_height = orphan_entry.height,
1339+
"Re-processing orphan child after parent import"
1340+
);
1341+
1342+
// Re-submit the orphan block for import via the actor address
1343+
// This ensures proper sequencing through the import lock
1344+
let import_msg = ChainMessage::ImportBlock {
1345+
block: orphan_entry.block,
1346+
source: BlockSource::Sync, // Mark as sync since it was cached
1347+
peer_id: orphan_entry.peer_id,
1348+
};
1349+
1350+
// Send to self via the actor address for proper async handling
1351+
if let Err(e) = ctx_addr.send(import_msg).await {
1352+
warn!(
1353+
correlation_id = %correlation_id,
1354+
orphan_hash = %orphan_entry.hash,
1355+
error = ?e,
1356+
"Failed to re-submit orphan block for import"
1357+
);
1358+
}
1359+
}
1360+
}
1361+
12461362
let import_duration = start_time.elapsed();
12471363

12481364
info!(
@@ -1973,6 +2089,10 @@ async fn create_aux_block_helper(
19732089
gap_fill_requests: std::sync::Arc::new(tokio::sync::RwLock::new(
19742090
std::collections::HashMap::new(),
19752091
)),
2092+
// Orphan cache
2093+
orphan_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
2094+
super::orphan_cache::OrphanBlockCache::new(),
2095+
)),
19762096
};
19772097

19782098
actor.create_aux_block(miner_address).await
@@ -2008,6 +2128,10 @@ async fn submit_aux_block_helper(
20082128
gap_fill_requests: std::sync::Arc::new(tokio::sync::RwLock::new(
20092129
std::collections::HashMap::new(),
20102130
)),
2131+
// Orphan cache
2132+
orphan_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
2133+
super::orphan_cache::OrphanBlockCache::new(),
2134+
)),
20112135
};
20122136

20132137
actor

app/src/actors_v2/chain/messages.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,15 @@ pub struct ChainStatus {
240240

241241
/// Blocks without AuxPoW
242242
pub blocks_without_pow: u64,
243+
244+
/// Observed network height (includes orphan blocks)
245+
/// This tracks the highest block height seen from the network,
246+
/// even if those blocks couldn't be imported due to missing parents.
247+
/// Used by SyncActor for network height discovery.
248+
pub observed_height: u64,
249+
250+
/// Number of orphan blocks in cache
251+
pub orphan_count: usize,
243252
}
244253

245254
/// Create AuxPoW block for mining (RPC endpoint)

app/src/actors_v2/chain/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod error;
2222
pub mod handlers;
2323
pub mod messages;
2424
pub mod metrics;
25+
pub mod orphan_cache;
2526
pub mod state;
2627
pub mod withdrawals;
2728

@@ -38,6 +39,7 @@ pub use config::ChainConfig;
3839
pub use error::ChainError;
3940
pub use messages::{ChainMessage, ChainResponse};
4041
pub use metrics::ChainMetrics;
42+
pub use orphan_cache::{OrphanBlockCache, OrphanCacheConfig, OrphanCacheStats};
4143
pub use state::ChainState;
4244

4345
// Phase 4 exports

0 commit comments

Comments
 (0)