Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,12 @@ impl<
let db_tx = self.database.tx().await?;
let l1_latest = db_tx.get_latest_l1_block_number().await?;
let l1_finalized = db_tx.get_finalized_l1_block_number().await?;
let l1_processed = db_tx.get_processed_l1_block_number().await?;
let status = ChainOrchestratorStatus::new(
&self.sync_state,
l1_latest,
l1_finalized,
l1_processed,
self.engine.fcs().clone(),
);
let _ = tx.send(status);
Expand Down Expand Up @@ -488,6 +490,12 @@ impl<
notification: Arc<L1Notification>,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
match &*notification {
L1Notification::Processed(block_number) => {
let tx = self.database.tx_mut().await?;
tx.set_processed_l1_block_number(*block_number).await?;
tx.commit().await?;
Ok(None)
}
L1Notification::Reorg(block_number) => self.handle_l1_reorg(*block_number).await,
L1Notification::Consensus(update) => {
self.consensus.update_config(update);
Expand Down
4 changes: 4 additions & 0 deletions crates/chain-orchestrator/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ impl ChainOrchestratorStatus {
sync_state: &SyncState,
l1_latest: u64,
l1_finalized: u64,
l1_processed: u64,
l2_fcs: ForkchoiceState,
) -> Self {
Self {
l1: L1ChainStatus {
status: sync_state.l1().clone(),
latest: l1_latest,
finalized: l1_finalized,
processed: l1_processed,
},
l2: L2ChainStatus { status: sync_state.l2().clone(), fcs: l2_fcs },
}
Expand All @@ -40,6 +42,8 @@ pub struct L1ChainStatus {
pub latest: u64,
/// The finalized block number of the chain.
pub finalized: u64,
/// The highest block number that has been processed.
pub processed: u64,
}

/// The status of the L2 chain.
Expand Down
31 changes: 31 additions & 0 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,23 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati
.map(|_| ())?)
}

/// Set the processed L1 block number.
async fn set_processed_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", block_number, "Updating the processed L1 block number in the database.");
let metadata: models::metadata::ActiveModel =
Metadata { key: "l1_processed_block".to_string(), value: block_number.to_string() }
.into();
Ok(models::metadata::Entity::insert(metadata)
.on_conflict(
OnConflict::column(models::metadata::Column::Key)
.update_column(models::metadata::Column::Value)
.to_owned(),
)
.exec(self.get_connection())
.await
.map(|_| ())?)
}

/// Set the L2 head block number.
async fn set_l2_head_block_number(&self, number: u64) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", ?number, "Updating the L2 head block number in the database.");
Expand Down Expand Up @@ -543,6 +560,20 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
.expect("l1_finalized_block should always be a valid u64"))
}

/// Get the processed L1 block number from the database.
async fn get_processed_l1_block_number(&self) -> Result<u64, DatabaseError> {
Ok(models::metadata::Entity::find()
.filter(models::metadata::Column::Key.eq("l1_processed_block"))
.select_only()
.column(models::metadata::Column::Value)
.into_tuple::<String>()
.one(self.get_connection())
.await?
.expect("l1_processed_block should always be set")
.parse::<u64>()
.expect("l1_processed_block should always be a valid u64"))
}

