Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 38 additions & 64 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,7 @@ use scroll_engine::Engine;
use scroll_network::{
BlockImportOutcome, NewBlockWithPeer, ScrollNetwork, ScrollNetworkManagerEvent,
};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::Instant,
vec,
};
use strum::IntoEnumIterator;
use std::{collections::VecDeque, sync::Arc, time::Instant, vec};
use tokio::sync::mpsc::{self, Receiver, UnboundedReceiver};

mod config;
Expand All @@ -50,7 +44,7 @@ mod consensus;
pub use consensus::{Consensus, NoopConsensus, SystemContractConsensus};

mod consolidation;
use consolidation::reconcile_batch;
use consolidation::{reconcile_batch, BlockConsolidationAction};

mod event;
pub use event::ChainOrchestratorEvent;
Expand All @@ -62,15 +56,26 @@ mod handle;
pub use handle::{ChainOrchestratorCommand, ChainOrchestratorHandle, DatabaseQuery};

mod metrics;
pub use metrics::{ChainOrchestratorItem, ChainOrchestratorMetrics};
use metrics::{MetricsHandler, Task};

mod sync;
pub use sync::{SyncMode, SyncState};

mod status;
pub use status::ChainOrchestratorStatus;

use crate::consolidation::BlockConsolidationAction;
/// Wraps a future, metering the completion of it.
macro_rules! metered {
($task:expr, $self:ident, $method:ident($($args:expr),*)) => {
{
let metric = $self.metric_handler.get($task).expect("metric exists").clone();
let now = Instant::now();
let res =$self.$method($($args),*).await;
metric.task_duration.record(now.elapsed().as_secs_f64());
res
}
};
}

/// The mask used to mask the L1 message queue hash.
const L1_MESSAGE_QUEUE_HASH_MASK: B256 =
Expand Down Expand Up @@ -102,8 +107,6 @@ pub struct ChainOrchestrator<
l2_client: Arc<L2P>,
/// The reference to database.
database: Arc<Database>,
/// The metrics for the chain orchestrator.
metrics: HashMap<ChainOrchestratorItem, ChainOrchestratorMetrics>,
/// The current sync state of the [`ChainOrchestrator`].
sync_state: SyncState,
/// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
Expand All @@ -122,6 +125,8 @@ pub struct ChainOrchestrator<
derivation_pipeline: DerivationPipeline,
/// Optional event sender for broadcasting events to listeners.
event_sender: Option<EventSender<ChainOrchestratorEvent>>,
/// The metrics handler.
metric_handler: MetricsHandler,
}

impl<
Expand Down Expand Up @@ -155,12 +160,6 @@ impl<
l2_client: Arc::new(l2_provider),
database,
config,
metrics: ChainOrchestratorItem::iter()
.map(|i| {
let label = i.as_str();
(i, ChainOrchestratorMetrics::new_with_labels(&[("item", label)]))
})
.collect(),
sync_state: SyncState::default(),
l1_notification_rx,
network,
Expand All @@ -171,6 +170,7 @@ impl<
derivation_pipeline,
handle_rx,
event_sender: None,
metric_handler: MetricsHandler::default(),
},
handle,
))
Expand Down Expand Up @@ -211,7 +211,7 @@ impl<
self.handle_outcome(res);
}
Some(batch) = self.derivation_pipeline.next() => {
let res = self.handle_derived_batch(batch).await;
let res = metered!(Task::BatchReconciliation, self, handle_derived_batch(batch));
self.handle_outcome(res);
}
Some(event) = self.network.events().next() => {
Expand Down Expand Up @@ -266,7 +266,7 @@ impl<
/// Handles an event from the sequencer.
async fn handle_sequencer_event(
&mut self,
event: rollup_node_sequencer::SequencerEvent,
event: SequencerEvent,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
tracing::info!(target: "scroll::chain_orchestrator", ?event, "Handling sequencer event");
match event {
Expand All @@ -277,6 +277,7 @@ impl<
.map(|s| &s.address)
.expect("signer must be set if sequencer is present"),
) {
self.metric_handler.start_block_building_recording();
self.sequencer
.as_mut()
.expect("sequencer must be present")
Expand All @@ -300,6 +301,7 @@ impl<
.as_mut()
.expect("signer must be present")
.sign_block(block.clone())?;
self.metric_handler.finish_block_building_recording();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't allow empty blocks then the finalize method below will return None. We should handle that case.

self
                    .sequencer
                    .as_mut()
                    .expect("sequencer must be present")
                    .finalize_payload_building(payload_id, &mut self.engine)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, maybe it was your intention to skip it such that we don't report metrics for empty blocks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the idea was only to record the actual block building duration for blocks that get added to the chain. I think we could however also have a metric for block building regardless of whether they get added to the chain, and also for the duration between 2 consecutive block added to the chain.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these could both be valuable to have

return Ok(Some(ChainOrchestratorEvent::BlockSequenced(block)));
}
}
Expand Down Expand Up @@ -508,30 +510,38 @@ impl<
self.database.set_processed_l1_block_number(block_number).await?;
Ok(None)
}
L1Notification::Reorg(block_number) => self.handle_l1_reorg(*block_number).await,
L1Notification::Reorg(block_number) => {
metered!(Task::L1Reorg, self, handle_l1_reorg(*block_number))
}
L1Notification::Consensus(update) => {
self.consensus.update_config(update);
Ok(None)
}
L1Notification::NewBlock(block_number) => self.handle_l1_new_block(*block_number).await,
L1Notification::Finalized(block_number) => {
self.handle_l1_finalized(*block_number).await
metered!(Task::L1Finalization, self, handle_l1_finalized(*block_number))
}
L1Notification::BatchCommit(batch) => {
metered!(Task::BatchCommit, self, handle_batch_commit(batch.clone()))
}
L1Notification::BatchCommit(batch) => self.handle_batch_commit(batch.clone()).await,
L1Notification::L1Message { message, block_number, block_timestamp: _ } => {
self.handle_l1_message(message.clone(), *block_number).await
metered!(Task::L1Message, self, handle_l1_message(message.clone(), *block_number))
}
L1Notification::Synced => {
tracing::info!(target: "scroll::chain_orchestrator", "L1 is now synced");
self.sync_state.l1_mut().set_synced();
if self.sync_state.is_synced() {
self.consolidate_chain().await?;
metered!(Task::ChainConsolidation, self, consolidate_chain())?;
}
self.notify(ChainOrchestratorEvent::L1Synced);
Ok(None)
}
L1Notification::BatchFinalization { hash: _hash, index, block_number } => {
self.handle_l1_batch_finalization(*index, *block_number).await
metered!(
Task::BatchFinalization,
self,
handle_batch_finalization(*index, *block_number)
)
}
}
}
Expand All @@ -550,8 +560,6 @@ impl<
&mut self,
block_number: u64,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
let metric = self.metrics.get(&ChainOrchestratorItem::L1Reorg).expect("metric exists");
let now = Instant::now();
let genesis_hash = self.config.chain_spec().genesis_hash();
let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } =
self.database.unwind(genesis_hash, block_number).await?;
Expand Down Expand Up @@ -594,8 +602,6 @@ impl<
self.engine.update_fcs(l2_head_block_info, l2_safe_block_info, None).await?;
}

