Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ parking_lot = "0.12"
rand = { version = "0.9" }
reqwest = "0.12"
serde = { version = "1.0" }
serde_json = { version = "1.0" }
sea-orm = { version = "1.1.0" }
thiserror = "2.0"
tokio = { version = "1.39", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion crates/chain-orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,5 @@ futures.workspace = true
parking_lot.workspace = true
rand.workspace = true
reqwest.workspace = true
serde_json = { version = "1.0" }
serde_json.workspace = true
tokio.workspace = true
8 changes: 4 additions & 4 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,10 @@ impl<
let block_info: L2BlockInfoWithL1Messages = (&block_with_peer.block).into();
Self::do_handle_block_from_peer(ctx, block_with_peer).await?;
Retry::default()
.retry("update_l1_messages_with_l2_block", || async {
.retry("handle_sequenced_block", || async {
let tx = database.tx_mut().await?;
tx.update_l1_messages_with_l2_block(block_info.clone()).await?;
tx.set_l2_head_block_info(block_info.block_info).await?;
tx.commit().await?;
Ok::<_, ChainOrchestratorError>(())
})
Expand Down Expand Up @@ -483,9 +484,7 @@ impl<
Retry::default()
.retry("insert_block", || async {
let tx = database.tx_mut().await?;
for block in block_infos.clone() {
tx.insert_block(block, batch_info).await?;
}
tx.insert_blocks(block_infos.clone(), batch_info).await?;
tx.commit().await?;
Ok::<_, ChainOrchestratorError>(())
})
Expand Down Expand Up @@ -537,6 +536,7 @@ impl<
.retry("update_l1_messages_from_l2_blocks", || async {
let tx = database.tx_mut().await?;
tx.update_l1_messages_from_l2_blocks(block_info.clone()).await?;
tx.set_l2_head_block_info(head.block_info).await?;
tx.commit().await?;
Ok::<_, ChainOrchestratorError>(())
})
Expand Down
1 change: 1 addition & 0 deletions crates/database/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ futures.workspace = true
metrics.workspace = true
metrics-derive.workspace = true
sea-orm = { workspace = true, features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros"] }
serde_json.workspace = true
tempfile = { version = "3.20.0", optional = true }
thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
Expand Down
29 changes: 29 additions & 0 deletions crates/database/db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ impl Database {
db.tmp_dir = Some(dir);
Ok(db)
}

/// Returns a reference to the database tmp dir.
///
/// This is `None` unless the `Database` was constructed through the
/// `test-utils` path that stores a backing [`tempfile::TempDir`]
/// (see the constructor above that sets `db.tmp_dir`). Holding the
/// `TempDir` here keeps the on-disk SQLite file alive for the
/// lifetime of the database handle.
#[cfg(feature = "test-utils")]
pub const fn tmp_dir(&self) -> Option<&tempfile::TempDir> {
self.tmp_dir.as_ref()
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -961,4 +967,27 @@ mod test {
assert!(retried_block_3.is_none());
assert!(retried_block_4.is_none());
}

#[tokio::test]
async fn test_l2_block_head_roundtrip() {
    // Spin up a fresh test database and open a write transaction.
    let db = setup_test_db().await;
    let tx = db.tx_mut().await.unwrap();

    // Seed an `Unstructured` source with random entropy so `arbitrary`
    // can synthesize a block info value.
    let mut entropy = [0u8; 40];
    rand::rng().fill(entropy.as_mut_slice());
    let mut unstructured = Unstructured::new(&entropy);

    // Persist an arbitrary block info as the L2 head and commit it.
    let expected = BlockInfo::arbitrary(&mut unstructured).unwrap();
    tx.set_l2_head_block_info(expected).await.unwrap();
    tx.commit().await.unwrap();

    // Read the head back on a fresh read-only transaction and verify
    // the value round-trips through the metadata table unchanged.
    let tx = db.tx().await.unwrap();
    let actual = tx.get_l2_head_block_info().await.unwrap().unwrap();
    assert_eq!(actual, expected);
}
}
3 changes: 3 additions & 0 deletions crates/database/db/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub enum DatabaseError {
/// A generic error occurred.
#[error("parse signature error: {0}")]
ParseSignatureError(String),
/// Failed to serde the metadata value.
#[error("failed to serde metadata value: {0}")]
MetadataSerdeError(#[from] serde_json::Error),
/// The L1 message was not found in database.
#[error("L1 message at index [{0}] not found in database")]
L1MessageNotFound(L1MessageStart),
Expand Down
12 changes: 1 addition & 11 deletions crates/database/db/src/models/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,6 @@ impl ActiveModelBehavior for ActiveModel {}

impl From<Metadata> for ActiveModel {
fn from(metadata: Metadata) -> Self {
Self {
key: ActiveValue::Set("l1_finalized_block".to_owned()),
value: ActiveValue::Set(metadata.l1_finalized_block.to_string()),
}
}
}

impl From<Model> for Metadata {
fn from(value: Model) -> Self {
debug_assert!(value.key == "l1_finalized_block");
Self { l1_finalized_block: value.value.parse().expect("invalid value") }
Self { key: ActiveValue::Set(metadata.key), value: ActiveValue::Set(metadata.value) }
}
}
38 changes: 36 additions & 2 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,27 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati
) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", block_number, "Updating the latest finalized L1 block number in the database.");
let metadata: models::metadata::ActiveModel =
Metadata { l1_finalized_block: block_number }.into();
Metadata { key: "l1_finalized_block".to_string(), value: block_number.to_string() }
.into();
Ok(models::metadata::Entity::insert(metadata)
.on_conflict(
OnConflict::column(models::metadata::Column::Key)
.update_column(models::metadata::Column::Value)
.to_owned(),
)
.exec(self.get_connection())
.await
.map(|_| ())?)
}

/// Set the L2 head block info.
async fn set_l2_head_block_info(&self, block_info: BlockInfo) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", ?block_info, "Updating the L2 head block info in the database.");
let metadata: models::metadata::ActiveModel = Metadata {
key: "l2_head_block".to_string(),
value: serde_json::to_string(&block_info)?,
}
.into();
Ok(models::metadata::Entity::insert(metadata)
.on_conflict(
OnConflict::column(models::metadata::Column::Key)
Expand Down Expand Up @@ -444,6 +464,18 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
.map(|x| x.and_then(|x| x.parse::<u64>().ok()))?)
}

/// Get the latest L2 head block info.
///
/// Returns `Ok(None)` when no `l2_head_block` metadata row exists.
///
/// # Errors
///
/// Returns a [`DatabaseError`] if the query fails, or a
/// `DatabaseError::MetadataSerdeError` if a stored value fails to
/// deserialize — a corrupted metadata row is surfaced to the caller
/// rather than silently treated as an absent head block.
async fn get_l2_head_block_info(&self) -> Result<Option<BlockInfo>, DatabaseError> {
    let value = models::metadata::Entity::find()
        .filter(models::metadata::Column::Key.eq("l2_head_block"))
        .select_only()
        .column(models::metadata::Column::Value)
        .into_tuple::<String>()
        .one(self.get_connection())
        .await?;
    // Propagate serde failures via `#[from] serde_json::Error` instead of
    // `.ok()`, which would mask corruption as a missing head block.
    Ok(value.map(|v| serde_json::from_str(&v)).transpose()?)
}

/// Get an iterator over all [`BatchCommitData`]s in the database.
async fn get_batches<'a>(
&'a self,
Expand Down Expand Up @@ -571,7 +603,9 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
})?)
}

