Skip to content

Commit fa6ff33

Browse files
greged93frisitano
andauthored
fix: l1 messages cursor (#324)
* refactor database locking model * feat: rework l1 message provider interface * fix prepare on strat up * feat: advance L1 message index after payload is accepted by EN * feat: update based on latest database interface changes * feat: update sequencer L1 message index based on BlockImportOutcome * fix: fmt * feat: update trait based on comments * feat: answer comments --------- Co-authored-by: frisitano <[email protected]> Co-authored-by: frisitano <[email protected]>
1 parent 8f8c250 commit fa6ff33

File tree

19 files changed

+323
-344
lines changed

19 files changed

+323
-344
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/codec/src/decoding/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub mod v4;
3333
pub mod v7;
3434

3535
/// Decoded payload.
36-
pub(crate) mod payload;
36+
pub mod payload;
3737

3838
/// Tests utils.
3939
#[cfg(any(test, feature = "test-utils"))]

crates/database/db/src/operations.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,18 @@ impl fmt::Display for L1MessageStart {
667667
}
668668
}
669669

670+
impl From<u64> for L1MessageStart {
671+
fn from(value: u64) -> Self {
672+
Self::Index(value)
673+
}
674+
}
675+
676+
impl From<B256> for L1MessageStart {
677+
fn from(value: B256) -> Self {
678+
Self::Hash(value)
679+
}
680+
}
681+
670682
/// The result of [`DatabaseWriteOperations::unwind`].
671683
#[derive(Debug)]
672684
pub struct UnwindResult {

crates/derivation-pipeline/benches/pipeline.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use alloy_primitives::{address, b256, bytes, U256};
88
use criterion::{criterion_group, criterion_main, Criterion};
99
use futures::StreamExt;
1010
use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope};
11-
use rollup_node_providers::{test_utils::MockL1Provider, DatabaseL1MessageProvider};
11+
use rollup_node_providers::test_utils::MockL1Provider;
1212
use scroll_alloy_consensus::TxL1Message;
1313
use scroll_codec::decoding::test_utils::read_to_bytes;
1414
use scroll_db::{
@@ -17,8 +17,7 @@ use scroll_db::{
1717
use scroll_derivation_pipeline::DerivationPipeline;
1818
use tokio::runtime::{Handle, Runtime};
1919

20-
async fn setup_pipeline(
21-
) -> DerivationPipeline<MockL1Provider<DatabaseL1MessageProvider<Arc<Database>>>> {
20+
async fn setup_pipeline() -> DerivationPipeline<MockL1Provider<Arc<Database>>> {
2221
// load batch data in the db.
2322
let db = Arc::new(setup_test_db().await);
2423
let raw_calldata = read_to_bytes("./testdata/calldata_v0.bin").unwrap();
@@ -69,7 +68,7 @@ async fn setup_pipeline(
6968
tx.commit().await.unwrap();
7069

7170
// construct the pipeline.
72-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
71+
let l1_messages_provider = db.clone();
7372
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
7473
DerivationPipeline::new(mock_l1_provider, db, u64::MAX)
7574
}

crates/derivation-pipeline/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@ pub enum DerivationPipelineError {
88
/// Missing L1 messages cursor.
99
#[error("missing l1 message queue cursor")]
1010
MissingL1MessageQueueCursor,
11+
/// Invalid L1 messages count.
12+
#[error("invalid l1 messages count: expected {expected}, got {got}")]
13+
InvalidL1MessagesCount {
14+
/// The expected count.
15+
expected: u64,
16+
/// The actual count.
17+
got: u64,
18+
},
1119
/// Missing L1 message.
1220
#[error("missing l1 message for L2 block {0:?}")]
1321
MissingL1Message(L2Block),

crates/derivation-pipeline/src/lib.rs

Lines changed: 75 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ use core::{
2424
};
2525
use futures::{FutureExt, Stream};
2626
use rollup_node_primitives::{
27-
BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber,
28-
WithFinalizedBatchInfo, WithFinalizedBlockNumber,
27+
BatchCommitData, BatchInfo, L1MessageEnvelope, ScrollPayloadAttributesWithBatchInfo,
28+
WithBlockNumber, WithFinalizedBatchInfo, WithFinalizedBlockNumber,
2929
};
3030
use rollup_node_providers::{BlockDataProvider, L1Provider};
3131
use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
32-
use scroll_codec::Codec;
32+
use scroll_codec::{decoding::payload::PayloadData, Codec};
3333
use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider};
3434
use tokio::time::Interval;
3535

@@ -272,25 +272,10 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
272272
let decoded = Codec::decode(&data)?;
273273

274274
// set the cursor for the l1 provider.
275-
let data = &decoded.data;
276-
if let Some(index) = data.queue_index_start() {
277-
l1_provider.set_queue_index_cursor(index);
278-
} else if let Some(hash) = data.prev_l1_message_queue_hash() {
279-
// If the message queue hash is zero then we should use the V2 L1 message queue start index.
280-
// We must apply this branch logic because we do not have a L1 message associated with a
281-
// queue hash of ZERO (we only compute a queue hash for the first L1 message of the V2
282-
// contract).
283-
if hash == &B256::ZERO {
284-
l1_provider.set_queue_index_cursor(l1_v2_message_queue_start_index);
285-
} else {
286-
l1_provider.set_hash_cursor(*hash).await;
287-
// we skip the first l1 message, as we are interested in the one starting after
288-
// prev_l1_message_queue_hash.
289-
let _ = l1_provider.next_l1_message().await.map_err(Into::into)?;
290-
}
291-
} else {
292-
return Err(DerivationPipelineError::MissingL1MessageQueueCursor)
293-
}
275+
let payload_data = &decoded.data;
276+
let mut l1_messages_iter =
277+
iter_l1_messages_from_payload(&l1_provider, payload_data, l1_v2_message_queue_start_index)
278+
.await?;
294279

295280
let skipped_l1_messages = decoded.data.skipped_l1_message_bitmap.clone().unwrap_or_default();
296281
let mut skipped_l1_messages = skipped_l1_messages.into_iter();
@@ -302,18 +287,15 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
302287
for _ in 0..block.context.num_l1_messages {
303288
// check if the next l1 message should be skipped.
304289
if matches!(skipped_l1_messages.next(), Some(bit) if bit) {
305-
l1_provider.increment_cursor();
290+
let _ = l1_messages_iter.next();
306291
continue;
307292
}
308293

309-
// TODO: fetch L1 messages range.
310-
let l1_message = l1_provider
311-
.next_l1_message()
312-
.await
313-
.map_err(Into::into)?
294+
let l1_message = l1_messages_iter
295+
.next()
314296
.ok_or(DerivationPipelineError::MissingL1Message(block.clone()))?;
315-
let mut bytes = Vec::with_capacity(l1_message.eip2718_encoded_length());
316-
l1_message.eip2718_encode(&mut bytes);
297+
let mut bytes = Vec::with_capacity(l1_message.transaction.eip2718_encoded_length());
298+
l1_message.transaction.eip2718_encode(&mut bytes);
317299
txs.push(bytes.into());
318300
}
319301

@@ -353,6 +335,59 @@ fn delayed_interval(interval: u64) -> Interval {
353335
interval
354336
}
355337

338+
/// Returns an iterator over L1 messages from the `PayloadData`. If the `PayloadData` returns a
339+
/// `prev_l1_message_queue_hash` of zero, uses the `l1_v2_message_queue_start_index` to fetch
340+
/// messages from the L1 provider.
341+
///
342+
/// # Errors
343+
///
344+
/// Propagates any error from the L1 provider.
345+
/// Returns an error if the retrieved number of L1 messages does not match the expected number from
346+
/// the payload data.
347+
async fn iter_l1_messages_from_payload<L1P: L1Provider>(
348+
provider: &L1P,
349+
data: &PayloadData,
350+
l1_v2_message_queue_start_index: u64,
351+
) -> Result<Box<dyn Iterator<Item = L1MessageEnvelope> + Send>, DerivationPipelineError> {
352+
let total_l1_messages = data.blocks.iter().map(|b| b.context.num_l1_messages as u64).sum();
353+
354+
let messages = if let Some(index) = data.queue_index_start() {
355+
provider.get_n_messages(index.into(), total_l1_messages).await.map_err(Into::into)?
356+
} else if let Some(hash) = data.prev_l1_message_queue_hash() {
357+
// If the message queue hash is zero then we should use the V2 L1 message queue start
358+
// index. We must apply this branch logic because we do not have a L1
359+
// message associated with a queue hash of ZERO (we only compute a queue
360+
// hash for the first L1 message of the V2 contract).
361+
if hash == &B256::ZERO {
362+
provider
363+
.get_n_messages(l1_v2_message_queue_start_index.into(), total_l1_messages)
364+
.await
365+
.map_err(Into::into)?
366+
} else {
367+
let mut messages = provider
368+
.get_n_messages((*hash).into(), total_l1_messages + 1)
369+
.await
370+
.map_err(Into::into)?;
371+
// we skip the first l1 message, as we are interested in the one starting after
372+
// prev_l1_message_queue_hash.
373+
messages.pop();
374+
messages
375+
}
376+
} else {
377+
return Err(DerivationPipelineError::MissingL1MessageQueueCursor)
378+
};
379+
380+
// Check we received the expected amount of L1 messages.
381+
if messages.len() as u64 != total_l1_messages {
382+
return Err(DerivationPipelineError::InvalidL1MessagesCount {
383+
expected: total_l1_messages,
384+
got: messages.len() as u64,
385+
})
386+
}
387+
388+
Ok(Box::new(messages.into_iter()))
389+
}
390+
356391
#[cfg(test)]
357392
mod tests {
358393
use super::*;
@@ -362,9 +397,7 @@ mod tests {
362397
use alloy_primitives::{address, b256, bytes, U256};
363398
use futures::StreamExt;
364399
use rollup_node_primitives::L1MessageEnvelope;
365-
use rollup_node_providers::{
366-
test_utils::MockL1Provider, DatabaseL1MessageProvider, L1ProviderError,
367-
};
400+
use rollup_node_providers::{test_utils::MockL1Provider, L1ProviderError};
368401
use scroll_alloy_consensus::TxL1Message;
369402
use scroll_alloy_rpc_types_engine::BlockDataHint;
370403
use scroll_codec::decoding::test_utils::read_to_bytes;
@@ -426,7 +459,7 @@ mod tests {
426459
async fn test_should_correctly_handle_batch_revert() -> eyre::Result<()> {
427460
// construct the pipeline.
428461
let db = Arc::new(setup_test_db().await);
429-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
462+
let l1_messages_provider = db.clone();
430463
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
431464

432465
let mut pipeline = DerivationPipeline {
@@ -497,7 +530,7 @@ mod tests {
497530
tx.commit().await?;
498531

499532
// construct the pipeline.
500-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
533+
let l1_messages_provider = db.clone();
501534
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
502535
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone(), u64::MAX);
503536

@@ -568,7 +601,7 @@ mod tests {
568601
tx.commit().await?;
569602

570603
// construct the pipeline.
571-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
604+
let l1_messages_provider = db.clone();
572605
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
573606
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db, u64::MAX);
574607

@@ -627,7 +660,7 @@ mod tests {
627660
}
628661
tx.commit().await?;
629662

630-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
663+
let l1_messages_provider = db.clone();
631664
let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
632665
let l2_provider = MockL2Provider;
633666

@@ -727,7 +760,7 @@ mod tests {
727760
}
728761
tx.commit().await?;
729762

730-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
763+
let l1_messages_provider = db.clone();
731764
let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
732765
let l2_provider = MockL2Provider;
733766

@@ -783,7 +816,7 @@ mod tests {
783816
}
784817
tx.commit().await?;
785818

786-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
819+
let l1_messages_provider = db.clone();
787820
let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
788821
let l2_provider = MockL2Provider;
789822

@@ -894,7 +927,7 @@ mod tests {
894927
}
895928
tx.commit().await?;
896929

897-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
930+
let l1_messages_provider = db.clone();
898931
let l1_provider = MockL1Provider {
899932
l1_messages_provider,
900933
blobs: HashMap::from([(
@@ -928,8 +961,7 @@ mod tests {
928961
Ok(())
929962
}
930963

931-
async fn new_test_pipeline(
932-
) -> DerivationPipeline<MockL1Provider<DatabaseL1MessageProvider<Arc<Database>>>> {
964+
async fn new_test_pipeline() -> DerivationPipeline<MockL1Provider<Arc<Database>>> {
933965
let initial_block = 200;
934966

935967
let batches = (initial_block - 100..initial_block)
@@ -948,7 +980,7 @@ mod tests {
948980
.collect();
949981

950982
let db = Arc::new(setup_test_db().await);
951-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
983+
let l1_messages_provider = db.clone();
952984
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
953985

954986
DerivationPipeline {

crates/manager/src/manager/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ where
458458
match event {
459459
EngineDriverEvent::BlockImportOutcome(outcome) => {
460460
if let Some(block) = outcome.block() {
461+
if let Some(sequencer) = self.sequencer.as_mut() {
462+
sequencer.handle_new_payload(&block);
463+
}
461464
if let Some(event_sender) = self.event_sender.as_ref() {
462465
event_sender.notify(RollupManagerEvent::BlockImported(block.clone()));
463466
}
@@ -470,6 +473,11 @@ where
470473
let _ = signer.sign_block(payload.clone()).inspect_err(|err| error!(target: "scroll::node::manager", ?err, "Failed to send new payload to signer"));
471474
}
472475

476+
self.sequencer
477+
.as_mut()
478+
.expect("Sequencer must be enabled to build payload")
479+
.handle_new_payload(&payload);
480+
473481
if let Some(event_sender) = self.event_sender.as_ref() {
474482
event_sender.notify(RollupManagerEvent::BlockSequenced(payload));
475483
}
@@ -488,6 +496,9 @@ where
488496
}
489497
EngineDriverEvent::ChainImportOutcome(outcome) => {
490498
if let Some(block) = outcome.outcome.block() {
499+
if let Some(sequencer) = self.sequencer.as_mut() {
500+
sequencer.handle_new_payload(&block);
501+
}
491502
if let Some(event_sender) = self.event_sender.as_ref() {
492503
event_sender.notify(RollupManagerEvent::BlockImported(block));
493504
}

crates/node/src/args.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ use rollup_node_manager::{
3131
};
3232
use rollup_node_primitives::{BlockInfo, NodeConfig};
3333
use rollup_node_providers::{
34-
BlobProvidersBuilder, DatabaseL1MessageProvider, FullL1Provider, L1MessageProvider, L1Provider,
35-
SystemContractProvider,
34+
BlobProvidersBuilder, FullL1Provider, L1MessageProvider, L1Provider, SystemContractProvider,
3635
};
3736
use rollup_node_sequencer::{L1MessageInclusionMode, Sequencer};
3837
use rollup_node_watcher::{L1Notification, L1Watcher};
@@ -326,7 +325,7 @@ impl ScrollRollupNodeConfig {
326325
};
327326

328327
// Construct the l1 provider.
329-
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
328+
let l1_messages_provider = db.clone();
330329
let blob_providers_builder = BlobProvidersBuilder {
331330
beacon: self.blob_provider_args.beacon_node_urls,
332331
s3: self.blob_provider_args.s3_url,
@@ -335,7 +334,6 @@ impl ScrollRollupNodeConfig {
335334
};
336335
let blob_provider =
337336
blob_providers_builder.build().await.expect("failed to construct L1 blob provider");
338-
339337
let l1_provider = FullL1Provider::new(blob_provider, l1_messages_provider.clone()).await;
340338

341339
// Construct the Sequencer.
@@ -351,6 +349,8 @@ impl ScrollRollupNodeConfig {
351349
.unwrap_or(chain_config.l1_config.num_l1_messages_per_block),
352350
0,
353351
self.sequencer_args.l1_message_inclusion_mode,
352+
// TODO (issue 169): update with correct start value.
353+
0,
354354
);
355355
(Some(sequencer), (args.block_time != 0).then_some(args.block_time), args.auto_start)
356356
} else {

0 commit comments

Comments
 (0)