|
1 | 1 | use { |
2 | 2 | anyhow::{anyhow, Result}, |
3 | 3 | ethers::{contract::ContractCall, middleware::Middleware}, |
| 4 | + ethers::types::U256, |
| 5 | + ethers::types::TransactionReceipt, |
4 | 6 | std::sync::Arc, |
5 | 7 | tracing, |
| 8 | + std::sync::atomic::AtomicU64, |
| 9 | + crate::config::EscalationPolicyConfig, |
| 10 | + tokio::time::{timeout, Duration}, |
| 11 | + backoff::ExponentialBackoff, |
6 | 12 | }; |
7 | 13 |
|
| 14 | +const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30; |
| 15 | + |
| 16 | +pub struct SubmitTxResult { |
| 17 | + pub num_retries: u64, |
| 18 | + pub gas_multiplier: u64, |
| 19 | + pub fee_multiplier: u64, |
| 20 | + pub duration: Duration, |
| 21 | + pub receipt: Result<TransactionReceipt, anyhow::Error>, |
| 22 | +} |
| 23 | + |
8 | 24 | pub async fn send_and_confirm<A: Middleware>(contract_call: ContractCall<A, ()>) -> Result<()> { |
9 | 25 | let call_name = contract_call.function.name.as_str(); |
10 | 26 | let pending_tx = contract_call |
@@ -64,3 +80,170 @@ pub async fn estimate_tx_cost<T: Middleware + 'static>( |
64 | 80 |
|
65 | 81 | Ok(gas_price * gas_used) |
66 | 82 | } |
| 83 | + |
| 84 | +pub async fn submit_tx_with_backoff<T: Middleware + 'static>( |
| 85 | + middleware: Arc<T>, |
| 86 | + call: ContractCall<T, ()>, |
| 87 | + gas_limit: U256, |
| 88 | + escalation_policy: EscalationPolicyConfig, |
| 89 | +) -> Result<SubmitTxResult> { |
| 90 | + let start_time = std::time::Instant::now(); |
| 91 | + |
| 92 | + tracing::info!("Started processing event"); |
| 93 | + let backoff = ExponentialBackoff { |
| 94 | + max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes |
| 95 | + ..Default::default() |
| 96 | + }; |
| 97 | + |
| 98 | + let num_retries = Arc::new(AtomicU64::new(0)); |
| 99 | + |
| 100 | + let success = backoff::future::retry_notify( |
| 101 | + backoff, |
| 102 | + || async { |
| 103 | + let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); |
| 104 | + |
| 105 | + let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries); |
| 106 | + let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries); |
| 107 | + submit_tx( |
| 108 | + middleware.clone(), |
| 109 | + &call, |
| 110 | + gas_limit, |
| 111 | + gas_multiplier_pct, |
| 112 | + fee_multiplier_pct, |
| 113 | + ).await |
| 114 | + }, |
| 115 | + |e, dur| { |
| 116 | + let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed); |
| 117 | + tracing::error!( |
| 118 | + "Error on retry {} at duration {:?}: {}", |
| 119 | + retry_number, |
| 120 | + dur, |
| 121 | + e |
| 122 | + ); |
| 123 | + num_retries.store(retry_number + 1, std::sync::atomic::Ordering::Relaxed); |
| 124 | + }, |
| 125 | + ) |
| 126 | + .await; |
| 127 | + |
| 128 | + let duration = start_time.elapsed(); |
| 129 | + let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); |
| 130 | + |
| 131 | + Ok(SubmitTxResult { |
| 132 | + num_retries: num_retries, |
| 133 | + gas_multiplier: escalation_policy.get_gas_multiplier_pct(num_retries), |
| 134 | + fee_multiplier: escalation_policy.get_fee_multiplier_pct(num_retries), |
| 135 | + duration, |
| 136 | + receipt: success, |
| 137 | + }) |
| 138 | +} |
| 139 | + |
| 140 | +/// Process a callback on a chain. It estimates the gas for the reveal with callback and |
| 141 | +/// submits the transaction if the gas estimate is below the gas limit. |
| 142 | +/// It will return a permanent or transient error depending on the error type and whether |
| 143 | +/// retry is possible or not. |
| 144 | +pub async fn submit_tx<T: Middleware + 'static>( |
| 145 | + client: Arc<T>, |
| 146 | + call: &ContractCall<T, ()>, |
| 147 | + gas_limit: U256, |
| 148 | + // A value of 100 submits the tx with the same gas/fee as the estimate. |
| 149 | + gas_estimate_multiplier_pct: u64, |
| 150 | + fee_estimate_multiplier_pct: u64, |
| 151 | +) -> Result<TransactionReceipt, backoff::Error<anyhow::Error>> { |
| 152 | + |
| 153 | + let gas_estimate_res = call.estimate_gas().await; |
| 154 | + |
| 155 | + let gas_estimate = gas_estimate_res.map_err(|e| { |
| 156 | + // we consider the error transient even if it is a contract revert since |
| 157 | + // it can be because of routing to a lagging RPC node. Retrying such errors will |
| 158 | + // incur a few additional RPC calls, but it is fine. |
| 159 | + backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e)) |
| 160 | + })?; |
| 161 | + |
| 162 | + // The gas limit on the simulated transaction is the configured gas limit on the chain, |
| 163 | + // but we are willing to pad the gas a bit to ensure reliable submission. |
| 164 | + if gas_estimate > gas_limit { |
| 165 | + return Err(backoff::Error::permanent(anyhow!( |
| 166 | + "Gas estimate for reveal with callback is higher than the gas limit {} > {}", |
| 167 | + gas_estimate, |
| 168 | + gas_limit |
| 169 | + ))); |
| 170 | + } |
| 171 | + |
| 172 | + // Pad the gas estimate after checking it against the simulation gas limit, ensuring that |
| 173 | + // the padded gas estimate doesn't exceed the maximum amount of gas we are willing to use. |
| 174 | + let gas_estimate = gas_estimate.saturating_mul(gas_estimate_multiplier_pct.into()) / 100; |
| 175 | + |
| 176 | + let call = call.gas(gas_estimate); |
| 177 | + |
| 178 | + let mut transaction = call.tx.clone(); |
| 179 | + |
| 180 | + // manually fill the tx with the gas info, so we can log the details in case of error |
| 181 | + client |
| 182 | + .fill_transaction(&mut transaction, None) |
| 183 | + .await |
| 184 | + .map_err(|e| { |
| 185 | + backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e)) |
| 186 | + })?; |
| 187 | + |
| 188 | + // Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle |
| 189 | + // in the client that sets the gas price. |
| 190 | + transaction.set_gas_price( |
| 191 | + transaction |
| 192 | + .gas_price() |
| 193 | + .unwrap_or_default() |
| 194 | + .saturating_mul(fee_estimate_multiplier_pct.into()) |
| 195 | + / 100, |
| 196 | + ); |
| 197 | + |
| 198 | + let pending_tx = client |
| 199 | + .send_transaction(transaction.clone(), None) |
| 200 | + .await |
| 201 | + .map_err(|e| { |
| 202 | + backoff::Error::transient(anyhow!( |
| 203 | + "Error submitting the reveal transaction. Tx:{:?}, Error:{:?}", |
| 204 | + transaction, |
| 205 | + e |
| 206 | + )) |
| 207 | + })?; |
| 208 | + |
| 209 | + let reset_nonce = || { |
| 210 | + let nonce_manager = client.inner().inner(); |
| 211 | + nonce_manager.reset(); |
| 212 | + }; |
| 213 | + |
| 214 | + let pending_receipt = timeout( |
| 215 | + Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS), |
| 216 | + pending_tx, |
| 217 | + ) |
| 218 | + .await |
| 219 | + .map_err(|_| { |
| 220 | + // Tx can get stuck in mempool without any progress if the nonce is too high |
| 221 | + // in this case ethers internal polling will not reduce the number of retries |
| 222 | + // and keep retrying indefinitely. So we set a manual timeout here and reset the nonce. |
| 223 | + reset_nonce(); |
| 224 | + backoff::Error::transient(anyhow!( |
| 225 | + "Tx stuck in mempool. Resetting nonce. Tx:{:?}", |
| 226 | + transaction |
| 227 | + )) |
| 228 | + })?; |
| 229 | + |
| 230 | + let receipt = pending_receipt |
| 231 | + .map_err(|e| { |
| 232 | + backoff::Error::transient(anyhow!( |
| 233 | + "Error waiting for transaction receipt. Tx:{:?} Error:{:?}", |
| 234 | + transaction, |
| 235 | + e |
| 236 | + )) |
| 237 | + })? |
| 238 | + .ok_or_else(|| { |
| 239 | + // RPC may not return an error on tx submission if the nonce is too high. |
| 240 | + // But we will never get a receipt. So we reset the nonce manager to get the correct nonce. |
| 241 | + reset_nonce(); |
| 242 | + backoff::Error::transient(anyhow!( |
| 243 | + "Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}", |
| 244 | + transaction |
| 245 | + )) |
| 246 | + })?; |
| 247 | + |
| 248 | + Ok(receipt) |
| 249 | +} |
0 commit comments