/// Get the latest safe L2 ([`BlockInfo`], [`BatchInfo`]) from the database.
/// Get the latest safe/finalized L2 ([`BlockInfo`], [`BatchInfo`]) from the database. Until we
/// update the batch handling logic with issue #273, we don't differentiate between safe and
/// finalized l2 blocks.
async fn get_latest_safe_l2_info(
&self,
) -> Result<Option<(BlockInfo, BatchInfo)>, DatabaseError> {
Expand Down
2 changes: 2 additions & 0 deletions crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ rollup-node-providers.workspace = true
rollup-node-signer.workspace = true

# scroll
scroll-db.workspace = true
scroll-network.workspace = true

# misc
Expand Down Expand Up @@ -70,6 +71,7 @@ test-utils = [
"rollup-node-providers/test-utils",
"reth-chainspec/test-utils",
"reth-primitives-traits/test-utils",
"scroll-db/test-utils",
]
serde = [
"alloy-eips/serde",
Expand Down
6 changes: 3 additions & 3 deletions crates/engine/src/fcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl ForkchoiceState {

/// Creates a new [`ForkchoiceState`] instance setting the `head`, `safe` and `finalized` block
/// info to the provided `genesis` hash.
pub const fn head_from_genesis(genesis: B256) -> Self {
pub const fn from_genesis(genesis: B256) -> Self {
Self::new(
BlockInfo { hash: genesis, number: 0 },
BlockInfo { hash: genesis, number: 0 },
Expand All @@ -43,7 +43,7 @@ impl ForkchoiceState {

/// Creates a [`ForkchoiceState`] instance setting the `head`, `safe` and `finalized` hash to
/// the appropriate genesis values by reading from the provider.
pub async fn head_from_provider<P: Provider<Scroll>>(provider: P) -> Option<Self> {
pub async fn from_provider<P: Provider<Scroll>>(provider: &P) -> Option<Self> {
let latest_block =
provider.get_block(BlockId::Number(BlockNumberOrTag::Latest)).await.ok()??;
let safe_block =
Expand All @@ -65,7 +65,7 @@ impl ForkchoiceState {
pub fn head_from_chain_spec<CS: EthChainSpec<Header: BlockHeader>>(
chain_spec: CS,
) -> Option<Self> {
Some(Self::head_from_genesis(genesis_hash_from_chain_spec(chain_spec)?))
Some(Self::from_genesis(genesis_hash_from_chain_spec(chain_spec)?))
}

/// Updates the `head` block info.
Expand Down
13 changes: 2 additions & 11 deletions crates/manager/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub struct RollupNodeManager<
database: Arc<Database>,
/// The original block time configuration for restoring automatic sequencing.
block_time_config: Option<u64>,
// metrics for the rollup node manager.
/// Metrics for the rollup node manager.
metrics: RollupNodeManagerMetrics,
}

Expand Down Expand Up @@ -381,26 +381,17 @@ where
event_sender.notify(RollupManagerEvent::Reorg(l1_block_number));
}
}
ChainOrchestratorEvent::ChainExtended(chain_import) => {
self.metrics
.handle_chain_import_block_number
.set(chain_import.chain.last().unwrap().number as f64);
trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header.clone(), peer_id = ?chain_import.peer_id.clone(), "Received chain extension from peer");
// Issue the new chain to the engine driver for processing.
self.engine.handle_chain_import(chain_import)
}
ChainOrchestratorEvent::ChainExtended(chain_import) |
ChainOrchestratorEvent::ChainReorged(chain_import) => {
self.metrics
.handle_chain_import_block_number
.set(chain_import.chain.last().unwrap().number as f64);
trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header, ?chain_import.peer_id, "Received chain reorg from peer");

// Issue the new chain to the engine driver for processing.
self.engine.handle_chain_import(chain_import)
}
ChainOrchestratorEvent::OptimisticSync(block) => {
let block_info: BlockInfo = (&block).into();
trace!(target: "scroll::node::manager", ?block_info, "Received optimistic sync from peer");

self.metrics.handle_optimistic_syncing_block_number.set(block_info.number as f64);

Expand Down
3 changes: 2 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,13 @@ reth-provider.workspace = true
reth-primitives-traits.workspace = true
reth-rpc-server-types.workspace = true
reth-scroll-node = { workspace = true, features = ["test-utils"] }
reth-storage-api.workspace = true
reth-tasks.workspace = true
reth-tokio-util.workspace = true
reth-tracing.workspace = true
rollup-node = { workspace = true, features = ["test-utils"] }
scroll-alloy-rpc-types-engine.workspace = true
serde_json = { version = "1.0.94", default-features = false, features = ["alloc"] }
serde_json = { workspace = true, features = ["alloc"] }
color-eyre = "0.6"
alloy-rpc-types-eth = { workspace = true }

Expand Down
44 changes: 25 additions & 19 deletions crates/node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,31 @@ impl ScrollRollupNodeConfig {
ForkchoiceState::head_from_chain_spec(chain_spec.clone())
.expect("failed to derive forkchoice state from chain spec")
};
let mut fcs = ForkchoiceState::head_from_provider(l2_provider.clone())
.await
.unwrap_or_else(chain_spec_fcs);
let mut fcs =
ForkchoiceState::from_provider(&l2_provider).await.unwrap_or_else(chain_spec_fcs);

// On startup we replay the latest batch of blocks from the database as such we set the safe
// block hash to the latest block hash associated with the previous consolidated
// batch in the database.
let tx = db.tx_mut().await?;
let (startup_safe_block, l1_start_block_number) =
tx.prepare_on_startup(chain_spec.genesis_hash()).await?;
tx.commit().await?;
if let Some(block_info) = startup_safe_block {
fcs.update_safe_block_info(block_info);
} else {
fcs.update_safe_block_info(BlockInfo {
hash: genesis_hash_from_chain_spec(chain_spec.clone()).unwrap(),
number: 0,
});
}

// Update the head block info if available and ahead of finalized.
if let Some(latest_block) = db.tx().await?.get_l2_head_block_info().await? {
if latest_block > *fcs.finalized_block_info() {
fcs.update_head_block_info(latest_block);
}
}

let chain_spec = Arc::new(chain_spec.clone());

Expand All @@ -277,22 +299,6 @@ impl ScrollRollupNodeConfig {
authorized_signer,
);

// On startup we replay the latest batch of blocks from the database as such we set the safe
// block hash to the latest block hash associated with the previous consolidated
// batch in the database.
let tx = db.tx_mut().await?;
let (startup_safe_block, l1_start_block_number) =
tx.prepare_on_startup(chain_spec.genesis_hash()).await?;
tx.commit().await?;
if let Some(block_info) = startup_safe_block {
fcs.update_safe_block_info(block_info);
} else {
fcs.update_safe_block_info(BlockInfo {
hash: genesis_hash_from_chain_spec(chain_spec.clone()).unwrap(),
number: 0,
});
}

tracing::info!(target: "scroll::node::args", fcs = ?fcs, payload_building_duration = ?self.sequencer_args.payload_building_duration, "Starting engine driver");
let engine = EngineDriver::new(
Arc::new(engine_api),
Expand Down
Loading
Loading