Skip to content

Commit 96b33c2

Browse files
authored
Merge branch 'main' into feat/add_tokio_console
2 parents d7a194b + 325202e commit 96b33c2

File tree

10 files changed

+90
-35
lines changed

10 files changed

+90
-35
lines changed

crates/chain-orchestrator/src/event.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ use alloy_consensus::Header;
22
use alloy_primitives::{Signature, B256};
33
use reth_network_peers::PeerId;
44
use reth_scroll_primitives::ScrollBlock;
5-
use rollup_node_primitives::{
6-
BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages, WithFinalizedBlockNumber,
7-
};
5+
use rollup_node_primitives::{BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages};
86

97
/// An event emitted by the `ChainOrchestrator`.
108
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -40,9 +38,8 @@ pub enum ChainOrchestratorEvent {
4038
/// The safe L2 block info.
4139
safe_head: Option<BlockInfo>,
4240
},
43-
/// A batch has been finalized returning an optional finalized L2 block. Also returns a
44-
/// [`BatchInfo`] if the finalized event occurred in a finalized L1 block.
45-
BatchFinalized(Option<WithFinalizedBlockNumber<BatchInfo>>, Option<BlockInfo>),
41+
/// A batch has been finalized returning a list of finalized batches.
42+
BatchFinalized(u64, Vec<BatchInfo>),
4643
/// An L1 block has been finalized returning the L1 block number and the list of finalized
4744
/// batches.
4845
L1BlockFinalized(u64, Vec<BatchInfo>),

crates/chain-orchestrator/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,7 @@ impl<
556556
self.database.clone(),
557557
index,
558558
block_number,
559+
self.l1_finalized_block_number.clone(),
559560
)),
560561
))
561562
}
@@ -695,10 +696,20 @@ impl<
695696
database: Arc<Database>,
696697
batch_index: u64,
697698
block_number: u64,
699+
finalized_block_number: Arc<AtomicU64>,
698700
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
699701
// finalize all batches up to `batch_index`.
700702
database.finalize_batches_up_to_index(batch_index, block_number).await?;
701703

704+
// Get all unprocessed batches that have been finalized by this L1 block finalization.
705+
let finalized_block_number = finalized_block_number.load(Ordering::Relaxed);
706+
if finalized_block_number >= block_number {
707+
let finalized_batches = database
708+
.fetch_and_update_unprocessed_finalized_batches(finalized_block_number)
709+
.await?;
710+
return Ok(Some(ChainOrchestratorEvent::BatchFinalized(block_number, finalized_batches)))
711+
}
712+
702713
Ok(None)
703714
}
704715
}

