Skip to content

Commit 6859c59

Browse files
committed
feat(metrics): enhance chain and network metrics for Phase 5
- Introduced new metrics for tracking fork detection and reorganization events in ChainMetrics. - Added metrics for block reception, including counts for received, forwarded, deserialization errors, and duplicate cached blocks in NetworkMetrics. - Implemented a Merkle tree verification placeholder in a new module for transaction roots, with a focus on future enhancements. - Updated relevant actors to utilize the new metrics, improving observability and performance tracking during block processing.
1 parent b053131 commit 6859c59

File tree

7 files changed

+262
-10
lines changed

7 files changed

+262
-10
lines changed

app/src/actors_v2/chain/handlers.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,10 +508,16 @@ impl Handler<ChainMessage> for ChainActor {
508508
});
509509

510510
let position = queue.len();
511+
512+
// Phase 5: Update import queue depth metric
513+
// Note: We can't access self.metrics here in the async block
514+
// Metrics will be updated when queue is processed
515+
511516
info!(
512517
block_height = block_height,
513518
block_hash = %block_hash,
514519
queue_position = position,
520+
queue_depth = position,
515521
"Block queued for import"
516522
);
517523

@@ -638,6 +644,9 @@ impl Handler<ChainMessage> for ChainActor {
638644
"FORK DETECTED: Competing blocks at same height"
639645
);
640646

