Skip to content

Commit 7bb1b27

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat/improve-docker-e2e-test
2 parents f1db7f8 + b233c8c commit 7bb1b27

File tree

20 files changed

+469
-474
lines changed

20 files changed

+469
-474
lines changed

Cargo.lock

Lines changed: 129 additions & 129 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/codec/src/decoding/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub mod v4;
3333
pub mod v7;
3434

3535
/// Decoded payload.
36-
pub(crate) mod payload;
36+
pub mod payload;
3737

3838
/// Tests utils.
3939
#[cfg(any(test, feature = "test-utils"))]

crates/database/db/src/operations.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,19 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
484484
.map(|x| x.map(Into::into))?)
485485
}
486486

487+
/// Gets the latest L1 message which has an associated L2 block number, if any.
488+
async fn get_latest_executed_l1_message(
489+
&self,
490+
) -> Result<Option<L1MessageEnvelope>, DatabaseError> {
491+
Ok(models::l1_message::Entity::find()
492+
.filter(models::l1_message::Column::L2BlockNumber.is_not_null())
493+
.order_by_desc(models::l1_message::Column::L2BlockNumber)
494+
.order_by_desc(models::l1_message::Column::QueueIndex)
495+
.one(self.get_connection())
496+
.await?
497+
.map(Into::into))
498+
}
499+
487500
/// Get an iterator over all [`L1MessageEnvelope`]s in the database starting from the provided
488501
/// `start` point.
489502
async fn get_l1_messages<'a>(
@@ -667,6 +680,18 @@ impl fmt::Display for L1MessageStart {
667680
}
668681
}
669682

