Skip to content

Commit 987000d

Browse files
authored
feat: database optimisation (#440)
* cleanup and optimise database * add composite index * update docker compose grafana * add derivation pipeline cache * add async pre-fetch logic * cleanup code
1 parent 868d085 commit 987000d

32 files changed

+976
-745
lines changed

Cargo.lock

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ futures = { version = "0.3", default-features = false }
223223
lru = "0.13.0"
224224
metrics = "0.24.0"
225225
metrics-derive = "0.1"
226+
moka = "0.12.11"
226227
parking_lot = "0.12"
227228
rand = { version = "0.9" }
228229
rayon = "1.7"

crates/database/db/src/db.rs

Lines changed: 14 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,14 @@ impl DatabaseReadOperations for Database {
643643
)
644644
}
645645

646+
async fn get_max_block_data_hint_block_number(&self) -> Result<u64, DatabaseError> {
647+
metered!(
648+
DatabaseOperation::GetMaxBlockDataHintBlockNumber,
649+
self,
650+
tx(|tx| async move { tx.get_max_block_data_hint_block_number().await })
651+
)
652+
}
653+
646654
async fn get_l2_block_and_batch_info_by_hash(
647655
&self,
648656
block_hash: B256,
@@ -862,7 +870,7 @@ mod test {
862870
BatchCommitData, BatchInfo, BlockInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages,
863871
};
864872
use scroll_alloy_consensus::TxL1Message;
865-
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
873+
use sea_orm::EntityTrait;
866874

867875
#[tokio::test]
868876
async fn test_database_round_trip_batch_commit() {
@@ -1227,11 +1235,13 @@ mod test {
12271235

12281236
#[tokio::test]
12291237
async fn test_delete_l1_messages_gt() {
1238+
reth_tracing::init_test_tracing();
1239+
12301240
// Set up the test database.
12311241
let db = setup_test_db().await;
12321242

12331243
// Generate unstructured bytes.
1234-
let mut bytes = [0u8; 1024];
1244+
let mut bytes = [0u8; 10240];
12351245
rand::rng().fill(bytes.as_mut_slice());
12361246
let mut u = Unstructured::new(&bytes);
12371247

@@ -1276,6 +1286,8 @@ mod test {
12761286

12771287
#[tokio::test]
12781288
async fn test_get_l2_block_info_by_number() {
1289+
reth_tracing::init_test_tracing();
1290+
12791291
// Set up the test database.
12801292
let db = setup_test_db().await;
12811293

@@ -1475,67 +1487,6 @@ mod test {
14751487
}
14761488
}
14771489

1478-
#[tokio::test]
1479-
async fn test_insert_block_upsert_behavior() {
1480-
reth_tracing::init_test_tracing();
1481-
1482-
// Set up the test database.
1483-
let db = setup_test_db().await;
1484-
1485-
// Generate unstructured bytes.
1486-
let mut bytes = [0u8; 1024];
1487-
rand::rng().fill(bytes.as_mut_slice());
1488-
let mut u = Unstructured::new(&bytes);
1489-
1490-
// Generate batches
1491-
let batch_data_1 = BatchCommitData { index: 100, ..Arbitrary::arbitrary(&mut u).unwrap() };
1492-
let batch_info_1: BatchInfo = batch_data_1.clone().into();
1493-
let batch_data_2 = BatchCommitData { index: 200, ..Arbitrary::arbitrary(&mut u).unwrap() };
1494-
let batch_info_2: BatchInfo = batch_data_2.clone().into();
1495-
1496-
db.insert_batch(batch_data_1).await.unwrap();
1497-
db.insert_batch(batch_data_2).await.unwrap();
1498-
1499-
// Insert initial block
1500-
let block_info = BlockInfo { number: 600, hash: B256::arbitrary(&mut u).unwrap() };
1501-
db.insert_blocks(vec![block_info], batch_info_1).await.unwrap();
1502-
1503-
// Verify initial insertion
1504-
let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap();
1505-
assert_eq!(retrieved_block, Some(block_info));
1506-
1507-
// Verify initial batch association using model conversion
1508-
let initial_l2_block_model = models::l2_block::Entity::find()
1509-
.filter(models::l2_block::Column::BlockNumber.eq(600))
1510-
.one(db.inner().get_connection())
1511-
.await
1512-
.unwrap()
1513-
.unwrap();
1514-
let (initial_block_info, initial_batch_info): (BlockInfo, BatchInfo) =
1515-
initial_l2_block_model.into();
1516-
assert_eq!(initial_block_info, block_info);
1517-
assert_eq!(initial_batch_info, batch_info_1);
1518-
1519-
// Update the same block with different batch info (upsert)
1520-
db.insert_blocks(vec![block_info], batch_info_2).await.unwrap();
1521-
1522-
// Verify the block still exists and was updated
1523-
let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap().unwrap();
1524-
assert_eq!(retrieved_block, block_info);
1525-
1526-
// Verify batch association was updated using model conversion
1527-
let updated_l2_block_model = models::l2_block::Entity::find()
1528-
.filter(models::l2_block::Column::BlockNumber.eq(600))
1529-
.one(db.inner().get_connection())
1530-
.await
1531-
.unwrap()
1532-
.unwrap();
1533-
let (updated_block_info, updated_batch_info): (BlockInfo, BatchInfo) =
1534-
updated_l2_block_model.into();
1535-
assert_eq!(updated_block_info, block_info);
1536-
assert_eq!(updated_batch_info, batch_info_2);
1537-
}
1538-
15391490
#[tokio::test]
15401491
async fn test_prepare_on_startup() {
15411492
let db = setup_test_db().await;

crates/database/db/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ pub use db::Database;
99
mod error;
1010
pub use error::DatabaseError;
1111

12+
mod maintenance;
13+
pub use maintenance::DatabaseMaintenance;
14+
1215
mod metrics;
1316

1417
mod models;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use sea_orm::ConnectionTrait;
2+
3+
use crate::DatabaseConnectionProvider;
4+
5+
use super::Database;
6+
use std::sync::Arc;
7+
8+
/// The interval in seconds between optimization runs in seconds.
9+
const PERIODIC_MAINTENANCE_INTERVAL_SECS: u64 = 600;
10+
11+
/// Provides maintenance operations for the database.
12+
#[derive(Debug)]
13+
pub struct DatabaseMaintenance {
14+
db: Arc<Database>,
15+
}
16+
17+
impl DatabaseMaintenance {
18+
/// Creates a new `DatabaseMaintenance` instance.
19+
pub const fn new(db: Arc<Database>) -> Self {
20+
Self { db }
21+
}
22+
23+
/// Runs the maintenance tasks in a loop.
24+
pub async fn run(self) {
25+
self.startup_maintenance().await;
26+
27+
loop {
28+
tokio::time::sleep(std::time::Duration::from_secs(PERIODIC_MAINTENANCE_INTERVAL_SECS))
29+
.await;
30+
self.periodic_maintenance().await;
31+
}
32+
}
33+
34+
/// Runs maintenance tasks at startup.
35+
///
36+
/// This includes running `ANALYZE` and `PRAGMA optimize`.
37+
async fn startup_maintenance(&self) {
38+
let db = self.db.inner();
39+
let conn = db.get_connection();
40+
41+
tracing::info!(target: "scroll::db::maintenance", "running startup ANALYZE...");
42+
if let Err(err) = conn.execute_unprepared("ANALYZE;").await {
43+
tracing::warn!(target: "scroll::db::maintenance", "ANALYZE failed: {:?}", err);
44+
}
45+
46+
tracing::info!(target: "scroll::db::maintenance", "running PRAGMA optimize at startup...");
47+
if let Err(err) = conn.execute_unprepared("PRAGMA optimize;").await {
48+
tracing::warn!(target: "scroll::db::maintenance", "PRAGMA optimize failed: {:?}", err);
49+
}
50+
51+
tracing::info!(target: "scroll::db::maintenance", "startup maintenance complete.");
52+
}
53+
54+
/// Runs periodic maintenance tasks.
55+
///
56+
/// This includes running `PRAGMA optimize`.
57+
async fn periodic_maintenance(&self) {
58+
let db = self.db.inner();
59+
let conn = db.get_connection();
60+
61+
tracing::info!(target: "scroll::db::maintenance", "running periodic PRAGMA optimize...");
62+
if let Err(err) = conn.execute_unprepared("PRAGMA optimize;").await {
63+
tracing::warn!(target: "scroll::db::maintenance", "PRAGMA optimize failed: {:?}", err);
64+
} else {
65+
tracing::info!(target: "scroll::db::maintenance", "periodic PRAGMA optimize complete.");
66+
}
67+
}
68+
}

crates/database/db/src/metrics.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub(crate) enum DatabaseOperation {
6464
GetL2HeadBlockNumber,
6565
GetNL1Messages,
6666
GetNL2BlockDataHint,
67+
GetMaxBlockDataHintBlockNumber,
6768
GetL2BlockAndBatchInfoByHash,
6869
GetL2BlockInfoByNumber,
6970
GetLatestSafeL2Info,
@@ -132,6 +133,7 @@ impl DatabaseOperation {
132133
Self::GetL2HeadBlockNumber => "get_l2_head_block_number",
133134
Self::GetNL1Messages => "get_n_l1_messages",
134135
Self::GetNL2BlockDataHint => "get_n_l2_block_data_hint",
136+
Self::GetMaxBlockDataHintBlockNumber => "get_max_block_data_hint_block_number",
135137
Self::GetL2BlockAndBatchInfoByHash => "get_l2_block_and_batch_info_by_hash",
136138
Self::GetL2BlockInfoByNumber => "get_l2_block_info_by_number",
137139
Self::GetLatestSafeL2Info => "get_latest_safe_l2_info",

crates/database/db/src/operations.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -708,13 +708,15 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
708708
blocks.into_iter().map(|b| (b, batch_info).into()).collect();
709709
models::l2_block::Entity::insert_many(l2_blocks)
710710
.on_conflict(
711-
OnConflict::column(models::l2_block::Column::BlockHash)
712-
.update_columns([
713-
models::l2_block::Column::BlockNumber,
714-
models::l2_block::Column::BatchHash,
715-
models::l2_block::Column::BatchIndex,
716-
])
717-
.to_owned(),
711+
OnConflict::columns([
712+
models::l2_block::Column::BlockHash,
713+
models::l2_block::Column::BatchHash,
714+
])
715+
.update_columns([
716+
models::l2_block::Column::BlockNumber,
717+
models::l2_block::Column::BatchIndex,
718+
])
719+
.to_owned(),
718720
)
719721
.on_empty_do_nothing()
720722
.exec_without_returning(self.get_connection())
@@ -994,6 +996,9 @@ pub trait DatabaseReadOperations {
994996
n: usize,
995997
) -> Result<Vec<L1MessageEnvelope>, DatabaseError>;
996998

999+
/// Get the maximum block number for which we have stored extra data hints.
1000+
async fn get_max_block_data_hint_block_number(&self) -> Result<u64, DatabaseError>;
1001+
9971002
/// Get the extra data for n block, starting at the provided block number.
9981003
async fn get_n_l2_block_data_hint(
9991004
&self,
@@ -1364,6 +1369,7 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
13641369
) -> Result<Vec<BlockDataHint>, DatabaseError> {
13651370
Ok(models::block_data::Entity::find()
13661371
.filter(models::block_data::Column::Number.gte(block_number as i64))
1372+
.order_by_asc(models::block_data::Column::Number)
13671373
.limit(Some(n as u64))
13681374
.all(self.get_connection())
13691375
.await?
@@ -1372,6 +1378,18 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
13721378
.collect())
13731379
}
13741380

1381+
async fn get_max_block_data_hint_block_number(&self) -> Result<u64, DatabaseError> {
1382+
Ok(models::block_data::Entity::find()
1383+
.select_only()
1384+
.column_as(models::block_data::Column::Number.max(), "max_number")
1385+
.into_tuple::<Option<i64>>()
1386+
.one(self.get_connection())
1387+
.await?
1388+
.flatten()
1389+
.map(|n| n as u64)
1390+
.unwrap_or(0))
1391+
}
1392+
13751393
async fn get_l2_block_and_batch_info_by_hash(
13761394
&self,
13771395
block_hash: B256,

crates/database/db/src/service/retry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl<S: DatabaseService> DatabaseService for Retry<S> {
9090
};
9191

9292
attempt += 1;
93-
tracing::debug!(target: "scroll::chain_orchestrator", ?error, attempt, delay_ms, "Retrying database query");
93+
tracing::debug!(target: "scroll::db", ?error, attempt, delay_ms, "Retrying database query");
9494

9595
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
9696
}

crates/database/migration/src/lib.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,7 @@ mod m20250408_132123_add_header_metadata;
66
mod m20250408_150338_load_header_metadata;
77
mod m20250411_072004_add_l2_block;
88
mod m20250616_223947_add_metadata;
9-
mod m20250825_093350_remove_unsafe_l2_blocks;
10-
mod m20250829_042803_add_table_indexes;
119
mod m20250904_175949_block_signature;
12-
mod m20250923_135359_add_index_block_hash;
13-
mod m20250929_161536_add_additional_indexes;
14-
mod m20251001_125444_add_index_status;
15-
mod m20251005_160938_add_initial_l1_block_numbers;
16-
mod m20251013_140946_add_initial_l1_processed_block_number;
17-
mod m20251021_070729_add_skipped_column;
18-
mod m20251021_144852_add_queue_index_index;
19-
mod m20251027_090416_add_table_statistics;
2010
mod m20251028_110719_add_l1_block_table;
2111

2212
mod migration_info;
@@ -36,17 +26,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
3626
Box::new(m20250408_150338_load_header_metadata::Migration::<MI>(Default::default())),
3727
Box::new(m20250411_072004_add_l2_block::Migration::<MI>(Default::default())),
3828
Box::new(m20250616_223947_add_metadata::Migration),
39-
Box::new(m20250825_093350_remove_unsafe_l2_blocks::Migration),
40-
Box::new(m20250829_042803_add_table_indexes::Migration),
4129
Box::new(m20250904_175949_block_signature::Migration),
42-
Box::new(m20250923_135359_add_index_block_hash::Migration),
43-
Box::new(m20250929_161536_add_additional_indexes::Migration),
44-
Box::new(m20251001_125444_add_index_status::Migration),
45-
Box::new(m20251005_160938_add_initial_l1_block_numbers::Migration),
46-
Box::new(m20251013_140946_add_initial_l1_processed_block_number::Migration),
47-
Box::new(m20251021_070729_add_skipped_column::Migration),
48-
Box::new(m20251021_144852_add_queue_index_index::Migration),
49-
Box::new(m20251027_090416_add_table_statistics::Migration),
5030
Box::new(m20251028_110719_add_l1_block_table::Migration),
5131
]
5232
}

0 commit comments

Comments
 (0)