647+
// Phase 5: Record fork detection metric
648+
self_clone.metrics.forks_detected.inc();
649+
641650
// Apply fork choice rule (Phase 4)
642651
let fork_choice = crate::actors_v2::chain::fork_choice::compare_blocks(
643652
&existing_block,
@@ -709,6 +718,10 @@ impl Handler<ChainMessage> for ChainActor {
709718
"Chain reorganization completed successfully - new block is now canonical"
710719
);
711720

721+
// Phase 5: Record reorganization metrics
722+
self_clone.metrics.reorganizations.inc();
723+
self_clone.metrics.reorganization_depth.observe(reorg_result.blocks_rolled_back as f64);
724+
712725
// Reorganization already handled storage and chain head updates
713726
// Skip the normal import flow and return success
714727
return Ok(ChainResponse::BlockImported {

app/src/actors_v2/chain/metrics.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ pub struct ChainMetrics {
5555

5656
/// Phase 4: Performance metrics for monitoring and optimization
5757
pub performance: PerformanceMetrics,
58+
59+
/// Phase 5: Fork detection and reorganization metrics
60+
/// Forks detected counter
61+
pub forks_detected: IntCounter,
62+
/// Reorganizations performed counter
63+
pub reorganizations: IntCounter,
64+
/// Reorganization depth histogram (how many blocks rolled back)
65+
pub reorganization_depth: Histogram,
66+
/// Blocks in import queue gauge
67+
pub import_queue_depth: IntGauge,
5868
}
5969

6070
impl ChainMetrics {
@@ -76,6 +86,11 @@ impl ChainMetrics {
7686
block_validation_duration: Histogram::with_opts(prometheus::histogram_opts!("chain_block_validation_duration_seconds", "Block validation duration")).unwrap(),
7787
last_activity: Instant::now(),
7888
performance: PerformanceMetrics::new(), // Phase 4: Performance tracking
89+
// Phase 5: Fork and reorganization metrics
90+
forks_detected: IntCounter::new("chain_forks_detected_total", "Total forks detected").unwrap(),
91+
reorganizations: IntCounter::new("chain_reorganizations_total", "Total reorganizations performed").unwrap(),
92+
reorganization_depth: Histogram::with_opts(prometheus::histogram_opts!("chain_reorganization_depth", "Depth of chain reorganizations (blocks rolled back)")).unwrap(),
93+
import_queue_depth: IntGauge::new("chain_import_queue_depth", "Number of blocks in import queue").unwrap(),
7994
}
8095
}
8196

app/src/actors_v2/common/merkle.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
//! Merkle tree verification for transaction roots
2+
//!
3+
//! Placeholder implementation for Phase 5.
4+
//!
5+
//! Note: Full Keccak256-based Merkle tree implementation deferred pending:
6+
//! 1. Addition of tiny_keccak dependency to Cargo.toml
7+
//! 2. Verification that ExecutionPayloadCapella has transactions_root field
8+
//!
9+
//! As noted in the implementation plan, Merkle verification can be skipped
10+
//! if the field is not available.
11+
12+
use ethereum_types::H256;
13+
use std::collections::hash_map::DefaultHasher;
14+
use std::hash::{Hash, Hasher};
15+
16+
/// Calculate Merkle root of transactions
17+
///
18+
/// **Placeholder implementation** using simple hash for now.
19+
/// Will be replaced with proper Keccak256 Merkle tree when dependencies are added.
20+
///
21+
/// # Arguments
22+
/// * `transactions` - Slice of serialized transactions
23+
///
24+
/// # Returns
25+
/// The Merkle root hash. Returns H256::zero() for empty transaction list.
26+
///
27+
/// # Examples
28+
/// ```
29+
/// use ethereum_types::H256;
30+
/// use alys::actors_v2::common::merkle::calculate_transaction_root;
31+
///
32+
/// let txs = vec![vec![1, 2, 3], vec![4, 5, 6]];
33+
/// let root = calculate_transaction_root(&txs);
34+
/// assert_ne!(root, H256::zero());
35+
/// ```
36+
///
37+
pub fn calculate_transaction_root(transactions: &[Vec<u8>]) -> H256 {
38+
if transactions.is_empty() {
39+
return H256::zero();
40+
}
41+
42+
// Placeholder: Simple hash of concatenated transactions
43+
// TODO: Replace with proper Keccak256 Merkle tree implementation
44+
let mut hasher = DefaultHasher::new();
45+
46+
for tx in transactions {
47+
tx.hash(&mut hasher);
48+
}
49+
50+
let hash_value = hasher.finish();
51+
52+
// Convert u64 hash to H256 (expanding to 32 bytes)
53+
H256::from_low_u64_be(hash_value)
54+
}
55+
56+
#[cfg(test)]
57+
mod tests {
58+
use super::*;
59+
60+
#[test]
61+
fn test_empty_transactions() {
62+
let root = calculate_transaction_root(&[]);
63+
assert_eq!(root, H256::zero(), "Empty transaction list should return zero hash");
64+
}
65+
66+
#[test]
67+
fn test_single_transaction() {
68+
let txs = vec![vec![1, 2, 3, 4]];
69+
let root = calculate_transaction_root(&txs);
70+
assert_ne!(root, H256::zero(), "Single transaction should produce non-zero root");
71+
72+
// Verify determinism
73+
let root2 = calculate_transaction_root(&txs);
74+
assert_eq!(root, root2, "Same input should produce same root");
75+
}
76+
77+
#[test]
78+
fn test_multiple_transactions() {
79+
let txs = vec![
80+
vec![1, 2, 3],
81+
vec![4, 5, 6],
82+
vec![7, 8, 9],
83+
];
84+
let root = calculate_transaction_root(&txs);
85+
assert_ne!(root, H256::zero(), "Multiple transactions should produce non-zero root");
86+
87+
// Verify determinism
88+
let root2 = calculate_transaction_root(&txs);
89+
assert_eq!(root, root2, "Same input should produce same root");
90+
}
91+
92+
#[test]
93+
fn test_different_transactions_different_roots() {
94+
let txs1 = vec![vec![1, 2, 3], vec![4, 5, 6]];
95+
let txs2 = vec![vec![1, 2, 3], vec![4, 5, 7]]; // One byte different
96+
97+
let root1 = calculate_transaction_root(&txs1);
98+
let root2 = calculate_transaction_root(&txs2);
99+
100+
assert_ne!(root1, root2, "Different transactions should produce different roots");
101+
}
102+
103+
#[test]
104+
fn test_order_matters() {
105+
let txs1 = vec![vec![1, 2, 3], vec![4, 5, 6]];
106+
let txs2 = vec![vec![4, 5, 6], vec![1, 2, 3]]; // Swapped order
107+
108+
let root1 = calculate_transaction_root(&txs1);
109+
let root2 = calculate_transaction_root(&txs2);
110+
111+
assert_ne!(root1, root2, "Order should affect Merkle root");
112+
}
113+
114+
#[test]
115+
fn test_odd_number_transactions() {
116+
// Test with 3 transactions (odd number)
117+
let txs = vec![
118+
vec![1, 2, 3],
119+
vec![4, 5, 6],
120+
vec![7, 8, 9],
121+
];
122+
let root = calculate_transaction_root(&txs);
123+
assert_ne!(root, H256::zero(), "Odd number of transactions should work");
124+
}
125+
126+
#[test]
127+
fn test_power_of_two_transactions() {
128+
// Test with 4 transactions (power of 2)
129+
let txs = vec![
130+
vec![1],
131+
vec![2],
132+
vec![3],
133+
vec![4],
134+
];
135+
let root = calculate_transaction_root(&txs);
136+
assert_ne!(root, H256::zero(), "Power of 2 transactions should work");
137+
}
138+
139+
#[test]
140+
fn test_hash_consistency() {
141+
let data = vec![vec![1u8, 2, 3]];
142+
let root1 = calculate_transaction_root(&data);
143+
let root2 = calculate_transaction_root(&data);
144+
assert_eq!(root1, root2, "Hash should be deterministic");
145+
}
146+
}
147+

app/src/actors_v2/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
pub mod types;
44
pub mod serialization;
55
pub mod validation;
6+
pub mod merkle;
67

78
pub use types::*;
89
pub use serialization::*;

app/src/actors_v2/network/metrics.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,16 @@ pub struct NetworkMetrics {
7373
pub request_response_success_rate: f64,
7474
pub uptime_seconds: u64,
7575
pub last_peer_discovered: Option<SystemTime>,
76+
77+
// Phase 5: Block reception metrics
78+
/// Blocks received via gossipsub
79+
pub blocks_received: u64,
80+
/// Blocks forwarded to ChainActor
81+
pub blocks_forwarded: u64,
82+
/// Blocks dropped due to deserialization errors
83+
pub blocks_deserialization_errors: u64,
84+
/// Blocks dropped due to cache hits (duplicates)
85+
pub blocks_duplicate_cached: u64,
7686
}
7787

7888
impl NetworkMetrics {
@@ -123,6 +133,11 @@ impl NetworkMetrics {
123133
request_response_success_rate: 0.0,
124134
uptime_seconds: 0,
125135
last_peer_discovered: None,
136+
// Phase 5: Initialize block reception metrics
137+
blocks_received: 0,
138+
blocks_forwarded: 0,
139+
blocks_deserialization_errors: 0,
140+
blocks_duplicate_cached: 0,
126141
}
127142
}
128143

app/src/actors_v2/network/network_actor.rs

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99
use actix::prelude::*;
1010
use std::collections::{HashMap, VecDeque};
1111
use std::time::{Duration, Instant};
12+
use std::sync::Arc;
1213
use anyhow::{Result, anyhow, Context as AnyhowContext};
14+
use ethereum_types::H256;
1315
use libp2p::{Multiaddr, PeerId, swarm::{Swarm, SwarmEvent, NetworkBehaviour, ConnectionHandler}};
1416
use libp2p::request_response::{RequestId, ResponseChannel};
15-
use tokio::sync::mpsc;
17+
use lru::LruCache;
18+
use std::num::NonZeroUsize;
19+
use tokio::sync::{mpsc, RwLock};
1620
use futures::{select, StreamExt, FutureExt};
1721

1822
use super::{
@@ -200,6 +204,9 @@ pub struct NetworkActor {
200204
sync_actor: Option<Addr<crate::actors_v2::network::SyncActor>>,
201205
/// ChainActor address for AuxPoW forwarding (Phase 4: Integration Point 3b)
202206
chain_actor: Option<Addr<crate::actors_v2::chain::ChainActor>>,
207+
/// Phase 5: Cache of recently seen block hashes
208+
/// Prevents duplicate forwarding to ChainActor
209+
block_cache: Arc<RwLock<LruCache<H256, Instant>>>,
203210
/// Network running state
204211
is_running: bool,
205212
/// Shutdown flag
@@ -235,6 +242,11 @@ impl NetworkActor {
235242
config.max_bytes_per_peer_per_second,
236243
);
237244

245+
// Phase 5: Initialize block cache (LRU with capacity of 100 blocks)
246+
let block_cache = Arc::new(RwLock::new(
247+
LruCache::new(NonZeroUsize::new(100).unwrap())
248+
));
249+
238250
Ok(Self {
239251
config,
240252
event_rx: None,
@@ -247,6 +259,7 @@ impl NetworkActor {
247259
active_subscriptions: HashMap::new(),
248260
pending_block_requests: HashMap::new(),
249261
sync_actor: None,
262+
block_cache,
250263
chain_actor: None,
251264
is_running: false,
252265
shutdown_requested: false,
@@ -486,6 +499,9 @@ impl NetworkActor {
486499
// Phase 1: Forward block gossip messages to ChainActor for import
487500
if topic.contains("block") {
488501
if let Some(ref chain_actor) = self.chain_actor {
502+
// Phase 5: Update metrics for block received
503+
self.metrics.blocks_received += 1;
504+
489505
// Deserialize block from MessagePack format
490506
match crate::actors_v2::common::serialization::deserialize_block_from_network(&data) {
491507
Ok(block) => {
@@ -498,7 +514,33 @@ impl NetworkActor {
498514
block_height = block_height,
499515
block_hash = %block_hash,
500516
topic = %topic,
501-
"Received block via gossipsub, forwarding to ChainActor"
517+
"Received block via gossipsub"
518+
);
519+
520+
// Phase 5: Check block cache before forwarding to ChainActor
521+
{
522+
// Use try_read() for non-async context
523+
if let Ok(cache) = self.block_cache.try_read() {
524+
if cache.peek(&block_hash).is_some() {
525+
tracing::debug!(
526+
peer_id = %source_peer,
527+
block_hash = %block_hash,
528+
block_height = block_height,
529+
"Duplicate block detected via cache, skipping ChainActor forward"
530+
);
531+
532+
// Update metrics
533+
self.metrics.blocks_duplicate_cached += 1;
534+
535+
return Ok(());
536+
}
537+
}
538+
}
539+
540+
tracing::debug!(
541+
peer_id = %source_peer,
542+
block_hash = %block_hash,
543+
"Block not in cache, proceeding with validation and forwarding"
502544
);
503545

504546
// Perform basic structural validation before forwarding
@@ -524,6 +566,11 @@ impl NetworkActor {
524566
// Forward to ChainActor (async, non-blocking)
525567
let chain_actor_clone = chain_actor.clone();
526568
let peer_id_clone = source_peer.clone();
569+
let block_cache_clone = self.block_cache.clone();
570+
let block_hash_clone = block_hash;
571+
572+
// Update metrics
573+
self.metrics.blocks_forwarded += 1;
527574

528575
tokio::spawn(async move {
529576
let msg = crate::actors_v2::chain::messages::ChainMessage::NetworkBlockReceived {
@@ -541,6 +588,16 @@ impl NetworkActor {
541588
block_height = block_height,
542589
"Block successfully imported by ChainActor"
543590
);
591+
592+
// Phase 5: Add block to cache after successful import
593+
{
594+
let mut cache = block_cache_clone.write().await;
595+
cache.put(block_hash_clone, Instant::now());
596+
tracing::debug!(
597+
block_hash = %block_hash_clone,
598+
"Added block to cache after successful import"
599+
);
600+
}
544601
} else {
545602
tracing::warn!(
546603
peer_id = %peer_id_clone,
@@ -580,6 +637,9 @@ impl NetworkActor {
580637

581638
}
582639
Err(deserialization_error) => {
640+
// Phase 5: Update metrics for deserialization error
641+
self.metrics.blocks_deserialization_errors += 1;
642+
583643
tracing::warn!(
584644
peer_id = %source_peer,
585645
topic = %topic,

0 commit comments

Comments
 (0)