Skip to content

Commit 387ed9f

Browse files
jonastheisgreged93
andauthored
Improve L1 message reorg testing (#253)
* Refactor `can_handle_reorgs_while_sequencing` test to also include a follower node and add a bunch of convenience methods * feat: attach L1 block number to ScrollPayloadAttributes * feat: correctly handle L1 reorgs in driver * adjust can_handle_l1_message_reorg test to assert correct reorg conditions * fix linter errors * make sequencer not issue blocks by default in tests * improve test * fixes after merge * improve test * fixes after merge * improve test helper functions * feat: answer comments --------- Co-authored-by: Gregory Edison <[email protected]>
1 parent 0c4e4fb commit 387ed9f

File tree

14 files changed

+518
-140
lines changed

14 files changed

+518
-140
lines changed

Cargo.lock

Lines changed: 46 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/database/db/src/operations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
586586
///
587587
/// It can either be an index, which is the queue index of the first message to return, or a hash,
588588
/// which is the hash of the first message to return.
589-
#[derive(Debug, Clone)]
589+
#[derive(Debug, Clone, PartialEq, Eq)]
590590
pub enum L1MessageStart {
591591
/// Start from the provided queue index.
592592
Index(u64),

crates/derivation-pipeline/src/lib.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use core::{
2424
};
2525
use futures::{FutureExt, Stream};
2626
use rollup_node_primitives::{
27-
BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithFinalizedBatchInfo,
28-
WithFinalizedBlockNumber,
27+
BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber,
28+
WithFinalizedBatchInfo, WithFinalizedBlockNumber,
2929
};
3030
use rollup_node_providers::{BlockDataProvider, L1Provider};
3131
use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
@@ -186,7 +186,7 @@ impl<P> Stream for DerivationPipeline<P>
186186
where
187187
P: L1Provider + Clone + Unpin + Send + Sync + 'static,
188188
{
189-
type Item = ScrollPayloadAttributesWithBatchInfo;
189+
type Item = WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>;
190190

191191
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192192
let this = self.get_mut();
@@ -198,7 +198,7 @@ where
198198

199199
// return attributes from the queue if any.
200200
if let Some(attribute) = this.attributes_queue.pop_front() {
201-
return Poll::Ready(Some(attribute.inner))
201+
return Poll::Ready(Some(attribute))
202202
}
203203

204204
// if future is None and the batch queue is empty, store the waker and return.
@@ -487,8 +487,10 @@ mod tests {
487487

488488
// check the correctness of the last attribute.
489489
let mut attribute = ScrollPayloadAttributes::default();
490-
while let Some(ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }) =
491-
pipeline.next().await
490+
while let Some(WithBlockNumber {
491+
inner: ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. },
492+
..
493+
}) = pipeline.next().await
492494
{
493495
if a.payload_attributes.timestamp == 1696935657 {
494496
attribute = a;
@@ -545,8 +547,10 @@ mod tests {
545547

546548
// check the correctness of the last attribute.
547549
let mut attribute = ScrollPayloadAttributes::default();
548-
while let Some(ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }) =
549-
pipeline.next().await
550+
while let Some(WithBlockNumber {
551+
inner: ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. },
552+
..
553+
}) = pipeline.next().await
550554
{
551555
if a.payload_attributes.timestamp == 1696935657 {
552556
attribute = a;

crates/engine/src/driver.rs

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
use alloy_provider::Provider;
99
use futures::{ready, task::AtomicWaker, FutureExt, Stream};
1010
use rollup_node_primitives::{
11-
BlockInfo, ChainImport, MeteredFuture, ScrollPayloadAttributesWithBatchInfo,
11+
BlockInfo, ChainImport, MeteredFuture, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber,
1212
};
1313
use scroll_alloy_hardforks::ScrollHardforks;
1414
use scroll_alloy_network::Scroll;
@@ -38,7 +38,7 @@ pub struct EngineDriver<EC, CS, P> {
3838
/// Block building duration.
3939
block_building_duration: Duration,
4040
/// The pending payload attributes derived from batches on L1.
41-
l1_payload_attributes: VecDeque<ScrollPayloadAttributesWithBatchInfo>,
41+
l1_payload_attributes: VecDeque<WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>>,
4242
/// The pending block imports received over the network.
4343
chain_imports: VecDeque<ChainImport>,
4444
/// The latest optimistic sync target.
@@ -121,6 +121,67 @@ where
121121
}
122122
}
123123

124+
/// Handle L1 reorg, with the L1 block number reorged to, and whether this reorged the head or
125+
/// batches.
126+
pub fn handle_l1_reorg(
127+
&mut self,
128+
l1_block_number: u64,
129+
reorged_unsafe_head: Option<BlockInfo>,
130+
reorged_safe_head: Option<BlockInfo>,
131+
) {
132+
// On an unsafe head reorg.
133+
if let Some(l2_head_block_info) = reorged_unsafe_head {
134+
// clear the payload building future.
135+
self.payload_building_future = None;
136+
137+
// retain only blocks from chain imports for which the block number <= L2 reorged
138+
// number.
139+
for chain_import in &mut self.chain_imports {
140+
chain_import.chain.retain(|block| block.number <= l2_head_block_info.number);
141+
}
142+
143+
// reset the unsafe head.
144+
self.set_head_block_info(l2_head_block_info);
145+
146+
// drop the engine future if it's a `NewPayload` or `BlockImport` with block number >
147+
// L2 reorged number.
148+
if let Some(MeteredFuture { fut, .. }) = self.engine_future.as_ref() {
149+
match fut {
150+
EngineFuture::ChainImport(WithBlockNumber { number, .. })
151+
if number > &l2_head_block_info.number =>
152+
{
153+
self.engine_future = None
154+
}
155+
// `NewPayload` future is ONLY instantiated when the payload building future is
156+
// done, and we want to issue the payload to the EN. Thus, we also clear it on a
157+
// L2 reorg.
158+
EngineFuture::NewPayload(_) => self.engine_future = None,
159+
_ => {}
160+
}
161+
}
162+
}
163+
164+
// On a safe head reorg: reset the safe head.
165+
if let Some(safe_block_info) = reorged_safe_head {
166+
self.set_safe_block_info(safe_block_info);
167+
}
168+
169+
// drop the engine future if it's a `L1Consolidation` future associated with a L1 block
170+
// number > l1_block_number.
171+
if matches!(
172+
self.engine_future.as_ref(),
173+
Some(MeteredFuture {
174+
fut: EngineFuture::L1Consolidation(WithBlockNumber { number, .. }),
175+
..
176+
}) if number > &l1_block_number
177+
) {
178+
self.engine_future = None;
179+
}
180+
181+
// retain the L1 payload attributes with block number <= L1 block.
182+
self.l1_payload_attributes.retain(|attribute| attribute.number <= l1_block_number);
183+
}
184+
124185
/// Handles a block import request by adding it to the queue and waking up the driver.
125186
pub fn handle_chain_import(&mut self, chain_import: ChainImport) {
126187
tracing::trace!(target: "scroll::engine", head = %chain_import.chain.last().unwrap().hash_slow(), "new chain import request received");
@@ -145,7 +206,10 @@ where
145206

146207
/// Handles a [`ScrollPayloadAttributes`] sourced from L1 by initiating a task sending the
147208
/// attribute to the EN via the [`EngineDriver`].
148-
pub fn handle_l1_consolidation(&mut self, attributes: ScrollPayloadAttributesWithBatchInfo) {
209+
pub fn handle_l1_consolidation(
210+
&mut self,
211+
attributes: WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>,
212+
) {
149213
self.l1_payload_attributes.push_back(attributes);
150214
self.waker.wake();
151215
}
@@ -193,7 +257,7 @@ where
193257
self.metrics.block_import_duration.record(duration.as_secs_f64());
194258

195259
// Return the block import outcome
196-
return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome)
260+
return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome);
197261
}
198262
Err(err) => {
199263
tracing::error!(target: "scroll::engine", ?err, "failed to import block");
@@ -223,7 +287,7 @@ where
223287
// record the metric.
224288
self.metrics.l1_consolidation_duration.record(duration.as_secs_f64());
225289

226-
return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome))
290+
return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome));
227291
}
228292
Err(err) => {
229293
tracing::error!(target: "scroll::engine", ?err, "failed to consolidate block derived from L1");
@@ -250,7 +314,7 @@ where
250314
self.metrics.build_new_payload_duration.record(duration.as_secs_f64());
251315
self.metrics.gas_per_block.record(block.gas_used as f64);
252316

253-
return Some(EngineDriverEvent::NewPayload(block))
317+
return Some(EngineDriverEvent::NewPayload(block));
254318
}
255319
Err(err) => {
256320
tracing::error!(target: "scroll::engine", ?err, "failed to build new payload");

crates/engine/src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use alloy_rpc_types_engine::PayloadError;
2-
use rollup_node_primitives::ScrollPayloadAttributesWithBatchInfo;
2+
use rollup_node_primitives::{ScrollPayloadAttributesWithBatchInfo, WithBlockNumber};
33
use scroll_alloy_provider::ScrollEngineApiError;
44
use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes;
55

@@ -21,7 +21,7 @@ pub enum EngineDriverError {
2121
/// The payload id field is missing in the forkchoice update response for an L1 consolidation
2222
/// job.
2323
#[error("Forkchoice update response missing payload id for L1 consolidation job")]
24-
L1ConsolidationMissingPayloadId(ScrollPayloadAttributesWithBatchInfo),
24+
L1ConsolidationMissingPayloadId(WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>),
2525
/// The payload id field is missing in the forkchoice update response for a payload building
2626
/// job.
2727
#[error("Forkchoice update response missing payload id for payload building job")]

crates/engine/src/future/mod.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use reth_scroll_engine_primitives::try_into_block;
1111
use reth_scroll_primitives::ScrollBlock;
1212
use rollup_node_primitives::{
1313
BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages, MeteredFuture,
14-
ScrollPayloadAttributesWithBatchInfo,
14+
ScrollPayloadAttributesWithBatchInfo, WithBlockNumber,
1515
};
1616
use scroll_alloy_hardforks::ScrollHardforks;
1717
use scroll_alloy_network::Scroll;
@@ -47,7 +47,7 @@ type L1ConsolidationFuture =
4747
Pin<Box<dyn Future<Output = Result<ConsolidationOutcome, EngineDriverError>> + Send>>;
4848

4949
/// An enum that represents the different outcomes of an L1 consolidation job.
50-
#[derive(Debug, Clone)]
50+
#[derive(Debug, Clone, PartialEq, Eq)]
5151
pub enum ConsolidationOutcome {
5252
/// Represents a successful consolidation outcome with the consolidated block info and batch
5353
/// info.
@@ -97,8 +97,8 @@ pub(crate) type OptimisticSyncFuture =
9797
/// An enum that represents the different types of futures that can be executed on the engine API.
9898
/// It can be a block import job, an L1 consolidation job, or a new payload processing.
9999
pub(crate) enum EngineFuture {
100-
ChainImport(ChainImportFuture),
101-
L1Consolidation(L1ConsolidationFuture),
100+
ChainImport(WithBlockNumber<ChainImportFuture>),
101+
L1Consolidation(WithBlockNumber<L1ConsolidationFuture>),
102102
NewPayload(NewPayloadFuture),
103103
OptimisticSync(OptimisticSyncFuture),
104104
}
@@ -112,7 +112,11 @@ impl EngineFuture {
112112
where
113113
EC: ScrollEngineApi + Unpin + Send + Sync + 'static,
114114
{
115-
Self::ChainImport(Box::pin(handle_chain_import(client, chain_import, fcs)))
115+
let highest_block_number = chain_import.chain.last().unwrap().number;
116+
Self::ChainImport(WithBlockNumber::new(
117+
highest_block_number,
118+
Box::pin(handle_chain_import(client, chain_import, fcs)),
119+
))
116120
}
117121

118122
pub(crate) fn optimistic_sync<EC>(client: Arc<EC>, fcs: AlloyForkchoiceState) -> Self
@@ -127,18 +131,21 @@ impl EngineFuture {
127131
client: Arc<EC>,
128132
execution_payload_provider: P,
129133
fcs: ForkchoiceState,
130-
payload_attributes: ScrollPayloadAttributesWithBatchInfo,
134+
payload_attributes: WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>,
131135
) -> Self
132136
where
133137
EC: ScrollEngineApi + Unpin + Send + Sync + 'static,
134138
P: Provider<Scroll> + Unpin + Send + Sync + 'static,
135139
{
136-
Self::L1Consolidation(Box::pin(handle_payload_attributes(
137-
client,
138-
execution_payload_provider,
139-
fcs,
140-
payload_attributes,
141-
)))
140+
Self::L1Consolidation(WithBlockNumber::new(
141+
payload_attributes.number,
142+
Box::pin(handle_payload_attributes(
143+
client,
144+
execution_payload_provider,
145+
fcs,
146+
payload_attributes,
147+
)),
148+
))
142149
}
143150

144151
/// Creates a new [`EngineFuture::NewPayload`] future from the provided parameters.
@@ -162,8 +169,8 @@ impl Future for EngineFuture {
162169
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<EngineDriverFutureResult> {
163170
let this = self.get_mut();
164171
match this {
165-
Self::ChainImport(fut) => fut.as_mut().poll(cx).map(Into::into),
166-
Self::L1Consolidation(fut) => fut.as_mut().poll(cx).map(Into::into),
172+
Self::ChainImport(fut) => fut.inner.as_mut().poll(cx).map(Into::into),
173+
Self::L1Consolidation(fut) => fut.inner.as_mut().poll(cx).map(Into::into),
167174
Self::NewPayload(fut) => fut.as_mut().poll(cx).map(Into::into),
168175
Self::OptimisticSync(fut) => fut.as_mut().poll(cx).map(Into::into),
169176
}
@@ -258,7 +265,7 @@ async fn handle_payload_attributes<EC, P>(
258265
client: Arc<EC>,
259266
provider: P,
260267
fcs: ForkchoiceState,
261-
payload_attributes_with_batch_info: ScrollPayloadAttributesWithBatchInfo,
268+
payload_attributes_with_batch_info: WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>,
262269
) -> Result<ConsolidationOutcome, EngineDriverError>
263270
where
264271
EC: ScrollEngineApi + Unpin + Send + Sync + 'static,
@@ -267,7 +274,7 @@ where
267274
tracing::trace!(target: "scroll::engine::future", ?fcs, ?payload_attributes_with_batch_info, "handling payload attributes");
268275

269276
let ScrollPayloadAttributesWithBatchInfo { mut payload_attributes, batch_info } =
270-
payload_attributes_with_batch_info.clone();
277+
payload_attributes_with_batch_info.inner.clone();
271278

272279
let maybe_execution_payload = provider
273280
.get_block((fcs.safe_block_info().number + 1).into())

0 commit comments

Comments
 (0)