Skip to content

Commit d86dc0c

Browse files
authored
feat: add L1 processed value (#356)
* add L1 processed value * remove changes to existing migrations
1 parent 5472bfd commit d86dc0c

File tree

7 files changed

+113
-6
lines changed

7 files changed

+113
-6
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,10 +330,12 @@ impl<
330330
let db_tx = self.database.tx().await?;
331331
let l1_latest = db_tx.get_latest_l1_block_number().await?;
332332
let l1_finalized = db_tx.get_finalized_l1_block_number().await?;
333+
let l1_processed = db_tx.get_processed_l1_block_number().await?;
333334
let status = ChainOrchestratorStatus::new(
334335
&self.sync_state,
335336
l1_latest,
336337
l1_finalized,
338+
l1_processed,
337339
self.engine.fcs().clone(),
338340
);
339341
let _ = tx.send(status);
@@ -488,6 +490,12 @@ impl<
488490
notification: Arc<L1Notification>,
489491
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
490492
match &*notification {
493+
L1Notification::Processed(block_number) => {
494+
let tx = self.database.tx_mut().await?;
495+
tx.set_processed_l1_block_number(*block_number).await?;
496+
tx.commit().await?;
497+
Ok(None)
498+
}
491499
L1Notification::Reorg(block_number) => self.handle_l1_reorg(*block_number).await,
492500
L1Notification::Consensus(update) => {
493501
self.consensus.update_config(update);

crates/chain-orchestrator/src/status.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@ impl ChainOrchestratorStatus {
1717
sync_state: &SyncState,
1818
l1_latest: u64,
1919
l1_finalized: u64,
20+
l1_processed: u64,
2021
l2_fcs: ForkchoiceState,
2122
) -> Self {
2223
Self {
2324
l1: L1ChainStatus {
2425
status: sync_state.l1().clone(),
2526
latest: l1_latest,
2627
finalized: l1_finalized,
28+
processed: l1_processed,
2729
},
2830
l2: L2ChainStatus { status: sync_state.l2().clone(), fcs: l2_fcs },
2931
}
@@ -40,6 +42,8 @@ pub struct L1ChainStatus {
4042
pub latest: u64,
4143
/// The finalized block number of the chain.
4244
pub finalized: u64,
45+
/// The highest block number that has been processed.
46+
pub processed: u64,
4347
}
4448

4549
/// The status of the L2 chain.

crates/database/db/src/operations.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,23 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati
9898
.map(|_| ())?)
9999
}
100100

101+
/// Set the processed L1 block number.
102+
async fn set_processed_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> {
103+
tracing::trace!(target: "scroll::db", block_number, "Updating the processed L1 block number in the database.");
104+
let metadata: models::metadata::ActiveModel =
105+
Metadata { key: "l1_processed_block".to_string(), value: block_number.to_string() }
106+
.into();
107+
Ok(models::metadata::Entity::insert(metadata)
108+
.on_conflict(
109+
OnConflict::column(models::metadata::Column::Key)
110+
.update_column(models::metadata::Column::Value)
111+
.to_owned(),
112+
)
113+
.exec(self.get_connection())
114+
.await
115+
.map(|_| ())?)
116+
}
117+
101118
/// Set the L2 head block number.
102119
async fn set_l2_head_block_number(&self, number: u64) -> Result<(), DatabaseError> {
103120
tracing::trace!(target: "scroll::db", ?number, "Updating the L2 head block number in the database.");
@@ -543,6 +560,20 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
543560
.expect("l1_finalized_block should always be a valid u64"))
544561
}
545562

563+
/// Get the processed L1 block number from the database.
564+
async fn get_processed_l1_block_number(&self) -> Result<u64, DatabaseError> {
565+
Ok(models::metadata::Entity::find()
566+
.filter(models::metadata::Column::Key.eq("l1_processed_block"))
567+
.select_only()
568+
.column(models::metadata::Column::Value)
569+
.into_tuple::<String>()
570+
.one(self.get_connection())
571+
.await?
572+
.expect("l1_processed_block should always be set")
573+
.parse::<u64>()
574+
.expect("l1_processed_block should always be a valid u64"))
575+
}
576+
546577
/// Get the latest L2 head block info.
547578
async fn get_l2_head_block_number(&self) -> Result<u64, DatabaseError> {
548579
Ok(models::metadata::Entity::find()

crates/database/migration/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mod m20250923_135359_add_index_block_hash;
1414
mod m20250929_161536_add_additional_indexes;
1515
mod m20251001_125444_add_index_processed;
1616
mod m20251005_160938_add_initial_l1_block_numbers;
17+
mod m20251013_140946_add_initial_l1_processed_block_number;
1718

1819
mod migration_info;
1920
pub use migration_info::{
@@ -40,6 +41,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
4041
Box::new(m20250929_161536_add_additional_indexes::Migration),
4142
Box::new(m20251001_125444_add_index_processed::Migration),
4243
Box::new(m20251005_160938_add_initial_l1_block_numbers::Migration),
44+
Box::new(m20251013_140946_add_initial_l1_processed_block_number::Migration),
4345
]
4446
}
4547
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use sea_orm_migration::prelude::*;
2+
3+
#[derive(DeriveMigrationName)]
4+
pub struct Migration;
5+
6+
#[async_trait::async_trait]
7+
impl MigrationTrait for Migration {
8+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
9+
let db = manager.get_connection();
10+
11+
// Insert both keys if they don't already exist
12+
db.execute_unprepared(
13+
r#"
14+
INSERT INTO metadata (key, value)
15+
VALUES
16+
('l1_processed_block', '0')
17+
ON CONFLICT(key) DO NOTHING;
18+
"#,
19+
)
20+
.await?;
21+
22+
Ok(())
23+
}
24+
25+
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
26+
let db = manager.get_connection();
27+
28+
db.execute_unprepared(
29+
r#"
30+
DELETE FROM metadata
31+
WHERE key = 'l1_processed_block';
32+
"#,
33+
)
34+
.await?;
35+
36+
Ok(())
37+
}
38+
}

