Skip to content

Commit e50bfcb

Browse files
kariyclaude
andcommitted
refactor(sync): enforce block range invariant in StageExecutionInput (#308)
Strengthens the `Stage` trait contract by enforcing that `StageExecutionInput.to >= StageExecutionInput.from`. `Stage` implementors can now rely on receiving valid block ranges. --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 56e07d8 commit e50bfcb

File tree

6 files changed

+72
-43
lines changed

6 files changed

+72
-43
lines changed

crates/sync/pipeline/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
317317
info!(target: "pipeline", %id, from = %checkpoint, %to, "Executing stage.");
318318

319319
// plus 1 because the checkpoint is inclusive
320-
let input = StageExecutionInput { from: checkpoint + 1, to };
320+
let input = StageExecutionInput::new(checkpoint + 1, to);
321321
stage.execute(&input).await.map_err(|error| Error::StageExecution { id, error })?;
322322
self.provider.set_checkpoint(id, to)?;
323323

crates/sync/pipeline/tests/pipeline.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ impl Stage for TrackingStage {
5757
}
5858

5959
fn execute<'a>(&'a mut self, input: &'a StageExecutionInput) -> BoxFuture<'a, StageResult> {
60-
let from = input.from;
61-
let to = input.to;
62-
let executions = self.executions.clone();
6360
Box::pin(async move {
64-
executions.lock().unwrap().push(ExecutionRecord { from, to });
61+
self.executions
62+
.lock()
63+
.unwrap()
64+
.push(ExecutionRecord { from: input.from(), to: input.to() });
6565
Ok(())
6666
})
6767
}

crates/sync/stage/src/blocks/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,9 @@ where
4747

4848
fn execute<'a>(&'a mut self, input: &'a StageExecutionInput) -> BoxFuture<'a, StageResult> {
4949
Box::pin(async move {
50-
// TODO: Implement range validation in the `Pipeline` level - or maybe in each stage as
51-
// well?
52-
debug_assert!(input.from <= input.to);
53-
5450
let blocks = self
5551
.downloader
56-
.download_blocks(input.from, input.to)
52+
.download_blocks(input.from(), input.to())
5753
.await
5854
.map_err(Error::Gateway)?;
5955

crates/sync/stage/src/classes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ where
6666

6767
fn execute<'a>(&'a mut self, input: &'a StageExecutionInput) -> BoxFuture<'a, StageResult> {
6868
Box::pin(async move {
69-
let declared_classes = self.get_declared_classes(input.from, input.to)?;
69+
let declared_classes = self.get_declared_classes(input.from(), input.to())?;
7070

7171
if !declared_classes.is_empty() {
7272
// fetch the classes artifacts

crates/sync/stage/src/lib.rs

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,41 @@ pub use sequencing::Sequencing;
1616
/// The result type of a stage execution. See [Stage::execute].
1717
pub type StageResult = Result<(), Error>;
1818

19-
#[derive(Debug, Default, Clone)]
19+
/// Input parameters for stage execution.
20+
///
21+
/// # Invariant
22+
///
23+
/// The `to` field must always be greater than or equal to the `from` field (`to >= from`).
24+
/// This invariant is enforced at construction time via the [`new`](Self::new) method and
25+
/// maintained by keeping the fields private.
26+
#[derive(Debug, Clone, Default)]
2027
pub struct StageExecutionInput {
21-
pub from: BlockNumber,
22-
pub to: BlockNumber,
28+
from: BlockNumber,
29+
to: BlockNumber,
30+
}
31+
32+
impl StageExecutionInput {
33+
/// Creates a new [`StageExecutionInput`] with the given range.
34+
///
35+
/// # Panics
36+
///
37+
/// Panics if `to < from`, as this violates the type's invariant.
38+
pub fn new(from: BlockNumber, to: BlockNumber) -> Self {
39+
assert!(to >= from, "Invalid block range: `to` ({to}) must be >= `from` ({from})");
40+
Self { from, to }
41+
}
42+
43+
/// Returns the starting block number (inclusive).
44+
#[inline]
45+
pub fn from(&self) -> BlockNumber {
46+
self.from
47+
}
48+
49+
/// Returns the ending block number (inclusive).
50+
#[inline]
51+
pub fn to(&self) -> BlockNumber {
52+
self.to
53+
}
2354
}
2455

2556
#[derive(Debug, Default)]
@@ -50,6 +81,10 @@ pub enum Error {
5081
/// in the synchronization process (e.g., downloading blocks, downloading classes, executing
5182
/// transactions).
5283
///
84+
/// Stages are responsible for processing a range of blocks. Each stage implementation can assume
85+
/// that the block range provided in [`StageExecutionInput`] is valid (i.e., `input.to >=
86+
/// input.from`).
87+
///
5388
/// # Implementation Note
5489
///
5590
/// The [`execute`](Stage::execute) method returns a [`BoxFuture`] instead of `impl Future` to
@@ -62,7 +97,7 @@ pub trait Stage: Send + Sync {
6297
/// Returns the id which uniquely identifies the stage.
6398
fn id(&self) -> &'static str;
6499

65-
/// Executes the stage.
100+
/// Executes the stage for the given block range.
66101
///
67102
/// # Arguments
68103
///
@@ -71,5 +106,26 @@ pub trait Stage: Send + Sync {
71106
/// # Returns
72107
///
73108
/// A [`BoxFuture`] that resolves to a [`StageResult`] upon completion.
109+
///
110+
/// # Block Range
111+
///
112+
/// Implementors can rely on the following guarantees:
113+
/// - The `input.to` field will always be greater than or equal to `input.from`
114+
/// - The block range `[input.from, input.to]` represents an inclusive range
115+
///
116+
/// Implementors are expected to perform any necessary processings on all blocks in the range
117+
/// `[input.from, input.to]`.
74118
fn execute<'a>(&'a mut self, input: &'a StageExecutionInput) -> BoxFuture<'a, StageResult>;
75119
}
120+
121+
#[cfg(test)]
122+
mod tests {
123+
use crate::StageExecutionInput;
124+
125+
#[tokio::test]
126+
#[should_panic(expected = "Invalid block range")]
127+
async fn invalid_range_panics() {
128+
// When from > to, the range is invalid and should panic at construction time
129+
let _ = StageExecutionInput::new(100, 99);
130+
}
131+
}

crates/sync/stage/tests/block.rs

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ async fn download_and_store_blocks(
223223
}
224224

225225
let mut stage = Blocks::new(provider.clone(), downloader.clone());
226-
let input = StageExecutionInput { from: from_block, to: to_block };
226+
let input = StageExecutionInput::new(from_block, to_block);
227227

228228
let result = stage.execute(&input).await;
229229
assert!(result.is_ok());
@@ -244,7 +244,7 @@ async fn download_failure_returns_error() {
244244
let provider = MockBlockWriter::new();
245245

246246
let mut stage = Blocks::new(provider.clone(), downloader.clone());
247-
let input = StageExecutionInput { from: block_number, to: block_number };
247+
let input = StageExecutionInput::new(block_number, block_number);
248248

249249
let result = stage.execute(&input).await;
250250

@@ -274,7 +274,7 @@ async fn storage_failure_returns_error() {
274274
let provider = MockBlockWriter::new().with_insert_error(error_msg.clone());
275275

276276
let mut stage = Blocks::new(provider.clone(), downloader.clone());
277-
let input = StageExecutionInput { from: block_number, to: block_number };
277+
let input = StageExecutionInput::new(block_number, block_number);
278278

279279
let result = stage.execute(&input).await;
280280

@@ -294,29 +294,6 @@ async fn storage_failure_returns_error() {
294294
assert_eq!(provider.stored_block_count(), 0);
295295
}
296296

297-
// This test is only testing the debug sanity check in Blocks::execute(). Becase the
298-
// `BlockDownloader` implementation could theoretically return whatever based on the block input
299-
// because the input of `BlockDownloader::download_blocks` doesn't prohibit invalid block range.
300-
// Maybe that's a good reason to change its method signature to `fn download_blocks(&self, range:
301-
// Range<BlockNumber>)` ??
302-
#[tokio::test]
303-
#[ignore = "Stage input validation should be done on the `Pipeline` level"]
304-
async fn empty_range_downloads_nothing() {
305-
// When from > to, the range is empty
306-
let downloader = MockBlockDownloader::new();
307-
let provider = MockBlockWriter::new();
308-
309-
let mut stage = Blocks::new(provider.clone(), downloader.clone());
310-
let input = StageExecutionInput { from: 100, to: 99 };
311-
312-
let result = stage.execute(&input).await;
313-
assert!(result.is_ok());
314-
315-
// No downloads should occur for empty range
316-
assert_eq!(downloader.requested_blocks().len(), 0);
317-
assert_eq!(provider.stored_block_count(), 0);
318-
}
319-
320297
#[tokio::test]
321298
async fn partial_download_failure_stops_execution() {
322299
let from_block = 100;
@@ -332,7 +309,7 @@ async fn partial_download_failure_stops_execution() {
332309
let provider = MockBlockWriter::new();
333310
let mut stage = Blocks::new(provider.clone(), downloader.clone());
334311

335-
let input = StageExecutionInput { from: from_block, to: to_block };
312+
let input = StageExecutionInput::new(from_block, to_block);
336313
let result = stage.execute(&input).await;
337314

338315
// Should fail on block 103
@@ -355,7 +332,7 @@ async fn fetch_blocks_from_gateway() {
355332

356333
let mut stage = Blocks::new(&provider, downloader);
357334

358-
let input = StageExecutionInput { from: from_block, to: to_block };
335+
let input = StageExecutionInput::new(from_block, to_block);
359336
stage.execute(&input).await.expect("failed to execute stage");
360337

361338
// check provider storage

0 commit comments

Comments
 (0)