crates/manager/src/manager/mod.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ where
175175
sequencer: Option<Sequencer<L1MP>>,
176176
signer: Option<SignerHandle>,
177177
block_time: Option<u64>,
178+
auto_start: bool,
178179
chain_orchestrator: ChainOrchestrator<CS, <N as BlockDownloaderProvider>::Client, P>,
179180
l1_v2_message_queue_start_index: u64,
180181
) -> (Self, RollupManagerHandle<N>) {
@@ -193,7 +194,11 @@ where
193194
event_sender: None,
194195
sequencer,
195196
signer,
196-
block_building_trigger: block_time.map(delayed_interval),
197+
block_building_trigger: if auto_start {
198+
block_time.map(delayed_interval)
199+
} else {
200+
None
201+
},
197202
block_time_config: block_time,
198203
};
199204
(rnm, RollupManagerHandle::new(handle_tx))
@@ -202,7 +207,7 @@ where
202207
/// Returns a new event listener for the rollup node manager.
203208
pub fn event_listener(&mut self) -> EventStream<RollupManagerEvent> {
204209
if let Some(event_sender) = &self.event_sender {
205-
return event_sender.new_listener()
210+
return event_sender.new_listener();
206211
};
207212

208213
let event_sender = EventSender::new(EVENT_CHANNEL_SIZE);
@@ -269,17 +274,16 @@ where
269274
// // push the batch info into the derivation pipeline.
270275
// self.derivation_pipeline.push_batch(batch_info, l1_block_number);
271276
}
272-
ChainOrchestratorEvent::BatchFinalized(batch_info, ..) => {
277+
ChainOrchestratorEvent::BatchFinalized(block_number, finalized_batches) => {
273278
// Uncomment once we implement issue #273.
274279
// // update the fcs on new finalized block.
275280
// if let Some(finalized_block) = finalized_block {
276281
// self.engine.set_finalized_block_info(finalized_block);
277282
// }
278283
// Remove once we implement issue #273.
279284
// Update the derivation pipeline on new finalized batch.
280-
#[allow(clippy::collapsible_match)]
281-
if let Some(batch_info) = batch_info {
282-
self.derivation_pipeline.push_batch(batch_info.inner, batch_info.number);
285+
for batch_info in finalized_batches {
286+
self.derivation_pipeline.push_batch(batch_info, block_number);
283287
}
284288
}
285289
ChainOrchestratorEvent::L1BlockFinalized(l1_block_number, finalized_batches, ..) => {

crates/network/src/manager.rs

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::{
44
BlockImportOutcome, BlockValidation, NetworkHandleMessage, NetworkManagerEvent,
55
NewBlockWithPeer, ScrollNetworkHandle,
66
};
7-
use alloy_primitives::{FixedBytes, Signature, U128};
7+
use alloy_primitives::{FixedBytes, Signature, B256, U128};
88
use futures::{FutureExt, Stream, StreamExt};
99
use reth_chainspec::EthChainSpec;
1010
use reth_eth_wire_types::NewBlock as EthWireNewBlock;
@@ -54,7 +54,9 @@ pub struct ScrollNetworkManager<N, CS> {
5454
/// The receiver for new blocks received from the network (used to bridge from eth-wire).
5555
eth_wire_listener: Option<EventStream<RethNewBlockWithPeer<ScrollBlock>>>,
5656
/// The scroll wire protocol manager.
57-
scroll_wire: ScrollWireManager,
57+
pub scroll_wire: ScrollWireManager,
58+
/// The LRU cache used to track already seen (block,signature) pair.
59+
pub blocks_seen: LruCache<(B256, Signature)>,
5860
/// The constant value that must be added to the block number to get the total difficulty.
5961
td_constant: U128,
6062
}
@@ -90,6 +92,8 @@ impl<CS: ScrollHardforks + EthChainSpec + Send + Sync + 'static>
9092
// Create the scroll-wire protocol manager.
9193
let scroll_wire = ScrollWireManager::new(events);
9294

95+
let blocks_seen = LruCache::new(LRU_CACHE_SIZE);
96+
9397
// Spawn the inner network manager.
9498
tokio::spawn(inner_network_manager);
9599

@@ -98,6 +102,7 @@ impl<CS: ScrollHardforks + EthChainSpec + Send + Sync + 'static>
98102
handle,
99103
from_handle_rx: from_handle_rx.into(),
100104
scroll_wire,
105+
blocks_seen,
101106
eth_wire_listener,
102107
td_constant,
103108
}
@@ -128,11 +133,14 @@ impl<
128133

129134
let handle = ScrollNetworkHandle::new(to_manager_tx, inner_network_handle);
130135

136+
let blocks_seen = LruCache::new(LRU_CACHE_SIZE);
137+
131138
Self {
132139
chain_spec,
133140
handle,
134141
from_handle_rx: from_handle_rx.into(),
135142
scroll_wire,
143+
blocks_seen,
136144
eth_wire_listener,
137145
td_constant,
138146
}
@@ -177,11 +185,29 @@ impl<
177185
}
178186

179187
/// Handler for received events from the [`ScrollWireManager`].
180-
fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> NetworkManagerEvent {
188+
fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> Option<NetworkManagerEvent> {
181189
match event {
182190
ScrollWireEvent::NewBlock { peer_id, block, signature } => {
183-
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block.hash_slow(), signature = ?signature, "Received new block");
184-
NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature })
191+
let block_hash = block.hash_slow();
192+
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block");
193+
if self.blocks_seen.contains(&(block_hash, signature)) {
194+
None
195+
} else {
196+
// Update the state of the peer cache i.e. peer has seen this block.
197+
self.scroll_wire
198+
.state_mut()
199+
.entry(peer_id)
200+
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
201+
.insert(block_hash);
202+
// Update the state of the block cache i.e. we have seen this block.
203+
self.blocks_seen.insert((block.hash_slow(), signature));
204+
205+
Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer {
206+
peer_id,
207+
block,
208+
signature,
209+
}))
210+
}
185211
}
186212
// Only `NewBlock` events are expected from the scroll-wire protocol.
187213
_ => {
@@ -214,12 +240,6 @@ impl<
214240
Ok(BlockValidation::ValidBlock { new_block: msg }) |
215241
Ok(BlockValidation::ValidHeader { new_block: msg }) => {
216242
trace!(target: "scroll::network::manager", peer_id = ?peer, block = ?msg.block, "Block import successful - announcing block to network");
217-
let hash = msg.block.hash_slow();
218-
self.scroll_wire
219-
.state_mut()
220-
.entry(peer)
221-
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
222-
.insert(hash);
223243
self.announce_block(msg);
224244
}
225245
Err(BlockImportError::Consensus(err)) => {
@@ -241,14 +261,6 @@ impl<
241261
block: reth_network_api::block::NewBlockWithPeer<ScrollBlock>,
242262
) -> Option<NetworkManagerEvent> {
243263
let reth_network_api::block::NewBlockWithPeer { peer_id, mut block } = block;
244-
let block_hash = block.hash_slow();
245-
self.scroll_wire
246-
.state_mut()
247-
.entry(peer_id)
248-
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
249-
.insert(block_hash);
250-
251-
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, "Received new block from eth-wire protocol");
252264

253265
// We purge the extra data field post euclid v2 to align with protocol specification.
254266
let extra_data = if self.chain_spec.is_euclid_v2_active_at_timestamp(block.timestamp) {
@@ -268,6 +280,21 @@ impl<
268280
.checked_sub(ECDSA_SIGNATURE_LEN)
269281
.and_then(|i| Signature::from_raw(&extra_data[i..]).ok())
270282
{
283+
let block_hash = block.hash_slow();
284+
if self.blocks_seen.contains(&(block_hash, signature)) {
285+
return None;
286+
}
287+
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, "Received new block from eth-wire protocol");
288+
289+
// Update the state of the peer cache i.e. peer has seen this block.
290+
self.scroll_wire
291+
.state_mut()
292+
.entry(peer_id)
293+
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
294+
.insert(block_hash);
295+
296+
// Update the state of the block cache i.e. we have seen this block.
297+
self.blocks_seen.insert((block_hash, signature));
271298
Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature }))
272299
} else {
273300
tracing::warn!(target: "scroll::bridge::import", peer_id = %peer_id, "Failed to extract signature from block extra data, penalizing peer");
@@ -306,7 +333,9 @@ impl<
306333

307334
// Next we handle the scroll-wire events.
308335
if let Poll::Ready(event) = this.scroll_wire.poll_unpin(cx) {
309-
return Poll::Ready(Some(this.on_scroll_wire_event(event)));
336+
if let Some(event) = this.on_scroll_wire_event(event) {
337+
return Poll::Ready(Some(event));
338+
}
310339
}
311340

312341
// Handle blocks received from the eth-wire protocol.

crates/node/src/args.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ impl ScrollRollupNodeConfig {
307307

308308
// Construct the Sequencer.
309309
let chain_config = chain_spec.chain_config();
310-
let (sequencer, block_time) = if self.sequencer_args.sequencer_enabled {
310+
let (sequencer, block_time, auto_start) = if self.sequencer_args.sequencer_enabled {
311311
let args = &self.sequencer_args;
312312
let sequencer = Sequencer::new(
313313
Arc::new(l1_messages_provider),
@@ -317,9 +317,9 @@ impl ScrollRollupNodeConfig {
317317
0,
318318
self.sequencer_args.l1_message_inclusion_mode,
319319
);
320-
(Some(sequencer), (args.block_time != 0).then_some(args.block_time))
320+
(Some(sequencer), (args.block_time != 0).then_some(args.block_time), args.auto_start)
321321
} else {
322-
(None, None)
322+
(None, None, false)
323323
};
324324

325325
// Instantiate the signer
@@ -366,6 +366,7 @@ impl ScrollRollupNodeConfig {
366366
sequencer,
367367
signer,
368368
block_time,
369+
auto_start,
369370
chain_orchestrator,
370371
l1_v2_message_queue_start_index,
371372
)
@@ -549,6 +550,9 @@ pub struct SequencerArgs {
549550
/// Enable the scroll block sequencer.
550551
#[arg(long = "sequencer.enabled", default_value_t = false)]
551552
pub sequencer_enabled: bool,
553+
/// Whether the sequencer should start sequencing automatically on startup.
554+
#[arg(long = "sequencer.auto-start", default_value_t = false)]
555+
pub auto_start: bool,
552556
/// The block time for the sequencer.
553557
#[arg(long = "sequencer.block-time", id = "sequencer_block_time", value_name = "SEQUENCER_BLOCK_TIME", default_value_t = constants::DEFAULT_BLOCK_TIME)]
554558
pub block_time: u64,

crates/node/src/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ pub fn default_sequencer_test_scroll_rollup_node_config() -> ScrollRollupNodeCon
185185
},
186186
sequencer_args: SequencerArgs {
187187
sequencer_enabled: true,
188+
auto_start: true,
188189
block_time: 0,
189190
payload_building_duration: 40,
190191
fee_recipient: Default::default(),

crates/node/tests/e2e.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ async fn can_bridge_l1_messages() -> eyre::Result<()> {
6262
chain_orchestrator_args: ChainOrchestratorArgs::default(),
6363
sequencer_args: SequencerArgs {
6464
sequencer_enabled: true,
65+
auto_start: true,
6566
block_time: 0,
6667
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
6768
allow_empty_blocks: true,
@@ -157,6 +158,7 @@ async fn can_sequence_and_gossip_blocks() {
157158
chain_orchestrator_args: ChainOrchestratorArgs::default(),
158159
sequencer_args: SequencerArgs {
159160
sequencer_enabled: true,
161+
auto_start: true,
160162
block_time: 0,
161163
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
162164
payload_building_duration: 1000,
@@ -255,6 +257,7 @@ async fn can_penalize_peer_for_invalid_block() {
255257
engine_driver_args: EngineDriverArgs::default(),
256258
sequencer_args: SequencerArgs {
257259
sequencer_enabled: true,
260+
auto_start: true,
258261
block_time: 0,
259262
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
260263
payload_building_duration: 1000,

crates/node/tests/sync.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> {
197197
chain_orchestrator_args: ChainOrchestratorArgs::default(),
198198
sequencer_args: SequencerArgs {
199199
sequencer_enabled: true,
200+
auto_start: true,
200201
block_time: 0,
201202
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
202203
allow_empty_blocks: true,
@@ -445,6 +446,7 @@ async fn test_consolidation() -> eyre::Result<()> {
445446
chain_orchestrator_args: ChainOrchestratorArgs::default(),
446447
sequencer_args: SequencerArgs {
447448
sequencer_enabled: true,
449+
auto_start: true,
448450
block_time: 0,
449451
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
450452
allow_empty_blocks: true,
@@ -618,6 +620,7 @@ async fn test_chain_orchestrator_shallow_reorg_with_gap() -> eyre::Result<()> {
618620
chain_orchestrator_args: ChainOrchestratorArgs::default(),
619621
sequencer_args: SequencerArgs {
620622
sequencer_enabled: true,
623+
auto_start: true,
621624
block_time: 0,
622625
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
623626
allow_empty_blocks: true,

crates/sequencer/tests/e2e.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,7 @@ async fn can_sequence_blocks_with_private_key_file() -> eyre::Result<()> {
502502
chain_orchestrator_args: ChainOrchestratorArgs::default(),
503503
sequencer_args: SequencerArgs {
504504
sequencer_enabled: true,
505+
auto_start: true,
505506
block_time: 0,
506507
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
507508
payload_building_duration: 1000,
@@ -594,6 +595,7 @@ async fn can_sequence_blocks_with_hex_key_file_without_prefix() -> eyre::Result<
594595
chain_orchestrator_args: ChainOrchestratorArgs::default(),
595596
sequencer_args: SequencerArgs {
596597
sequencer_enabled: true,
598+
auto_start: true,
597599
block_time: 0,
598600
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
599601
payload_building_duration: 1000,

tests/launch_rollup_node_sequencer.bash

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ exec rollup-node node --chain dev --datadir=/l2reth --metrics=0.0.0.0:6060 --net
77
--log.stdout.format log-fmt -vvv \
88
--test \
99
--sequencer.enabled \
10+
--sequencer.auto-start \
1011
--sequencer.block-time 250 \
1112
--sequencer.payload-building-duration 230 \
1213
--sequencer.allow-empty-blocks \

0 commit comments

Comments
 (0)