/// Get the latest L2 head block info.
async fn get_l2_head_block_number(&self) -> Result<u64, DatabaseError> {
Ok(models::metadata::Entity::find()
Expand Down
2 changes: 2 additions & 0 deletions crates/database/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod m20250923_135359_add_index_block_hash;
mod m20250929_161536_add_additional_indexes;
mod m20251001_125444_add_index_processed;
mod m20251005_160938_add_initial_l1_block_numbers;
mod m20251013_140946_add_initial_l1_processed_block_number;

mod migration_info;
pub use migration_info::{
Expand All @@ -40,6 +41,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
Box::new(m20250929_161536_add_additional_indexes::Migration),
Box::new(m20251001_125444_add_index_processed::Migration),
Box::new(m20251005_160938_add_initial_l1_block_numbers::Migration),
Box::new(m20251013_140946_add_initial_l1_processed_block_number::Migration),
]
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();

// Insert both keys if they don't already exist
db.execute_unprepared(
r#"
INSERT INTO metadata (key, value)
VALUES
('l1_processed_block', '0')
ON CONFLICT(key) DO NOTHING;
"#,
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();

db.execute_unprepared(
r#"
DELETE FROM metadata
WHERE key = 'l1_processed_block';
"#,
)
.await?;

Ok(())
}
}
8 changes: 6 additions & 2 deletions crates/watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub struct L1Watcher<EP> {
/// The L1 notification type yielded by the [`L1Watcher`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum L1Notification {
/// A notification that the L1 watcher has processed up to a given block number.
Processed(u64),
/// A notification for a reorg of the L1 up to a given block number.
Reorg(u64),
/// A new batch has been committed on the L1 rollup contract.
Expand Down Expand Up @@ -124,6 +126,7 @@ pub enum L1Notification {
impl Display for L1Notification {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Processed(n) => write!(f, "Processed({n})"),
Self::Reorg(n) => write!(f, "Reorg({n:?})"),
Self::BatchCommit(b) => {
write!(f, "BatchCommit {{ hash: {}, index: {} }}", b.hash, b.index)
Expand Down Expand Up @@ -266,7 +269,7 @@ where
self.notify_all(notifications).await?;

// update the latest block the l1 watcher has indexed.
self.update_current_block(&latest);
self.update_current_block(&latest).await?;
}

Ok(())
Expand Down Expand Up @@ -606,11 +609,12 @@ where
}

/// Updates the current block number, saturating at the head of the chain.
fn update_current_block(&mut self, latest: &Block) {
async fn update_current_block(&mut self, latest: &Block) -> L1WatcherResult<()> {
self.current_block_number = self
.current_block_number
.saturating_add(LOGS_QUERY_BLOCK_RANGE)
.min(latest.header.number);
self.notify(L1Notification::Processed(self.current_block_number)).await
}

/// Returns the latest L1 block.
Expand Down
28 changes: 24 additions & 4 deletions crates/watcher/tests/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,26 @@ async fn test_should_detect_reorg() -> eyre::Result<()> {
for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) {
// check finalized first.
if finalized_number < finalized.header.number {
let notification = l1_watcher.recv().await.unwrap();
let mut notification = l1_watcher.recv().await.unwrap();
// skip the `L1Notification::Processed` notifications
if matches!(notification.as_ref(), L1Notification::Processed(_)) {
notification = l1_watcher.recv().await.unwrap();
}
assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number));
}

if latest_number == latest.header.number {
continue
}

// skip the `L1Notification::Synced` notifications
let mut notification = l1_watcher.recv().await.unwrap();

// skip the `L1Notification::Processed` notifications
if matches!(notification.as_ref(), L1Notification::Processed(_)) {
notification = l1_watcher.recv().await.unwrap();
}

// skip the `L1Notification::Synced` notifications
if matches!(notification.as_ref(), L1Notification::Synced) {
notification = l1_watcher.recv().await.unwrap();
}
Expand Down Expand Up @@ -174,16 +184,26 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> {
for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) {
// check finalized first.
if finalized_number < finalized.header.number {
let notification = l1_watcher.recv().await.unwrap();
let mut notification = l1_watcher.recv().await.unwrap();
// skip the `L1Notification::Processed` notifications
if matches!(notification.as_ref(), L1Notification::Processed(_)) {
notification = l1_watcher.recv().await.unwrap();
}
assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number));
}

if latest_number == latest.header.number {
continue
}

// skip the `L1Notification::Synced` notifications
let mut notification = l1_watcher.recv().await.unwrap();

// skip the `L1Notification::Processed` notifications
if matches!(notification.as_ref(), L1Notification::Processed(_)) {
notification = l1_watcher.recv().await.unwrap();
}

// skip the `L1Notification::Synced` notifications
if matches!(notification.as_ref(), L1Notification::Synced) {
notification = l1_watcher.recv().await.unwrap();
}
Expand Down
Loading