metric.task_duration.record(now.elapsed().as_secs_f64());

let event = ChainOrchestratorEvent::L1Reorg {
l1_block_number,
queue_index,
Expand All @@ -612,10 +618,6 @@ impl<
&mut self,
block_number: u64,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
let metric =
self.metrics.get(&ChainOrchestratorItem::L1Finalization).expect("metric exists");
let now = Instant::now();

let finalized_batches = self
.database
.tx_mut(move |tx| async move {
Expand All @@ -632,8 +634,6 @@ impl<
self.derivation_pipeline.push_batch(Arc::new(*batch)).await;
}

metric.task_duration.record(now.elapsed().as_secs_f64());

Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(block_number, finalized_batches)))
}

Expand All @@ -642,9 +642,6 @@ impl<
&self,
batch: BatchCommitData,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
let metric = self.metrics.get(&ChainOrchestratorItem::BatchCommit).expect("metric exists");
let now = Instant::now();

let event = self
.database
.tx_mut(move |tx| {
Expand Down Expand Up @@ -682,13 +679,11 @@ impl<
})
.await?;

metric.task_duration.record(now.elapsed().as_secs_f64());

Ok(event)
}

/// Handles a batch finalization event by updating the batch input in the database.
async fn handle_l1_batch_finalization(
async fn handle_batch_finalization(
&mut self,
batch_index: u64,
block_number: u64,
Expand Down Expand Up @@ -732,9 +727,6 @@ impl<
l1_message: TxL1Message,
l1_block_number: u64,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
let metric = self.metrics.get(&ChainOrchestratorItem::L1Message).expect("metric exists");
let now = Instant::now();

let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index);
let queue_hash = compute_l1_message_queue_hash(
&self.database,
Expand Down Expand Up @@ -770,35 +762,17 @@ impl<
})
.await?;

metric.task_duration.record(now.elapsed().as_secs_f64());

Ok(Some(event))
}

// /// Wraps a pending chain orchestrator future, metering the completion of it.
// pub fn handle_metered(
// &mut self,
// item: ChainOrchestratorItem,
// chain_orchestrator_fut: PendingChainOrchestratorFuture,
// ) -> PendingChainOrchestratorFuture {
// let metric = self.metrics.get(&item).expect("metric exists").clone();
// let fut_wrapper = Box::pin(async move {
// let now = Instant::now();
// let res = chain_orchestrator_fut.await;
// metric.task_duration.record(now.elapsed().as_secs_f64());
// res
// });
// fut_wrapper
// }

async fn handle_network_event(
&mut self,
event: ScrollNetworkManagerEvent,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
match event {
ScrollNetworkManagerEvent::NewBlock(block_with_peer) => {
self.notify(ChainOrchestratorEvent::NewBlockReceived(block_with_peer.clone()));
Ok(self.handle_block_from_peer(block_with_peer).await?)
metered!(Task::L2BlockImport, self, handle_block_from_peer(block_with_peer))
}
}
}
Expand Down
Loading
Loading