Skip to content

Commit c500e8a

Browse files
fix(taiko-client-rs): use trusted resume head for event sync (checkpoint head or head_l1_origin) (#21331)
1 parent d6d67ec commit c500e8a

File tree

13 files changed

+611
-144
lines changed

13 files changed

+611
-144
lines changed

packages/taiko-client-rs/crates/driver/src/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,17 @@ pub enum DriverError {
7979
#[error(transparent)]
8080
Other(#[from] AnyhowError),
8181
}
82+
83+
/// Map a [`DriverError`] into a target error type while preserving sync errors.
84+
///
85+
/// This ensures `DriverError::Sync` is converted through `From<SyncError>` instead of being
86+
/// wrapped as a generic driver error.
87+
pub fn map_driver_error<T>(err: DriverError) -> T
88+
where
89+
T: From<SyncError> + From<DriverError>,
90+
{
91+
match err {
92+
DriverError::Sync(sync_err) => sync_err.into(),
93+
other => other.into(),
94+
}
95+
}

packages/taiko-client-rs/crates/driver/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub mod sync;
1515

1616
pub use config::DriverConfig;
1717
pub use driver::Driver;
18-
pub use error::DriverError;
18+
pub use error::{DriverError, map_driver_error};
1919
pub use production::PreconfPayload;
2020
pub use sync::{CanonicalTipState, SyncPipeline, SyncStage, event::EventSyncer};
2121

packages/taiko-client-rs/crates/driver/src/sync/beacon.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Beacon sync logic.
22
3-
use std::{borrow::Cow, marker::PhantomData, time::Duration};
3+
use std::{borrow::Cow, marker::PhantomData, sync::Arc, time::Duration};
44

55
use alethia_reth_primitives::engine::types::TaikoExecutionDataSidecar;
66
use alloy::providers::Provider;
@@ -21,7 +21,7 @@ use rpc::{
2121
use tokio::time::{MissedTickBehavior, interval};
2222
use tracing::{debug, info, instrument, warn};
2323

24-
use super::{SyncError, SyncStage};
24+
use super::{SyncError, SyncStage, checkpoint_resume_head::CheckpointResumeHead};
2525
use crate::{config::DriverConfig, error::DriverError, metrics::DriverMetrics};
2626

2727
/// Default polling interval used when no retry interval is configured.
@@ -32,9 +32,15 @@ pub struct BeaconSyncer<P>
3232
where
3333
P: Provider + Clone,
3434
{
35+
/// Interval between beacon sync retries.
3536
retry_interval: Duration,
37+
/// RPC client used for local node engine and chain calls.
3638
rpc: Client<P>,
39+
/// Optional checkpoint provider used for remote catch-up blocks.
3740
checkpoint: Option<RootProvider>,
41+
/// Shared checkpoint head used to resume event sync after beacon sync.
42+
checkpoint_resume_head: Arc<CheckpointResumeHead>,
43+
/// Marker that ties this type to the generic provider parameter.
3844
_marker: PhantomData<P>,
3945
}
4046

@@ -44,11 +50,21 @@ where
4450
{
4551
/// Construct a new beacon syncer from the provided configuration and RPC client.
4652
#[instrument(skip(config, rpc))]
47-
pub fn new(config: &DriverConfig, rpc: Client<P>) -> Self {
53+
pub fn new(
54+
config: &DriverConfig,
55+
rpc: Client<P>,
56+
checkpoint_resume_head: Arc<CheckpointResumeHead>,
57+
) -> Self {
4858
let checkpoint =
4959
config.l2_checkpoint_url.as_ref().map(|url| connect_http_with_timeout(url.clone()));
5060

51-
Self { retry_interval: config.retry_interval, rpc, checkpoint, _marker: PhantomData }
61+
Self {
62+
retry_interval: config.retry_interval,
63+
rpc,
64+
checkpoint,
65+
checkpoint_resume_head,
66+
_marker: PhantomData,
67+
}
5268
}
5369

5470
/// Query the checkpoint node for its head L1 origin block number.
@@ -139,6 +155,10 @@ where
139155
/// missing blocks to the local execution engine.
140156
#[instrument(skip(self), name = "beacon_syncer_run")]
141157
async fn run(&self) -> Result<(), SyncError> {
158+
// Always clear stale state from previous attempts so event sync cannot accidentally
159+
// consume an old checkpoint head after a failed or skipped beacon sync run.
160+
self.checkpoint_resume_head.clear();
161+
142162
if self.checkpoint.is_none() {
143163
info!("no checkpoint endpoint configured; skipping beacon sync stage");
144164
return Ok(());
@@ -219,6 +239,10 @@ where
219239
})?;
220240
counter!(DriverMetrics::BEACON_SYNC_REMOTE_SUBMISSIONS_TOTAL).increment(1);
221241
} else {
242+
// Persist the checkpoint head we have confirmed local execution is synced to.
243+
// Event sync uses this exact value as its authoritative resume source when
244+
// checkpoint mode is enabled.
245+
self.checkpoint_resume_head.set(checkpoint_head);
222246
info!(checkpoint_head, local_head, "local engine at or ahead of checkpoint; done");
223247
break Ok(());
224248
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
//! Shared state for carrying beacon-sync checkpoint progress into event sync.
2+
3+
use std::sync::atomic::{AtomicU64, Ordering};
4+
5+
/// Sentinel value used to represent an unset checkpoint head.
6+
const CHECKPOINT_HEAD_UNSET: u64 = u64::MAX;
7+
8+
/// Stores the L2 block number that beacon sync has confirmed as its catch-up head.
9+
///
10+
/// `u64::MAX` is reserved as "not set", allowing a single-atomic representation while preserving
11+
/// `Option<u64>` semantics.
12+
#[derive(Debug)]
13+
pub struct CheckpointResumeHead {
14+
/// Stored checkpoint head block number.
15+
value: AtomicU64,
16+
}
17+
18+
impl Default for CheckpointResumeHead {
19+
/// Initializes with no checkpoint head set.
20+
fn default() -> Self {
21+
Self { value: AtomicU64::new(CHECKPOINT_HEAD_UNSET) }
22+
}
23+
}
24+
25+
impl CheckpointResumeHead {
26+
/// Clears the stored checkpoint head.
27+
pub fn clear(&self) {
28+
self.value.store(CHECKPOINT_HEAD_UNSET, Ordering::Release);
29+
}
30+
31+
/// Stores a checkpoint head block number.
32+
pub fn set(&self, block_number: u64) {
33+
debug_assert_ne!(
34+
block_number, CHECKPOINT_HEAD_UNSET,
35+
"u64::MAX is reserved as an unset checkpoint-head sentinel"
36+
);
37+
self.value.store(block_number, Ordering::Release);
38+
}
39+
40+
/// Returns the stored checkpoint head when present.
41+
pub fn get(&self) -> Option<u64> {
42+
let value = self.value.load(Ordering::Acquire);
43+
if value == CHECKPOINT_HEAD_UNSET { None } else { Some(value) }
44+
}
45+
}
46+
47+
#[cfg(test)]
48+
mod tests {
49+
use super::*;
50+
use std::mem::size_of;
51+
52+
#[test]
53+
fn checkpoint_resume_head_uses_single_atomic_storage() {
54+
assert_eq!(size_of::<CheckpointResumeHead>(), size_of::<AtomicU64>());
55+
}
56+
57+
#[test]
58+
fn checkpoint_resume_head_round_trip() {
59+
let head = CheckpointResumeHead::default();
60+
assert_eq!(head.get(), None);
61+
62+
head.set(42);
63+
assert_eq!(head.get(), Some(42));
64+
65+
head.clear();
66+
assert_eq!(head.get(), None);
67+
}
68+
}

packages/taiko-client-rs/crates/driver/src/sync/error.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,22 @@ pub enum SyncError {
3838
#[error("execution engine returned no latest block")]
3939
MissingLatestExecutionBlock,
4040

41+
/// Event sync: checkpoint mode enabled, but beacon sync did not publish a resume head.
42+
#[error("checkpoint mode enabled but no checkpoint resume head is available")]
43+
MissingCheckpointResumeHead,
44+
45+
/// Event sync: no-checkpoint mode requires local head L1 origin to choose a safe resume head.
46+
#[error("head_l1_origin is missing; cannot derive event resume head without checkpoint")]
47+
MissingHeadL1OriginResume,
48+
4149
/// Event sync: execution engine missing a specific block.
4250
#[error("execution engine returned no block {number}")]
4351
MissingExecutionBlock { number: u64 },
4452

53+
/// Event sync: finalized L1 block is unavailable; resume must fail closed.
54+
#[error("finalized l1 block is unavailable")]
55+
MissingFinalizedL1Block,
56+
4557
/// Event sync: execution engine missing batch-to-block mapping.
4658
#[error("no execution block found for batch {proposal_id}")]
4759
MissingExecutionBlockForBatch { proposal_id: u64 },

0 commit comments

Comments
 (0)