Skip to content

Commit 21532b6

Browse files
authored
feat: Lander EVM, periodically update finalized nonce and detect any messages that need reprocessing (#7197)
1 parent 4011a45 commit 21532b6

File tree

8 files changed

+216
-7
lines changed

8 files changed

+216
-7
lines changed

rust/main/lander/src/adapter/chains/ethereum/adapter.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,60 @@ impl AdaptsChain for EthereumAdapter {
671671
Ok(reverted)
672672
}
673673

674+
fn reprocess_txs_poll_rate(&self) -> Option<Duration> {
675+
// if the block time is too short, we want to cap it at 5s because we don't want
676+
// to query the nonce too much. 5s should be quick enough for a reorg
677+
Some((*self.estimated_block_time()).max(Duration::from_secs(5)))
678+
}
679+
680+
async fn get_reprocess_txs(&self) -> Result<Vec<Transaction>, LanderError> {
681+
let old_finalized_nonce = self
682+
.nonce_manager
683+
.state
684+
.get_finalized_nonce()
685+
.await?
686+
.unwrap_or_default();
687+
self.nonce_manager.nonce_updater.update_boundaries().await?;
688+
let new_finalized_nonce = self
689+
.nonce_manager
690+
.state
691+
.get_finalized_nonce()
692+
.await?
693+
.unwrap_or_default();
694+
695+
if new_finalized_nonce >= old_finalized_nonce {
696+
return Ok(Vec::new());
697+
}
698+
699+
warn!(
700+
?old_finalized_nonce,
701+
?new_finalized_nonce,
702+
"New finalized nonce is lower than old finalized nonce"
703+
);
704+
705+
let mut txs = Vec::new();
706+
let mut nonce = new_finalized_nonce.saturating_add(U256::one());
707+
while nonce <= old_finalized_nonce {
708+
let tx_uuid = self.nonce_manager.state.get_tracked_tx_uuid(&nonce).await?;
709+
if tx_uuid == TransactionUuid::default() {
710+
debug!(
711+
?nonce,
712+
"No tracked transaction UUID for nonce in reorg range"
713+
);
714+
} else if let Some(tx) = self.nonce_manager.state.get_tracked_tx(&tx_uuid).await? {
715+
txs.push(tx);
716+
} else {
717+
debug!(
718+
?nonce,
719+
?tx_uuid,
720+
"No transaction found for nonce in reorg range"
721+
);
722+
}
723+
nonce = nonce.saturating_add(U256::one());
724+
}
725+
Ok(txs)
726+
}
727+
674728
fn estimated_block_time(&self) -> &std::time::Duration {
675729
&self.estimated_block_time
676730
}

rust/main/lander/src/adapter/chains/ethereum/nonce.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub(crate) use manager::NonceManager;
2+
pub(crate) use updater::NonceUpdater;
23

34
mod db;
45
mod error;
@@ -11,8 +12,6 @@ mod updater;
1112
pub(crate) use db::NonceDb;
1213
#[cfg(test)]
1314
pub(crate) use state::NonceManagerState;
14-
#[cfg(test)]
15-
pub(crate) use updater::NonceUpdater;
1615

1716
#[cfg(test)]
1817
mod tests;

rust/main/lander/src/adapter/chains/ethereum/nonce/manager.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use ethers::signers::Signer;
45
use ethers_core::types::Address;
@@ -41,8 +42,13 @@ impl NonceManager {
4142
let tx_db = db.clone() as Arc<dyn TransactionDb>;
4243
let state = Arc::new(NonceManagerState::new(nonce_db, tx_db, address, metrics));
4344

44-
let nonce_updater =
45-
NonceUpdater::new(address, reorg_period, block_time, provider, state.clone());
45+
let nonce_updater = NonceUpdater::new(
46+
address,
47+
reorg_period,
48+
block_time,
49+
provider.clone(),
50+
state.clone(),
51+
);
4652

4753
let manager = Self {
4854
address,

rust/main/lander/src/adapter/chains/ethereum/nonce/state/db.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ impl NonceManagerState {
1212
Ok((finalized_nonce, upper_nonce))
1313
}
1414

15-
pub(super) async fn get_tracked_tx(
15+
pub async fn get_tracked_tx(
1616
&self,
1717
tx_uuid: &TransactionUuid,
1818
) -> NonceResult<Option<Transaction>> {
@@ -47,7 +47,7 @@ impl NonceManagerState {
4747
Ok(())
4848
}
4949

50-
pub(super) async fn get_tracked_tx_uuid(&self, nonce: &U256) -> NonceResult<TransactionUuid> {
50+
pub async fn get_tracked_tx_uuid(&self, nonce: &U256) -> NonceResult<TransactionUuid> {
5151
let tx_uuid = self
5252
.nonce_db
5353
.retrieve_transaction_uuid_by_nonce_and_signer_address(nonce, &self.address)
@@ -65,7 +65,7 @@ impl NonceManagerState {
6565
Ok(())
6666
}
6767

68-
pub(super) async fn get_finalized_nonce(&self) -> NonceResult<Option<U256>> {
68+
pub async fn get_finalized_nonce(&self) -> NonceResult<Option<U256>> {
6969
let finalized_nonce = self
7070
.nonce_db
7171
.retrieve_finalized_nonce_by_signer_address(&self.address)

rust/main/lander/src/adapter/core.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,23 @@ pub trait AdaptsChain: Send + Sync {
114114
async fn replace_tx(&self, _tx: &Transaction) -> Result<(), LanderError> {
115115
todo!()
116116
}
117+
118+
/// Returns the polling interval for checking if transactions need reprocessing.
119+
///
120+
/// Returns `None` if the adapter does not support transaction reprocessing,
121+
/// or `Some(Duration)` specifying how frequently to poll.
122+
fn reprocess_txs_poll_rate(&self) -> Option<Duration> {
123+
None
124+
}
125+
126+
/// Get a list of transactions that need to be reprocessed.
127+
///
128+
/// Returns an empty vector if no transactions need reprocessing or if the adapter
129+
/// does not support reprocessing.
130+
///
131+
/// Note: Implementations may update internal state (e.g., finalized nonce boundaries)
132+
/// as part of determining which transactions need reprocessing.
133+
async fn get_reprocess_txs(&self) -> Result<Vec<Transaction>, LanderError> {
134+
Ok(Vec::new())
135+
}
117136
}

rust/main/lander/src/dispatcher/stages/inclusion_stage.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ impl InclusionStage {
6464
domain,
6565
} = self;
6666
let futures = vec![
67+
tokio::spawn(
68+
Self::receive_reprocess_txs(domain.clone(), pool.clone(), state.clone())
69+
.instrument(info_span!("receive_reprocess_txs")),
70+
),
6771
tokio::spawn(
6872
Self::receive_txs(tx_receiver, pool.clone(), state.clone(), domain.clone())
6973
.instrument(info_span!("receive_txs")),
@@ -180,6 +184,45 @@ impl InclusionStage {
180184
Ok(())
181185
}
182186

187+
#[instrument(skip_all, fields(domain))]
188+
pub async fn receive_reprocess_txs(
189+
domain: String,
190+
pool: InclusionStagePool,
191+
state: DispatcherState,
192+
) -> Result<(), LanderError> {
193+
let poll_rate = match state.adapter.reprocess_txs_poll_rate() {
194+
Some(s) => s,
195+
// if no poll rate, then that means we don't worry about reprocessing txs
196+
None => return Ok(()),
197+
};
198+
loop {
199+
state.metrics.update_liveness_metric(
200+
format!("{}::receive_reprocess_txs", STAGE_NAME).as_str(),
201+
&domain,
202+
);
203+
204+
tokio::time::sleep(poll_rate).await;
205+
tracing::debug!(
206+
domain,
207+
"Checking for any transactions that needs reprocessing"
208+
);
209+
210+
let txs = match state.adapter.get_reprocess_txs().await {
211+
Ok(s) => s,
212+
_ => continue,
213+
};
214+
if txs.is_empty() {
215+
continue;
216+
}
217+
218+
tracing::debug!(?txs, "Reprocessing transactions");
219+
let mut locked_pool = pool.lock().await;
220+
for tx in txs {
221+
locked_pool.insert(tx.uuid.clone(), tx);
222+
}
223+
}
224+
}
225+
183226
fn tx_ready_for_processing(
184227
base_interval: Duration,
185228
now: DateTime<Utc>,

rust/main/lander/src/dispatcher/stages/inclusion_stage/tests.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ async fn test_processing_included_txs() {
2020
const TXS_TO_PROCESS: usize = 3;
2121

2222
let mut mock_adapter = MockAdapter::new();
23+
mock_adapter
24+
.expect_reprocess_txs_poll_rate()
25+
.returning(|| None);
2326
mock_adapter
2427
.expect_estimated_block_time()
2528
.return_const(Duration::from_millis(400));
@@ -88,6 +91,9 @@ async fn test_failed_simulation() {
8891
const TXS_TO_PROCESS: usize = 3;
8992

9093
let mut mock_adapter = MockAdapter::new();
94+
mock_adapter
95+
.expect_reprocess_txs_poll_rate()
96+
.returning(|| None);
9197
mock_adapter
9298
.expect_estimated_block_time()
9399
.return_const(Duration::from_millis(400));
@@ -125,6 +131,9 @@ async fn test_failed_estimation() {
125131
const TXS_TO_PROCESS: usize = 3;
126132

127133
let mut mock_adapter = MockAdapter::new();
134+
mock_adapter
135+
.expect_reprocess_txs_poll_rate()
136+
.returning(|| None);
128137
mock_adapter
129138
.expect_estimated_block_time()
130139
.return_const(Duration::from_millis(400));
@@ -198,6 +207,9 @@ async fn test_channel_closed_before_any_tx() {
198207
#[tokio::test]
199208
async fn test_transaction_status_dropped() {
200209
let mut mock_adapter = MockAdapter::new();
210+
mock_adapter
211+
.expect_reprocess_txs_poll_rate()
212+
.returning(|| None);
201213
mock_adapter
202214
.expect_estimated_block_time()
203215
.return_const(Duration::from_millis(400));
@@ -249,6 +261,9 @@ async fn test_transaction_not_ready_for_resubmission() {
249261
#[tokio::test]
250262
async fn test_failed_submission_after_simulation_and_estimation() {
251263
let mut mock_adapter = MockAdapter::new();
264+
mock_adapter
265+
.expect_reprocess_txs_poll_rate()
266+
.returning(|| None);
252267
mock_adapter
253268
.expect_estimated_block_time()
254269
.return_const(Duration::from_millis(400));
@@ -283,6 +298,9 @@ async fn test_failed_submission_after_simulation_and_estimation() {
283298
#[tokio::test]
284299
async fn test_transaction_included_immediately() {
285300
let mut mock_adapter = MockAdapter::new();
301+
mock_adapter
302+
.expect_reprocess_txs_poll_rate()
303+
.returning(|| None);
286304
mock_adapter
287305
.expect_estimated_block_time()
288306
.return_const(Duration::from_millis(400));
@@ -307,6 +325,9 @@ async fn test_transaction_included_immediately() {
307325
#[tokio::test]
308326
async fn test_transaction_pending_then_included() {
309327
let mut mock_adapter = MockAdapter::new();
328+
mock_adapter
329+
.expect_reprocess_txs_poll_rate()
330+
.returning(|| None);
310331
mock_adapter
311332
.expect_estimated_block_time()
312333
.return_const(Duration::from_millis(400));
@@ -558,3 +579,68 @@ async fn test_reasonable_receipt_query_frequency() {
558579
queries_per_second_per_tx
559580
);
560581
}
582+
583+
#[tokio::test]
584+
async fn test_processing_reprocess_txs() {
585+
let txs_to_process = 4;
586+
let (payload_db, tx_db, _) = tmp_dbs();
587+
let (_sender, building_stage_receiver) = mpsc::channel(txs_to_process);
588+
let (finality_stage_sender, _receiver) = mpsc::channel(txs_to_process);
589+
let txs_created = create_random_txs_and_store_them(
590+
txs_to_process,
591+
&payload_db,
592+
&tx_db,
593+
TransactionStatus::PendingInclusion,
594+
)
595+
.await;
596+
597+
let mut mock_adapter = MockAdapter::new();
598+
mock_adapter
599+
.expect_estimated_block_time()
600+
.return_const(Duration::from_millis(400));
601+
mock_adapter
602+
.expect_tx_status()
603+
.returning(|_| Ok(TransactionStatus::PendingInclusion));
604+
mock_adapter
605+
.expect_tx_ready_for_resubmission()
606+
.returning(|_| false);
607+
608+
mock_adapter
609+
.expect_reprocess_txs_poll_rate()
610+
.return_const(Some(Duration::from_millis(50)));
611+
let mut txs_created_option = Some(txs_created.clone());
612+
mock_adapter.expect_get_reprocess_txs().returning(move || {
613+
if let Some(txs) = txs_created_option.take() {
614+
Ok(txs)
615+
} else {
616+
Ok(Vec::new())
617+
}
618+
});
619+
620+
let state = DispatcherState::new(
621+
payload_db.clone(),
622+
tx_db.clone(),
623+
Arc::new(mock_adapter),
624+
DispatcherMetrics::dummy_instance(),
625+
"test".to_string(),
626+
);
627+
let inclusion_stage = InclusionStage::new(
628+
building_stage_receiver,
629+
finality_stage_sender,
630+
state,
631+
"test".to_string(),
632+
);
633+
let pool = inclusion_stage.pool.clone();
634+
635+
let stage = tokio::spawn(async move { inclusion_stage.run().await });
636+
let _ = tokio::select! {
637+
// this arm runs indefinitely
638+
_ = stage => {
639+
},
640+
// this arm is the timeout - increased to accommodate adaptive polling
641+
_ = sleep(Duration::from_millis(500)) => {
642+
}
643+
};
644+
645+
assert!(are_all_txs_in_pool(txs_created.clone(), &pool).await);
646+
}

rust/main/lander/src/tests/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ mockall::mock! {
3535
fn update_vm_specific_metrics(&self, _tx: &Transaction, _metrics: &DispatcherMetrics);
3636
async fn nonce_gap_exists(&self) -> bool;
3737
async fn replace_tx(&self, _tx: &Transaction) -> Result<(), LanderError>;
38+
fn reprocess_txs_poll_rate(&self) -> Option<std::time::Duration>;
39+
async fn get_reprocess_txs(&self) -> Result<Vec<Transaction>, LanderError>;
3840
}
3941
}
4042

0 commit comments

Comments
 (0)