Skip to content

Commit 06f7ec0

Browse files
MatusKyseldavidtaikochaclaude
authored
fix(taiko-client-rs): align /status and fee range checks with go (#21335)
Co-authored-by: David <david@taiko.xyz> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b4627ea commit 06f7ec0

File tree

16 files changed

+1277
-1062
lines changed

16 files changed

+1277
-1062
lines changed

packages/taiko-client-rs/crates/rpc/src/beacon.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,11 @@ impl BeaconClient {
332332
Ok((timestamp - self.genesis_time) / self.seconds_per_slot)
333333
}
334334

335+
/// Convert a timestamp to a beacon epoch.
336+
pub fn timestamp_to_epoch(&self, timestamp: u64) -> Result<u64, BlobDataError> {
337+
Ok(self.timestamp_to_slot(timestamp)? / self.slots_per_epoch)
338+
}
339+
335340
/// Return the current beacon slot based on local wall-clock time.
336341
///
337342
/// Mirrors the Go driver implementation:

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/cache.rs

Lines changed: 151 additions & 105 deletions
Large diffs are not rendered by default.

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/error.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,6 @@ pub enum WhitelistPreconfirmationDriverError {
1919
/// Whitelist preconfirmation node task failed.
2020
#[error("whitelist preconfirmation node task failed: {0}")]
2121
NodeTaskFailed(String),
22-
/// Driver preconfirmation ingress is not ready.
23-
#[error("driver preconfirmation ingress not ready")]
24-
PreconfIngressNotReady,
25-
/// Unsupported chain for whitelist preconfirmation mode.
26-
#[error(
27-
"whitelist preconfirmation mode currently supports only chain_id={expected}, got {actual}"
28-
)]
29-
UnsupportedChain {
30-
/// Expected chain id.
31-
expected: u64,
32-
/// Actual chain id.
33-
actual: u64,
34-
},
35-
/// Missing execution payload in envelope.
36-
#[error("missing execution payload")]
37-
MissingExecutionPayload,
3822
/// Invalid or unsupported payload format.
3923
#[error("invalid payload format: {0}")]
4024
InvalidPayload(String),

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/importer/cache_import.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ where
1616
P: alloy_provider::Provider + Clone + Send + Sync + 'static,
1717
{
1818
/// Attempt to import cached envelopes if sync is ready.
19-
pub(super) async fn maybe_import_from_cache(&mut self) -> Result<()> {
19+
pub(crate) async fn maybe_import_from_cache(&mut self) -> Result<()> {
2020
let _ = self.refresh_sync_ready().await?;
2121
if !self.sync_ready || self.cache.is_empty() {
2222
return Ok(());

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/importer/ingress.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::sync::Arc;
22

33
use alloy_primitives::B256;
44
use alloy_provider::Provider;
5+
use tracing::debug;
56

67
use crate::{
78
codec::{
@@ -61,9 +62,31 @@ where
6162
}
6263
let envelope = Arc::new(envelope);
6364
self.cache.insert(envelope.clone());
64-
self.recent_cache.insert_recent(envelope);
65+
self.recent_cache.insert_recent(envelope.clone());
6566
self.update_cache_gauges();
6667

68+
if envelope.end_of_sequencing.unwrap_or(false) {
69+
match self.beacon_client.timestamp_to_epoch(envelope.execution_payload.timestamp) {
70+
Ok(epoch) => {
71+
debug!(
72+
epoch,
73+
hash = %envelope.execution_payload.block_hash,
74+
"recording end-of-sequencing envelope for epoch on payload ingress"
75+
);
76+
self.cache_state
77+
.record_end_of_sequencing(epoch, envelope.execution_payload.block_hash)
78+
.await;
79+
}
80+
Err(err) => {
81+
tracing::warn!(
82+
timestamp = envelope.execution_payload.timestamp,
83+
error = %err,
84+
"failed to derive epoch from payload timestamp for EOS recording"
85+
);
86+
}
87+
}
88+
}
89+
6790
Ok(())
6891
}
6992

@@ -120,8 +143,31 @@ where
120143

121144
let envelope = Arc::new(envelope);
122145
self.cache.insert(envelope.clone());
123-
self.recent_cache.insert_recent(envelope);
146+
self.recent_cache.insert_recent(envelope.clone());
124147
self.update_cache_gauges();
148+
149+
if envelope.end_of_sequencing.unwrap_or(false) {
150+
match self.beacon_client.timestamp_to_epoch(envelope.execution_payload.timestamp) {
151+
Ok(epoch) => {
152+
debug!(
153+
epoch,
154+
hash = %envelope.execution_payload.block_hash,
155+
"recording end-of-sequencing envelope for epoch on response ingress"
156+
);
157+
self.cache_state
158+
.record_end_of_sequencing(epoch, envelope.execution_payload.block_hash)
159+
.await;
160+
}
161+
Err(err) => {
162+
tracing::warn!(
163+
timestamp = envelope.execution_payload.timestamp,
164+
error = %err,
165+
"failed to derive epoch from response timestamp for EOS recording"
166+
);
167+
}
168+
}
169+
}
170+
125171
Ok(())
126172
}
127173

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/importer/mod.rs

Lines changed: 109 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,19 @@
22
33
use std::sync::Arc;
44

5-
use alloy_eips::BlockNumberOrTag;
65
use alloy_primitives::{Address, B256};
76
use alloy_provider::Provider;
8-
use bindings::preconf_whitelist::PreconfWhitelist::PreconfWhitelistInstance;
97
use driver::sync::event::EventSyncer;
10-
use rpc::client::Client;
8+
use rpc::{beacon::BeaconClient, client::Client};
119
use tokio::sync::{Mutex, mpsc};
1210
use tracing::{debug, info, warn};
1311

1412
use crate::{
15-
cache::{
16-
EnvelopeCache, L1_EPOCH_DURATION_SECS, RecentEnvelopeCache, RequestThrottle,
17-
WhitelistSequencerCache,
18-
},
13+
cache::{EnvelopeCache, RecentEnvelopeCache, RequestThrottle, SharedPreconfCacheState},
1914
error::{Result, WhitelistPreconfirmationDriverError},
2015
metrics::WhitelistPreconfirmationDriverMetrics,
2116
network::{NetworkCommand, NetworkEvent},
17+
whitelist_fetcher::WhitelistSequencerFetcher,
2218
};
2319

2420
/// Cache re-import flow for out-of-order envelopes once parents arrive.
@@ -45,6 +41,29 @@ pub(crate) const MAX_COMPRESSED_TX_LIST_BYTES: usize = 131_072 * 6;
4541
///
4642
/// Align with the preconfirmation tx-list cap to avoid zlib bomb expansion on untrusted payloads.
4743
pub(crate) const MAX_DECOMPRESSED_TX_LIST_BYTES: usize = 8 * 1024 * 1024;
44+
/// Dependency bundle for constructing [`WhitelistPreconfirmationImporter`].
45+
pub(crate) struct WhitelistPreconfirmationImporterParams<P>
46+
where
47+
P: Provider + Clone + Send + Sync + 'static,
48+
{
49+
/// Event syncer used to submit validated preconfirmation payloads.
50+
pub(crate) event_syncer: Arc<EventSyncer<P>>,
51+
/// RPC client used for L1/L2 reads and head-origin updates.
52+
pub(crate) rpc: Client<P>,
53+
/// Whitelist contract address used for signer validation.
54+
pub(crate) whitelist_address: Address,
55+
/// Chain id used for preconfirmation signature domain separation.
56+
pub(crate) chain_id: u64,
57+
/// Command channel used to publish P2P requests/responses.
58+
pub(crate) network_command_tx: mpsc::Sender<NetworkCommand>,
59+
/// Shared cache state used by status and EOS signaling.
60+
pub(crate) cache_state: SharedPreconfCacheState,
61+
/// Beacon client used for EOS epoch validation.
62+
pub(crate) beacon_client: Arc<BeaconClient>,
63+
/// Shared highest unsafe L2 payload block ID (updated on P2P import when REST server enabled).
64+
pub(crate) highest_unsafe_l2_payload_block_id: Option<Arc<Mutex<u64>>>,
65+
}
66+
4867
/// Imports whitelist preconfirmation payloads into the driver after event sync catches up.
4968
pub(crate) struct WhitelistPreconfirmationImporter<P>
5069
where
@@ -54,18 +73,20 @@ where
5473
event_syncer: Arc<EventSyncer<P>>,
5574
/// RPC client used for L1/L2 reads and head-origin updates.
5675
rpc: Client<P>,
57-
/// On-chain whitelist contract instance used to validate sequencer signers.
58-
whitelist: PreconfWhitelistInstance<P>,
5976
/// Chain id used for preconfirmation signature domain separation.
6077
chain_id: u64,
78+
/// Shared cache state used by status and EOS signaling.
79+
cache_state: SharedPreconfCacheState,
80+
/// Beacon client used for EOS epoch validation.
81+
beacon_client: Arc<BeaconClient>,
6182
/// Out-of-order payload cache waiting for parent availability.
6283
cache: EnvelopeCache,
6384
/// Recently accepted envelopes that can be served over response topic requests.
6485
recent_cache: RecentEnvelopeCache,
6586
/// Cooldown gate for repeated missing-parent requests.
6687
request_throttle: RequestThrottle,
67-
/// TTL cache for current/next whitelist sequencer addresses.
68-
sequencer_cache: WhitelistSequencerCache,
88+
/// Shared sequencer fetcher for whitelist validation and epoch cache management.
89+
sequencer_fetcher: WhitelistSequencerFetcher<P>,
6990
/// Command channel used to publish P2P requests/responses.
7091
network_command_tx: mpsc::Sender<NetworkCommand>,
7192
/// Shared highest unsafe L2 payload block ID (updated on P2P import when REST server enabled).
@@ -81,26 +102,31 @@ where
81102
P: Provider + Clone + Send + Sync + 'static,
82103
{
83104
/// Build an importer.
84-
pub(crate) fn new(
85-
event_syncer: Arc<EventSyncer<P>>,
86-
rpc: Client<P>,
87-
whitelist_address: Address,
88-
chain_id: u64,
89-
network_command_tx: mpsc::Sender<NetworkCommand>,
90-
highest_unsafe_l2_payload_block_id: Option<Arc<Mutex<u64>>>,
91-
) -> Self {
92-
let whitelist = PreconfWhitelistInstance::new(whitelist_address, rpc.l1_provider.clone());
105+
pub(crate) fn new(params: WhitelistPreconfirmationImporterParams<P>) -> Self {
106+
let WhitelistPreconfirmationImporterParams {
107+
event_syncer,
108+
rpc,
109+
whitelist_address,
110+
chain_id,
111+
network_command_tx,
112+
cache_state,
113+
beacon_client,
114+
highest_unsafe_l2_payload_block_id,
115+
} = params;
116+
let sequencer_fetcher =
117+
WhitelistSequencerFetcher::new(whitelist_address, rpc.l1_provider.clone());
93118
let anchor_address = *rpc.shasta.anchor.address();
94119

95120
let importer = Self {
96121
event_syncer,
97122
rpc,
98-
whitelist,
99123
chain_id,
124+
cache_state,
125+
beacon_client,
100126
cache: EnvelopeCache::default(),
101127
recent_cache: RecentEnvelopeCache::default(),
102128
request_throttle: RequestThrottle::default(),
103-
sequencer_cache: WhitelistSequencerCache::default(),
129+
sequencer_fetcher,
104130
network_command_tx,
105131
highest_unsafe_l2_payload_block_id,
106132
sync_ready: false,
@@ -175,22 +201,60 @@ where
175201
}
176202
}
177203
NetworkEvent::EndOfSequencingRequest { from, epoch } => {
178-
if let Some(envelope) = self.recent_cache.latest_end_of_sequencing() {
179-
debug!(
180-
peer = %from,
181-
epoch,
182-
hash = %envelope.execution_payload.block_hash,
183-
"serving end-of-sequencing whitelist preconfirmation response from recent cache"
184-
);
185-
self.publish_unsafe_response(envelope).await;
186-
metrics::counter!(
187-
WhitelistPreconfirmationDriverMetrics::IMPORTER_EVENTS_TOTAL,
188-
"event_type" => "end_of_sequencing_request",
189-
"result" => "served",
190-
)
191-
.increment(1);
204+
if let Some(hash) = self.cache_state.end_of_sequencing_for_epoch(epoch).await {
205+
if let Some(envelope) = self.recent_cache.get_recent(&hash) {
206+
debug!(
207+
peer = %from,
208+
epoch,
209+
hash = %envelope.execution_payload.block_hash,
210+
"serving end-of-sequencing whitelist preconfirmation response from recent cache"
211+
);
212+
self.publish_unsafe_response(envelope).await;
213+
metrics::counter!(
214+
WhitelistPreconfirmationDriverMetrics::IMPORTER_EVENTS_TOTAL,
215+
"event_type" => "end_of_sequencing_request",
216+
"result" => "served",
217+
)
218+
.increment(1);
219+
} else {
220+
debug!(
221+
peer = %from,
222+
epoch,
223+
hash = %hash,
224+
"end-of-sequencing hash known for epoch but envelope not in recent cache; rebuilding from L2"
225+
);
226+
227+
if let Some(mut envelope) =
228+
self.build_response_envelope_from_l2(hash).await?
229+
{
230+
envelope.end_of_sequencing = Some(true);
231+
let envelope = Arc::new(envelope);
232+
self.recent_cache.insert_recent(envelope.clone());
233+
self.update_cache_gauges();
234+
self.publish_unsafe_response(envelope).await;
235+
metrics::counter!(
236+
WhitelistPreconfirmationDriverMetrics::IMPORTER_EVENTS_TOTAL,
237+
"event_type" => "end_of_sequencing_request",
238+
"result" => "served_l2_fallback",
239+
)
240+
.increment(1);
241+
} else {
242+
debug!(
243+
peer = %from,
244+
epoch,
245+
hash = %hash,
246+
"end-of-sequencing hash known for epoch but unavailable from recent cache and L2"
247+
);
248+
metrics::counter!(
249+
WhitelistPreconfirmationDriverMetrics::IMPORTER_EVENTS_TOTAL,
250+
"event_type" => "end_of_sequencing_request",
251+
"result" => "miss",
252+
)
253+
.increment(1);
254+
}
255+
}
192256
} else {
193-
debug!(peer = %from, epoch, "no recent end-of-sequencing envelope to serve");
257+
debug!(peer = %from, epoch, "no end-of-sequencing block found for epoch");
194258
metrics::counter!(
195259
WhitelistPreconfirmationDriverMetrics::IMPORTER_EVENTS_TOTAL,
196260
"event_type" => "end_of_sequencing_request",
@@ -204,35 +268,14 @@ where
204268
self.maybe_import_from_cache().await
205269
}
206270

207-
/// Event-sync progress signal.
208-
///
209-
/// Drop cached sequencers only after crossing an epoch boundary to avoid unnecessary
210-
/// re-fetches while still preventing stale signer acceptance across epochs.
211-
pub(crate) async fn on_sync_ready_signal(&mut self) -> Result<()> {
212-
if !self.refresh_sync_ready().await? {
213-
return Ok(());
214-
}
215-
216-
if self.sequencer_cache.current_epoch_start_timestamp().is_some() {
217-
let latest_l1_timestamp = self.latest_l1_block_timestamp().await?;
218-
if self
219-
.sequencer_cache
220-
.should_invalidate_for_l1_timestamp(latest_l1_timestamp, L1_EPOCH_DURATION_SECS)
221-
{
222-
self.sequencer_cache.invalidate();
223-
}
224-
}
225-
226-
if self.cache.is_empty() {
227-
return Ok(());
271+
/// Invalidate the sequencer cache if the L1 head has crossed an epoch boundary.
272+
pub(crate) async fn maybe_invalidate_sequencer_cache_for_epoch(&mut self) {
273+
if let Err(err) = self.sequencer_fetcher.maybe_invalidate_for_epoch_advance().await {
274+
warn!(
275+
error = %err,
276+
"failed to check epoch boundary for sequencer cache invalidation"
277+
);
228278
}
229-
230-
self.import_from_cache().await.inspect_err(|_err| {
231-
metrics::counter!(
232-
WhitelistPreconfirmationDriverMetrics::SYNC_READY_IMPORT_FAILURES_TOTAL
233-
)
234-
.increment(1);
235-
})
236279
}
237280

238281
/// Refresh whether sync is ready.
@@ -255,26 +298,11 @@ where
255298
Ok(self.rpc.head_l1_origin().await?.map(|head| head.block_id.to::<u64>()))
256299
}
257300

258-
/// Read latest L1 block timestamp.
259-
pub(super) async fn latest_l1_block_timestamp(&self) -> Result<u64> {
260-
self.rpc
261-
.l1_provider
262-
.get_block_by_number(BlockNumberOrTag::Latest)
263-
.await
264-
.map_err(provider_err)?
265-
.map(|block| block.header.timestamp)
266-
.ok_or_else(|| {
267-
WhitelistPreconfirmationDriverError::WhitelistLookup(
268-
"missing latest L1 block while updating sequencer cache".to_string(),
269-
)
270-
})
271-
}
272-
273301
/// Get the block hash by block number.
274302
pub(super) async fn block_hash_by_number(&self, block_number: u64) -> Result<Option<B256>> {
275303
self.rpc
276304
.l2_provider
277-
.get_block_by_number(BlockNumberOrTag::Number(block_number))
305+
.get_block_by_number(alloy_eips::BlockNumberOrTag::Number(block_number))
278306
.await
279307
.map(|opt| opt.map(|block| block.hash()))
280308
.map_err(provider_err)

0 commit comments

Comments
 (0)