Skip to content

Commit a8b95a3

Browse files
varun-doshijonastheisfrisitano
authored
feat: cache incoming blocks to prevent duplication (#259)
* feat: cache incoming blocks to prevent duplication * fix: apply suggestions --------- Co-authored-by: Jonas Theis <[email protected]> Co-authored-by: frisitano <[email protected]>
1 parent e2f7f95 commit a8b95a3

File tree

2 files changed

+50
-21
lines changed

2 files changed

+50
-21
lines changed

crates/manager/src/manager/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ where
207207
/// Returns a new event listener for the rollup node manager.
208208
pub fn event_listener(&mut self) -> EventStream<RollupManagerEvent> {
209209
if let Some(event_sender) = &self.event_sender {
210-
return event_sender.new_listener()
210+
return event_sender.new_listener();
211211
};
212212

213213
let event_sender = EventSender::new(EVENT_CHANNEL_SIZE);

crates/network/src/manager.rs

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::{
44
BlockImportOutcome, BlockValidation, NetworkHandleMessage, NetworkManagerEvent,
55
NewBlockWithPeer, ScrollNetworkHandle,
66
};
7-
use alloy_primitives::{FixedBytes, Signature, U128};
7+
use alloy_primitives::{FixedBytes, Signature, B256, U128};
88
use futures::{FutureExt, Stream, StreamExt};
99
use reth_chainspec::EthChainSpec;
1010
use reth_eth_wire_types::NewBlock as EthWireNewBlock;
@@ -54,7 +54,9 @@ pub struct ScrollNetworkManager<N, CS> {
5454
/// The receiver for new blocks received from the network (used to bridge from eth-wire).
5555
eth_wire_listener: Option<EventStream<RethNewBlockWithPeer<ScrollBlock>>>,
5656
/// The scroll wire protocol manager.
57-
scroll_wire: ScrollWireManager,
57+
pub scroll_wire: ScrollWireManager,
58+
/// The LRU cache used to track already seen (block,signature) pair.
59+
pub blocks_seen: LruCache<(B256, Signature)>,
5860
/// The constant value that must be added to the block number to get the total difficulty.
5961
td_constant: U128,
6062
}
@@ -90,6 +92,8 @@ impl<CS: ScrollHardforks + EthChainSpec + Send + Sync + 'static>
9092
// Create the scroll-wire protocol manager.
9193
let scroll_wire = ScrollWireManager::new(events);
9294

95+
let blocks_seen = LruCache::new(LRU_CACHE_SIZE);
96+
9397
// Spawn the inner network manager.
9498
tokio::spawn(inner_network_manager);
9599

@@ -98,6 +102,7 @@ impl<CS: ScrollHardforks + EthChainSpec + Send + Sync + 'static>
98102
handle,
99103
from_handle_rx: from_handle_rx.into(),
100104
scroll_wire,
105+
blocks_seen,
101106
eth_wire_listener,
102107
td_constant,
103108
}
@@ -128,11 +133,14 @@ impl<
128133

129134
let handle = ScrollNetworkHandle::new(to_manager_tx, inner_network_handle);
130135

136+
let blocks_seen = LruCache::new(LRU_CACHE_SIZE);
137+
131138
Self {
132139
chain_spec,
133140
handle,
134141
from_handle_rx: from_handle_rx.into(),
135142
scroll_wire,
143+
blocks_seen,
136144
eth_wire_listener,
137145
td_constant,
138146
}
@@ -177,11 +185,29 @@ impl<
177185
}
178186

179187
/// Handler for received events from the [`ScrollWireManager`].
180-
fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> NetworkManagerEvent {
188+
fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> Option<NetworkManagerEvent> {
181189
match event {
182190
ScrollWireEvent::NewBlock { peer_id, block, signature } => {
183-
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block.hash_slow(), signature = ?signature, "Received new block");
184-
NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature })
191+
let block_hash = block.hash_slow();
192+
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block");
193+
if self.blocks_seen.contains(&(block_hash, signature)) {
194+
None
195+
} else {
196+
// Update the state of the peer cache i.e. peer has seen this block.
197+
self.scroll_wire
198+
.state_mut()
199+
.entry(peer_id)
200+
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
201+
.insert(block_hash);
202+
// Update the state of the block cache i.e. we have seen this block.
203+
self.blocks_seen.insert((block.hash_slow(), signature));
204+
205+
Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer {
206+
peer_id,
207+
block,
208+
signature,
209+
}))
210+
}
185211
}
186212
// Only `NewBlock` events are expected from the scroll-wire protocol.
187213
_ => {
@@ -214,12 +240,6 @@ impl<
214240
Ok(BlockValidation::ValidBlock { new_block: msg }) |
215241
Ok(BlockValidation::ValidHeader { new_block: msg }) => {
216242
trace!(target: "scroll::network::manager", peer_id = ?peer, block = ?msg.block, "Block import successful - announcing block to network");
217-
let hash = msg.block.hash_slow();
218-
self.scroll_wire
219-
.state_mut()
220-
.entry(peer)
221-
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
222-
.insert(hash);
223243
self.announce_block(msg);
224244
}
225245
Err(BlockImportError::Consensus(err)) => {
@@ -241,14 +261,6 @@ impl<
241261
block: reth_network_api::block::NewBlockWithPeer<ScrollBlock>,
242262
) -> Option<NetworkManagerEvent> {
243263
let reth_network_api::block::NewBlockWithPeer { peer_id, mut block } = block;
244-
let block_hash = block.hash_slow();
245-
self.scroll_wire
246-
.state_mut()
247-
.entry(peer_id)
248-
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
249-
.insert(block_hash);
250-
251-
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, "Received new block from eth-wire protocol");
252264

253265
// We purge the extra data field post euclid v2 to align with protocol specification.
254266
let extra_data = if self.chain_spec.is_euclid_v2_active_at_timestamp(block.timestamp) {
@@ -268,6 +280,21 @@ impl<
268280
.checked_sub(ECDSA_SIGNATURE_LEN)
269281
.and_then(|i| Signature::from_raw(&extra_data[i..]).ok())
270282
{
283+
let block_hash = block.hash_slow();
284+
if self.blocks_seen.contains(&(block_hash, signature)) {
285+
return None;
286+
}
287+
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, "Received new block from eth-wire protocol");
288+
289+
// Update the state of the peer cache i.e. peer has seen this block.
290+
self.scroll_wire
291+
.state_mut()
292+
.entry(peer_id)
293+
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
294+
.insert(block_hash);
295+
296+
// Update the state of the block cache i.e. we have seen this block.
297+
self.blocks_seen.insert((block_hash, signature));
271298
Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature }))
272299
} else {
273300
tracing::warn!(target: "scroll::bridge::import", peer_id = %peer_id, "Failed to extract signature from block extra data, penalizing peer");
@@ -306,7 +333,9 @@ impl<
306333

307334
// Next we handle the scroll-wire events.
308335
if let Poll::Ready(event) = this.scroll_wire.poll_unpin(cx) {
309-
return Poll::Ready(Some(this.on_scroll_wire_event(event)));
336+
if let Some(event) = this.on_scroll_wire_event(event) {
337+
return Poll::Ready(Some(event));
338+
}
310339
}
311340

312341
// Handle blocks received from the eth-wire protocol.

0 commit comments

Comments
 (0)