683+
impl From<u64> for L1MessageStart {
684+
fn from(value: u64) -> Self {
685+
Self::Index(value)
686+
}
687+
}
688+
689+
impl From<B256> for L1MessageStart {
690+
fn from(value: B256) -> Self {
691+
Self::Hash(value)
692+
}
693+
}
694+
670695
/// The result of [`DatabaseWriteOperations::unwind`].
671696
#[derive(Debug)]
672697
pub struct UnwindResult {

crates/derivation-pipeline/benches/pipeline.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use alloy_primitives::{address, b256, bytes, U256};
88
use criterion::{criterion_group, criterion_main, Criterion};
99
use futures::StreamExt;
1010
use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope};
11-
use rollup_node_providers::{test_utils::MockL1Provider, DatabaseL1MessageProvider};
11+
use rollup_node_providers::test_utils::MockL1Provider;
1212
use scroll_alloy_consensus::TxL1Message;
1313
use scroll_codec::decoding::test_utils::read_to_bytes;
1414
use scroll_db::{
@@ -17,8 +17,7 @@ use scroll_db::{
1717
use scroll_derivation_pipeline::DerivationPipeline;
1818
use tokio::runtime::{Handle, Runtime};
1919

20-
async fn setup_pipeline(
21-
) -> DerivationPipeline<MockL1Provider<DatabaseL1MessageProvider<Arc<Database>>>> {
20+
async fn setup_pipeline() -> DerivationPipeline<MockL1Provider<Arc<Database>>> {
2221
// load batch data in the db.
2322
let db = Arc::new(setup_test_db().await);
2423
let raw_calldata = read_to_bytes("./testdata/calldata_v0.bin").unwrap();
@@ -69,7 +68,7 @@ async fn setup_pipeline(
6968
tx.commit().await.unwrap();
7069

7170
// construct the pipeline.
72-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
71+
let l1_messages_provider = db.clone();
7372
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
7473
DerivationPipeline::new(mock_l1_provider, db, u64::MAX)
7574
}

crates/derivation-pipeline/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@ pub enum DerivationPipelineError {
88
/// Missing L1 messages cursor.
99
#[error("missing l1 message queue cursor")]
1010
MissingL1MessageQueueCursor,
11+
/// Invalid L1 messages count.
12+
#[error("invalid l1 messages count: expected {expected}, got {got}")]
13+
InvalidL1MessagesCount {
14+
/// The expected count.
15+
expected: u64,
16+
/// The actual count.
17+
got: u64,
18+
},
1119
/// Missing L1 message.
1220
#[error("missing l1 message for L2 block {0:?}")]
1321
MissingL1Message(L2Block),

crates/derivation-pipeline/src/lib.rs

Lines changed: 75 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ use core::{
2424
};
2525
use futures::{FutureExt, Stream};
2626
use rollup_node_primitives::{
27-
BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber,
28-
WithFinalizedBatchInfo, WithFinalizedBlockNumber,
27+
BatchCommitData, BatchInfo, L1MessageEnvelope, ScrollPayloadAttributesWithBatchInfo,
28+
WithBlockNumber, WithFinalizedBatchInfo, WithFinalizedBlockNumber,
2929
};
3030
use rollup_node_providers::{BlockDataProvider, L1Provider};
3131
use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
32-
use scroll_codec::Codec;
32+
use scroll_codec::{decoding::payload::PayloadData, Codec};
3333
use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider};
3434
use tokio::time::Interval;
3535

@@ -272,25 +272,10 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
272272
let decoded = Codec::decode(&data)?;
273273

274274
// set the cursor for the l1 provider.
275-
let data = &decoded.data;
276-
if let Some(index) = data.queue_index_start() {
277-
l1_provider.set_queue_index_cursor(index);
278-
} else if let Some(hash) = data.prev_l1_message_queue_hash() {
279-
// If the message queue hash is zero then we should use the V2 L1 message queue start index.
280-
// We must apply this branch logic because we do not have a L1 message associated with a
281-
// queue hash of ZERO (we only compute a queue hash for the first L1 message of the V2
282-
// contract).
283-
if hash == &B256::ZERO {
284-
l1_provider.set_queue_index_cursor(l1_v2_message_queue_start_index);
285-
} else {
286-
l1_provider.set_hash_cursor(*hash).await;
287-
// we skip the first l1 message, as we are interested in the one starting after
288-
// prev_l1_message_queue_hash.
289-
let _ = l1_provider.next_l1_message().await.map_err(Into::into)?;
290-
}
291-
} else {
292-
return Err(DerivationPipelineError::MissingL1MessageQueueCursor)
293-
}
275+
let payload_data = &decoded.data;
276+
let mut l1_messages_iter =
277+
iter_l1_messages_from_payload(&l1_provider, payload_data, l1_v2_message_queue_start_index)
278+
.await?;
294279

295280
let skipped_l1_messages = decoded.data.skipped_l1_message_bitmap.clone().unwrap_or_default();
296281
let mut skipped_l1_messages = skipped_l1_messages.into_iter();
@@ -302,18 +287,15 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
302287
for _ in 0..block.context.num_l1_messages {
303288
// check if the next l1 message should be skipped.
304289
if matches!(skipped_l1_messages.next(), Some(bit) if bit) {
305-
l1_provider.increment_cursor();
290+
let _ = l1_messages_iter.next();
306291
continue;
307292
}
308293

309-
// TODO: fetch L1 messages range.
310-
let l1_message = l1_provider
311-
.next_l1_message()
312-
.await
313-
.map_err(Into::into)?
294+
let l1_message = l1_messages_iter
295+
.next()
314296
.ok_or(DerivationPipelineError::MissingL1Message(block.clone()))?;
315-
let mut bytes = Vec::with_capacity(l1_message.eip2718_encoded_length());
316-
l1_message.eip2718_encode(&mut bytes);
297+
let mut bytes = Vec::with_capacity(l1_message.transaction.eip2718_encoded_length());
298+
l1_message.transaction.eip2718_encode(&mut bytes);
317299
txs.push(bytes.into());
318300
}
319301

@@ -353,6 +335,59 @@ fn delayed_interval(interval: u64) -> Interval {
353335
interval
354336
}
355337

338+
/// Returns an iterator over L1 messages from the `PayloadData`. If the `PayloadData` returns a
339+
/// `prev_l1_message_queue_hash` of zero, uses the `l1_v2_message_queue_start_index` to fetch
340+
/// messages from the L1 provider.
341+
///
342+
/// # Errors
343+
///
344+
/// Propagates any error from the L1 provider.
345+
/// Returns an error if the retrieved number of L1 messages does not match the expected number from
346+
/// the payload data.
347+
async fn iter_l1_messages_from_payload<L1P: L1Provider>(
348+
provider: &L1P,
349+
data: &PayloadData,
350+
l1_v2_message_queue_start_index: u64,
351+
) -> Result<Box<dyn Iterator<Item = L1MessageEnvelope> + Send>, DerivationPipelineError> {
352+
let total_l1_messages = data.blocks.iter().map(|b| b.context.num_l1_messages as u64).sum();
353+
354+
let messages = if let Some(index) = data.queue_index_start() {
355+
provider.get_n_messages(index.into(), total_l1_messages).await.map_err(Into::into)?
356+
} else if let Some(hash) = data.prev_l1_message_queue_hash() {
357+
// If the message queue hash is zero then we should use the V2 L1 message queue start
358+
// index. We must apply this branch logic because we do not have a L1
359+
// message associated with a queue hash of ZERO (we only compute a queue
360+
// hash for the first L1 message of the V2 contract).
361+
if hash == &B256::ZERO {
362+
provider
363+
.get_n_messages(l1_v2_message_queue_start_index.into(), total_l1_messages)
364+
.await
365+
.map_err(Into::into)?
366+
} else {
367+
let mut messages = provider
368+
.get_n_messages((*hash).into(), total_l1_messages + 1)
369+
.await
370+
.map_err(Into::into)?;
371+
// we skip the first l1 message, as we are interested in the one starting after
372+
// prev_l1_message_queue_hash. NOTE(review): the code below calls `messages.pop()`, which
// removes the *last* element of the `Vec`, yet this comment says we skip the *first* message.
// If `get_n_messages` returns messages in ascending queue order starting at the hash cursor,
// skipping the first would require `messages.remove(0)` — confirm the provider's ordering.
373+
messages.pop();
374+
messages
375+
}
376+
} else {
377+
return Err(DerivationPipelineError::MissingL1MessageQueueCursor)
378+
};
379+
380+
// Check we received the expected number of L1 messages.
381+
if messages.len() as u64 != total_l1_messages {
382+
return Err(DerivationPipelineError::InvalidL1MessagesCount {
383+
expected: total_l1_messages,
384+
got: messages.len() as u64,
385+
})
386+
}
387+
388+
Ok(Box::new(messages.into_iter()))
389+
}
390+
356391
#[cfg(test)]
357392
mod tests {
358393
use super::*;
@@ -362,9 +397,7 @@ mod tests {
362397
use alloy_primitives::{address, b256, bytes, U256};
363398
use futures::StreamExt;
364399
use rollup_node_primitives::L1MessageEnvelope;
365-
use rollup_node_providers::{
366-
test_utils::MockL1Provider, DatabaseL1MessageProvider, L1ProviderError,
367-
};
400+
use rollup_node_providers::{test_utils::MockL1Provider, L1ProviderError};
368401
use scroll_alloy_consensus::TxL1Message;
369402
use scroll_alloy_rpc_types_engine::BlockDataHint;
370403
use scroll_codec::decoding::test_utils::read_to_bytes;
@@ -426,7 +459,7 @@ mod tests {
426459
async fn test_should_correctly_handle_batch_revert() -> eyre::Result<()> {
427460
// construct the pipeline.
428461
let db = Arc::new(setup_test_db().await);
429-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
462+
let l1_messages_provider = db.clone();
430463
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
431464

432465
let mut pipeline = DerivationPipeline {
@@ -497,7 +530,7 @@ mod tests {
497530
tx.commit().await?;
498531

499532
// construct the pipeline.
500-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
533+
let l1_messages_provider = db.clone();
501534
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
502535
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone(), u64::MAX);
503536

@@ -568,7 +601,7 @@ mod tests {
568601
tx.commit().await?;
569602

570603
// construct the pipeline.
571-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
604+
let l1_messages_provider = db.clone();
572605
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
573606
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db, u64::MAX);
574607

@@ -627,7 +660,7 @@ mod tests {
627660
}
628661
tx.commit().await?;
629662

630-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
663+
let l1_messages_provider = db.clone();
631664
let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
632665
let l2_provider = MockL2Provider;
633666

@@ -727,7 +760,7 @@ mod tests {
727760
}
728761
tx.commit().await?;
729762

730-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
763+
let l1_messages_provider = db.clone();
731764
let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
732765
let l2_provider = MockL2Provider;
733766

@@ -783,7 +816,7 @@ mod tests {
783816
}
784817
tx.commit().await?;
785818

786-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
819+
let l1_messages_provider = db.clone();
787820
let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
788821
let l2_provider = MockL2Provider;
789822

@@ -894,7 +927,7 @@ mod tests {
894927
}
895928
tx.commit().await?;
896929

897-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
930+
let l1_messages_provider = db.clone();
898931
let l1_provider = MockL1Provider {
899932
l1_messages_provider,
900933
blobs: HashMap::from([(
@@ -928,8 +961,7 @@ mod tests {
928961
Ok(())
929962
}
930963

931-
async fn new_test_pipeline(
932-
) -> DerivationPipeline<MockL1Provider<DatabaseL1MessageProvider<Arc<Database>>>> {
964+
async fn new_test_pipeline() -> DerivationPipeline<MockL1Provider<Arc<Database>>> {
933965
let initial_block = 200;
934966

935967
let batches = (initial_block - 100..initial_block)
@@ -948,7 +980,7 @@ mod tests {
948980
.collect();
949981

950982
let db = Arc::new(setup_test_db().await);
951-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
983+
let l1_messages_provider = db.clone();
952984
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
953985

954986
DerivationPipeline {

crates/manager/src/manager/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ where
458458
match event {
459459
EngineDriverEvent::BlockImportOutcome(outcome) => {
460460
if let Some(block) = outcome.block() {
461+
if let Some(sequencer) = self.sequencer.as_mut() {
462+
sequencer.handle_new_payload(&block);
463+
}
461464
if let Some(event_sender) = self.event_sender.as_ref() {
462465
event_sender.notify(RollupManagerEvent::BlockImported(block.clone()));
463466
}
@@ -470,6 +473,11 @@ where
470473
let _ = signer.sign_block(payload.clone()).inspect_err(|err| error!(target: "scroll::node::manager", ?err, "Failed to send new payload to signer"));
471474
}
472475

476+
self.sequencer
477+
.as_mut()
478+
.expect("Sequencer must be enabled to build payload")
479+
.handle_new_payload(&payload);
480+
473481
if let Some(event_sender) = self.event_sender.as_ref() {
474482
event_sender.notify(RollupManagerEvent::BlockSequenced(payload));
475483
}
@@ -488,6 +496,9 @@ where
488496
}
489497
EngineDriverEvent::ChainImportOutcome(outcome) => {
490498
if let Some(block) = outcome.outcome.block() {
499+
if let Some(sequencer) = self.sequencer.as_mut() {
500+
sequencer.handle_new_payload(&block);
501+
}
491502
if let Some(event_sender) = self.event_sender.as_ref() {
492503
event_sender.notify(RollupManagerEvent::BlockImported(block));
493504
}

crates/node/src/args.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ use rollup_node_manager::{
3131
};
3232
use rollup_node_primitives::{BlockInfo, NodeConfig};
3333
use rollup_node_providers::{
34-
BlobProvidersBuilder, DatabaseL1MessageProvider, FullL1Provider, L1MessageProvider, L1Provider,
35-
SystemContractProvider,
34+
BlobProvidersBuilder, FullL1Provider, L1MessageProvider, L1Provider, SystemContractProvider,
3635
};
3736
use rollup_node_sequencer::{L1MessageInclusionMode, Sequencer};
3837
use rollup_node_watcher::{L1Notification, L1Watcher};
3938
use scroll_alloy_hardforks::ScrollHardforks;
4039
use scroll_alloy_network::Scroll;
4140
use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
4241
use scroll_db::{
43-
Database, DatabaseConnectionProvider, DatabaseTransactionProvider, DatabaseWriteOperations,
42+
Database, DatabaseConnectionProvider, DatabaseReadOperations, DatabaseTransactionProvider,
43+
DatabaseWriteOperations,
4444
};
4545
use scroll_engine::{genesis_hash_from_chain_spec, EngineDriver, ForkchoiceState};
4646
use scroll_migration::traits::ScrollMigrator;
@@ -326,7 +326,7 @@ impl ScrollRollupNodeConfig {
326326
};
327327

328328
// Construct the l1 provider.
329-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
329+
let l1_messages_provider = db.clone();
330330
let blob_providers_builder = BlobProvidersBuilder {
331331
beacon: self.blob_provider_args.beacon_node_urls,
332332
s3: self.blob_provider_args.s3_url,
@@ -335,11 +335,13 @@ impl ScrollRollupNodeConfig {
335335
};
336336
let blob_provider =
337337
blob_providers_builder.build().await.expect("failed to construct L1 blob provider");
338-
339338
let l1_provider = FullL1Provider::new(blob_provider, l1_messages_provider.clone()).await;
340339

341340
// Construct the Sequencer.
342341
let chain_config = chain_spec.chain_config();
342+
let latest_l1_message = db.tx().await?.get_latest_executed_l1_message().await?;
343+
let sequencer_l1_messages_queue_index =
344+
latest_l1_message.map(|msg| msg.transaction.queue_index + 1).unwrap_or_default();
343345
let (sequencer, block_time, auto_start) = if self.sequencer_args.sequencer_enabled {
344346
let args = &self.sequencer_args;
345347
let sequencer = Sequencer::new(
@@ -351,6 +353,7 @@ impl ScrollRollupNodeConfig {
351353
.unwrap_or(chain_config.l1_config.num_l1_messages_per_block),
352354
0,
353355
self.sequencer_args.l1_message_inclusion_mode,
356+
sequencer_l1_messages_queue_index,
354357
);
355358
(Some(sequencer), (args.block_time != 0).then_some(args.block_time), args.auto_start)
356359
} else {

0 commit comments

Comments
 (0)