diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index e7ba91d9..41ac5e58 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -330,10 +330,12 @@ impl< let db_tx = self.database.tx().await?; let l1_latest = db_tx.get_latest_l1_block_number().await?; let l1_finalized = db_tx.get_finalized_l1_block_number().await?; + let l1_processed = db_tx.get_processed_l1_block_number().await?; let status = ChainOrchestratorStatus::new( &self.sync_state, l1_latest, l1_finalized, + l1_processed, self.engine.fcs().clone(), ); let _ = tx.send(status); @@ -488,6 +490,12 @@ impl< notification: Arc, ) -> Result, ChainOrchestratorError> { match &*notification { + L1Notification::Processed(block_number) => { + let tx = self.database.tx_mut().await?; + tx.set_processed_l1_block_number(*block_number).await?; + tx.commit().await?; + Ok(None) + } L1Notification::Reorg(block_number) => self.handle_l1_reorg(*block_number).await, L1Notification::Consensus(update) => { self.consensus.update_config(update); diff --git a/crates/chain-orchestrator/src/status.rs b/crates/chain-orchestrator/src/status.rs index 87f7f75f..ba70e15c 100644 --- a/crates/chain-orchestrator/src/status.rs +++ b/crates/chain-orchestrator/src/status.rs @@ -17,6 +17,7 @@ impl ChainOrchestratorStatus { sync_state: &SyncState, l1_latest: u64, l1_finalized: u64, + l1_processed: u64, l2_fcs: ForkchoiceState, ) -> Self { Self { @@ -24,6 +25,7 @@ impl ChainOrchestratorStatus { status: sync_state.l1().clone(), latest: l1_latest, finalized: l1_finalized, + processed: l1_processed, }, l2: L2ChainStatus { status: sync_state.l2().clone(), fcs: l2_fcs }, } @@ -40,6 +42,8 @@ pub struct L1ChainStatus { pub latest: u64, /// The finalized block number of the chain. pub finalized: u64, + /// The highest block number that has been processed. + pub processed: u64, } /// The status of the L2 chain. diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 73e705aa..a40178f1 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -98,6 +98,23 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|_| ())?) } + /// Set the processed L1 block number. + async fn set_processed_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> { + tracing::trace!(target: "scroll::db", block_number, "Updating the processed L1 block number in the database."); + let metadata: models::metadata::ActiveModel = + Metadata { key: "l1_processed_block".to_string(), value: block_number.to_string() } + .into(); + Ok(models::metadata::Entity::insert(metadata) + .on_conflict( + OnConflict::column(models::metadata::Column::Key) + .update_column(models::metadata::Column::Value) + .to_owned(), + ) + .exec(self.get_connection()) + .await + .map(|_| ())?) + } + /// Set the L2 head block number. async fn set_l2_head_block_number(&self, number: u64) -> Result<(), DatabaseError> { tracing::trace!(target: "scroll::db", ?number, "Updating the L2 head block number in the database."); @@ -543,6 +560,20 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .expect("l1_finalized_block should always be a valid u64")) } + /// Get the processed L1 block number from the database. + async fn get_processed_l1_block_number(&self) -> Result { + Ok(models::metadata::Entity::find() + .filter(models::metadata::Column::Key.eq("l1_processed_block")) + .select_only() + .column(models::metadata::Column::Value) + .into_tuple::() + .one(self.get_connection()) + .await? + .expect("l1_processed_block should always be set") + .parse::() + .expect("l1_processed_block should always be a valid u64")) + } + /// Get the latest L2 head block info. async fn get_l2_head_block_number(&self) -> Result { Ok(models::metadata::Entity::find() diff --git a/crates/database/migration/src/lib.rs b/crates/database/migration/src/lib.rs index bd05f56c..9a8bff99 100644 --- a/crates/database/migration/src/lib.rs +++ b/crates/database/migration/src/lib.rs @@ -14,6 +14,7 @@ mod m20250923_135359_add_index_block_hash; mod m20250929_161536_add_additional_indexes; mod m20251001_125444_add_index_processed; mod m20251005_160938_add_initial_l1_block_numbers; +mod m20251013_140946_add_initial_l1_processed_block_number; mod migration_info; pub use migration_info::{ @@ -40,6 +41,7 @@ impl MigratorTrait for Migrator { Box::new(m20250929_161536_add_additional_indexes::Migration), Box::new(m20251001_125444_add_index_processed::Migration), Box::new(m20251005_160938_add_initial_l1_block_numbers::Migration), + Box::new(m20251013_140946_add_initial_l1_processed_block_number::Migration), ] } } diff --git a/crates/database/migration/src/m20251013_140946_add_initial_l1_processed_block_number.rs b/crates/database/migration/src/m20251013_140946_add_initial_l1_processed_block_number.rs new file mode 100644 index 00000000..3dcc7168 --- /dev/null +++ b/crates/database/migration/src/m20251013_140946_add_initial_l1_processed_block_number.rs @@ -0,0 +1,38 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let db = manager.get_connection(); + + // Insert both keys if they don't already exist + db.execute_unprepared( + r#" + INSERT INTO metadata (key, value) + VALUES + ('l1_processed_block', '0') + ON CONFLICT(key) DO NOTHING; + "#, + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let db = manager.get_connection(); + + db.execute_unprepared( + r#" + DELETE FROM metadata + WHERE key = 'l1_processed_block'; + "#, + ) + .await?; + + Ok(()) + } +} diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index da9dca49..9d455daa 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -89,6 +89,8 @@ pub struct L1Watcher { /// The L1 notification type yielded by the [`L1Watcher`]. #[derive(Debug, Clone, PartialEq, Eq)] pub enum L1Notification { + /// A notification that the L1 watcher has processed up to a given block number. + Processed(u64), /// A notification for a reorg of the L1 up to a given block number. Reorg(u64), /// A new batch has been committed on the L1 rollup contract. @@ -124,6 +126,7 @@ pub enum L1Notification { impl Display for L1Notification { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { + Self::Processed(n) => write!(f, "Processed({n})"), Self::Reorg(n) => write!(f, "Reorg({n:?})"), Self::BatchCommit(b) => { write!(f, "BatchCommit {{ hash: {}, index: {} }}", b.hash, b.index) @@ -266,7 +269,7 @@ where self.notify_all(notifications).await?; // update the latest block the l1 watcher has indexed. - self.update_current_block(&latest); + self.update_current_block(&latest).await?; } Ok(()) @@ -606,11 +609,12 @@ where } /// Updates the current block number, saturating at the head of the chain. - fn update_current_block(&mut self, latest: &Block) { + async fn update_current_block(&mut self, latest: &Block) -> L1WatcherResult<()> { self.current_block_number = self .current_block_number .saturating_add(LOGS_QUERY_BLOCK_RANGE) .min(latest.header.number); + self.notify(L1Notification::Processed(self.current_block_number)).await } /// Returns the latest L1 block. diff --git a/crates/watcher/tests/reorg.rs b/crates/watcher/tests/reorg.rs index 1088d1ed..3fc6b2b9 100644 --- a/crates/watcher/tests/reorg.rs +++ b/crates/watcher/tests/reorg.rs @@ -83,7 +83,11 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) { // check finalized first. if finalized_number < finalized.header.number { - let notification = l1_watcher.recv().await.unwrap(); + let mut notification = l1_watcher.recv().await.unwrap(); + // skip the `L1Notification::Processed` notifications + if matches!(notification.as_ref(), L1Notification::Processed(_)) { + notification = l1_watcher.recv().await.unwrap(); + } assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number)); } @@ -91,8 +95,14 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { continue } - // skip the `L1Notification::Synced` notifications let mut notification = l1_watcher.recv().await.unwrap(); + + // skip the `L1Notification::Processed` notifications + if matches!(notification.as_ref(), L1Notification::Processed(_)) { + notification = l1_watcher.recv().await.unwrap(); + } + + // skip the `L1Notification::Synced` notifications if matches!(notification.as_ref(), L1Notification::Synced) { notification = l1_watcher.recv().await.unwrap(); } @@ -174,7 +184,11 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) { // check finalized first. if finalized_number < finalized.header.number { - let notification = l1_watcher.recv().await.unwrap(); + let mut notification = l1_watcher.recv().await.unwrap(); + // skip the `L1Notification::Processed` notifications + if matches!(notification.as_ref(), L1Notification::Processed(_)) { + notification = l1_watcher.recv().await.unwrap(); + } assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number)); } @@ -182,8 +196,14 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { continue } - // skip the `L1Notification::Synced` notifications let mut notification = l1_watcher.recv().await.unwrap(); + + // skip the `L1Notification::Processed` notifications + if matches!(notification.as_ref(), L1Notification::Processed(_)) { + notification = l1_watcher.recv().await.unwrap(); + } + + // skip the `L1Notification::Synced` notifications if matches!(notification.as_ref(), L1Notification::Synced) { notification = l1_watcher.recv().await.unwrap(); }