crates/watcher/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ pub struct L1Watcher<EP> {
8989
/// The L1 notification type yielded by the [`L1Watcher`].
9090
#[derive(Debug, Clone, PartialEq, Eq)]
9191
pub enum L1Notification {
92+
/// A notification that the L1 watcher has processed up to a given block number.
93+
Processed(u64),
9294
/// A notification for a reorg of the L1 up to a given block number.
9395
Reorg(u64),
9496
/// A new batch has been committed on the L1 rollup contract.
@@ -124,6 +126,7 @@ pub enum L1Notification {
124126
impl Display for L1Notification {
125127
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
126128
match self {
129+
Self::Processed(n) => write!(f, "Processed({n})"),
127130
Self::Reorg(n) => write!(f, "Reorg({n:?})"),
128131
Self::BatchCommit(b) => {
129132
write!(f, "BatchCommit {{ hash: {}, index: {} }}", b.hash, b.index)
@@ -266,7 +269,7 @@ where
266269
self.notify_all(notifications).await?;
267270

268271
// update the latest block the l1 watcher has indexed.
269-
self.update_current_block(&latest);
272+
self.update_current_block(&latest).await?;
270273
}
271274

272275
Ok(())
@@ -606,11 +609,12 @@ where
606609
}
607610

608611
/// Updates the current block number, saturating at the head of the chain.
609-
fn update_current_block(&mut self, latest: &Block) {
612+
async fn update_current_block(&mut self, latest: &Block) -> L1WatcherResult<()> {
610613
self.current_block_number = self
611614
.current_block_number
612615
.saturating_add(LOGS_QUERY_BLOCK_RANGE)
613616
.min(latest.header.number);
617+
self.notify(L1Notification::Processed(self.current_block_number)).await
614618
}
615619

616620
/// Returns the latest L1 block.

crates/watcher/tests/reorg.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,26 @@ async fn test_should_detect_reorg() -> eyre::Result<()> {
8383
for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) {
8484
// check finalized first.
8585
if finalized_number < finalized.header.number {
86-
let notification = l1_watcher.recv().await.unwrap();
86+
let mut notification = l1_watcher.recv().await.unwrap();
87+
// skip the `L1Notification::Processed` notifications
88+
if matches!(notification.as_ref(), L1Notification::Processed(_)) {
89+
notification = l1_watcher.recv().await.unwrap();
90+
}
8791
assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number));
8892
}
8993

9094
if latest_number == latest.header.number {
9195
continue
9296
}
9397

94-
// skip the `L1Notification::Synced` notifications
9598
let mut notification = l1_watcher.recv().await.unwrap();
99+
100+
// skip the `L1Notification::Processed` notifications
101+
if matches!(notification.as_ref(), L1Notification::Processed(_)) {
102+
notification = l1_watcher.recv().await.unwrap();
103+
}
104+
105+
// skip the `L1Notification::Synced` notifications
96106
if matches!(notification.as_ref(), L1Notification::Synced) {
97107
notification = l1_watcher.recv().await.unwrap();
98108
}
@@ -174,16 +184,26 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> {
174184
for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) {
175185
// check finalized first.
176186
if finalized_number < finalized.header.number {
177-
let notification = l1_watcher.recv().await.unwrap();
187+
let mut notification = l1_watcher.recv().await.unwrap();
188+
// skip the `L1Notification::Processed` notifications
189+
if matches!(notification.as_ref(), L1Notification::Processed(_)) {
190+
notification = l1_watcher.recv().await.unwrap();
191+
}
178192
assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number));
179193
}
180194

181195
if latest_number == latest.header.number {
182196
continue
183197
}
184198

185-
// skip the `L1Notification::Synced` notifications
186199
let mut notification = l1_watcher.recv().await.unwrap();
200+
201+
// skip the `L1Notification::Processed` notifications
202+
if matches!(notification.as_ref(), L1Notification::Processed(_)) {
203+
notification = l1_watcher.recv().await.unwrap();
204+
}
205+
206+
// skip the `L1Notification::Synced` notifications
187207
if matches!(notification.as_ref(), L1Notification::Synced) {
188208
notification = l1_watcher.recv().await.unwrap();
189209
}

0 commit comments

Comments
 (0)