Skip to content

Commit 8226e12

Browse files
authored
Feat/multiple l1 rpcs (#567)
* Propagating tx to many RPC nodes Signed-off-by: Maciej Skrzypkowski <mskr@gmx.com> * tests * v1.1.0 * fixed error logs * updated env var L1_RPC_URLS * updated .env.sample --------- Signed-off-by: Maciej Skrzypkowski <mskr@gmx.com>
1 parent 86a9109 commit 8226e12

File tree

9 files changed

+86
-20
lines changed

9 files changed

+86
-20
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ resolver = "2"
1010
default-members = ["node"]
1111

1212
[workspace.package]
13-
version = "1.0.1"
13+
version = "1.1.0"
1414
edition = "2024"
1515
repository = "https://github.com/NethermindEth/Catalyst"
1616
license = "MIT"

node/.env.sample

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ PRECONF_ROUTER_ADDRESS=0x8dDEA87FfA296B951881aDacbAe8011090C54cAC
44
TAIKO_INBOX_ADDRESS=0xc2DD6e8DC8d0558F00Cc1FA6A16FFF1A62Cc436B
55
TAIKO_ANCHOR_ADDRESS=0x1670100000000000000000000000000000010001
66
VALIDATOR_INDEX=1
7-
TAIKO_GETH_WS_RPC_URL=ws://127.0.0.1:8546
7+
TAIKO_GETH_RPC_URL=ws://127.0.0.1:8546
88
TAIKO_GETH_AUTH_RPC_URL=http://127.0.0.1:8551
99
TAIKO_DRIVER_URL=http://127.0.0.1:1235
10-
L1_WS_RPC_URL=ws://127.0.0.1:32003
10+
L1_RPC_URLS=ws://127.0.0.1:32003,wss://123.123.123.123:32001
1111
L1_BEACON_URL=http://127.0.0.1:33001
1212
RUST_LOG=debug,reqwest=info,hyper=info,alloy_transport=info,alloy_rpc_client=info,alloy_provider=info
1313
JWT_SECRET_FILE_PATH=/some/path/jwtsecret.hex

node/src/ethereum_l1/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl TryFrom<L1ContractAddresses> for ContractAddresses {
3636
}
3737

3838
pub struct EthereumL1Config {
39-
pub execution_rpc_url: String,
39+
pub execution_rpc_urls: Vec<String>,
4040
pub contract_addresses: ContractAddresses,
4141
pub consensus_rpc_url: String,
4242
pub min_priority_fee_per_gas_wei: u64,

node/src/ethereum_l1/execution_layer.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ impl ExecutionLayer {
5151
) -> Result<Self, Error> {
5252
let (provider, preconfer_address) = alloy_tools::construct_alloy_provider(
5353
&config.signer,
54-
&config.execution_rpc_url,
54+
config
55+
.execution_rpc_urls
56+
.first()
57+
.ok_or_else(|| anyhow!("L1 RPC URL is required"))?,
5558
config.preconfer_address,
5659
)
5760
.await?;
@@ -508,7 +511,7 @@ impl ExecutionLayer {
508511
let metrics = Arc::new(Metrics::new());
509512

510513
let ethereum_l1_config = EthereumL1Config {
511-
execution_rpc_url: ws_rpc_url,
514+
execution_rpc_urls: vec![ws_rpc_url],
512515
contract_addresses: ContractAddresses {
513516
taiko_inbox: Address::ZERO,
514517
taiko_token: OnceCell::new(),

node/src/ethereum_l1/monitor_transaction.rs

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use super::{config::EthereumL1Config, tools, transaction_error::TransactionError};
2-
use crate::metrics::Metrics;
2+
use crate::{
3+
metrics::Metrics,
4+
shared::{alloy_tools, signer::Signer},
5+
};
36
use alloy::{
47
consensus::TxType,
58
network::{Network, ReceiptResponse, TransactionBuilder, TransactionBuilder4844},
6-
primitives::B256,
9+
primitives::{Address, B256},
710
providers::{
811
DynProvider, PendingTransactionBuilder, PendingTransactionError, Provider, RootProvider,
912
WatchTxError,
@@ -34,6 +37,9 @@ pub struct TransactionMonitorConfig {
3437
max_attempts_to_send_tx: u64,
3538
max_attempts_to_wait_tx: u64,
3639
delay_between_tx_attempts: Duration,
40+
execution_rpc_urls: Vec<String>,
41+
preconfer_address: Option<Address>,
42+
signer: Arc<Signer>,
3743
}
3844

3945
pub struct TransactionMonitorThread {
@@ -74,6 +80,9 @@ impl TransactionMonitor {
7480
delay_between_tx_attempts: Duration::from_secs(
7581
config.delay_between_tx_attempts_sec,
7682
),
83+
execution_rpc_urls: config.execution_rpc_urls.clone(),
84+
preconfer_address: config.preconfer_address,
85+
signer: config.signer.clone(),
7786
},
7887
join_handle: Mutex::new(None),
7988
error_notification_channel,
@@ -229,7 +238,7 @@ impl TransactionMonitorThread {
229238
root_provider = Some(pending_tx.provider().clone());
230239
}
231240

232-
debug!(
241+
info!(
233242
"{} tx nonce: {}, attempt: {}, l1_block: {}, hash: {}, max_fee_per_gas: {}, max_priority_fee_per_gas: {}, max_fee_per_blob_gas: {:?}",
234243
if sending_attempt == 0 {
235244
"🟢 Send"
@@ -379,8 +388,11 @@ impl TransactionMonitorThread {
379388
previous_tx_hashes: &Vec<B256>,
380389
sending_attempt: u64,
381390
) -> Option<PendingTransactionBuilder<alloy::network::Ethereum>> {
382-
match self.provider.send_transaction(tx).await {
383-
Ok(tx) => Some(tx),
391+
match self.provider.send_transaction(tx.clone()).await {
392+
Ok(pending_tx) => {
393+
self.propagate_transaction_to_other_backup_nodes(tx).await;
394+
Some(pending_tx)
395+
}
384396
Err(e) => {
385397
self.handle_rpc_error(e, previous_tx_hashes, sending_attempt)
386398
.await;
@@ -389,6 +401,41 @@ impl TransactionMonitorThread {
389401
}
390402
}
391403

404+
/// Recreates each backup node every time to avoid connection issues
405+
async fn propagate_transaction_to_other_backup_nodes(&self, tx: TransactionRequest) {
406+
// Skip the first RPC URL since it is the main one
407+
for url in self.config.execution_rpc_urls.iter().skip(1) {
408+
let provider = alloy_tools::construct_alloy_provider(
409+
&self.config.signer,
410+
url,
411+
self.config.preconfer_address,
412+
)
413+
.await;
414+
match provider {
415+
Ok(provider) => {
416+
let tx = provider.0.send_transaction(tx.clone()).await;
417+
if let Err(e) = tx {
418+
if e.to_string().contains("AlreadyKnown")
419+
|| e.to_string().to_lowercase().contains("already known")
420+
{
421+
debug!("Transaction already known to backup node {}", url);
422+
} else {
423+
warn!("Failed to send transaction to backup node {}: {}", url, e);
424+
}
425+
} else {
426+
info!("Transaction sent to backup node {}", url);
427+
}
428+
}
429+
Err(e) => {
430+
warn!(
431+
"Failed to construct alloy provider for backup node {}: {}",
432+
url, e
433+
);
434+
}
435+
}
436+
}
437+
}
438+
392439
async fn handle_rpc_error(
393440
&self,
394441
e: RpcError<TransportErrorKind>,

node/src/main.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ async fn main() -> Result<(), Error> {
7272

7373
let ethereum_l1 = ethereum_l1::EthereumL1::new(
7474
ethereum_l1::config::EthereumL1Config {
75-
execution_rpc_url: config.l1_rpc_url.clone(),
75+
execution_rpc_urls: config.l1_rpc_urls.clone(),
7676
contract_addresses: config.contract_addresses.clone().try_into()?,
7777
consensus_rpc_url: config.l1_beacon_url,
7878
slot_duration_sec: config.l1_slot_duration_sec,
@@ -172,7 +172,11 @@ async fn main() -> Result<(), Error> {
172172

173173
let chain_monitor = Arc::new(
174174
chain_monitor::ChainMonitor::new(
175-
config.l1_rpc_url,
175+
config
176+
.l1_rpc_urls
177+
.first()
178+
.expect("L1 RPC URL is required")
179+
.clone(),
176180
config.taiko_geth_rpc_url,
177181
config.contract_addresses.taiko_inbox,
178182
cancel_token.clone(),

node/src/shared/signer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::web3signer::Web3Signer;
22
use std::sync::Arc;
33

4+
#[derive(Debug)]
45
pub enum Signer {
56
Web3signer(Arc<Web3Signer>),
67
PrivateKey(String),

node/src/utils/config.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub struct Config {
1010
pub taiko_driver_url: String,
1111
pub catalyst_node_ecdsa_private_key: Option<String>,
1212
pub mev_boost_url: String,
13-
pub l1_rpc_url: String,
13+
pub l1_rpc_urls: Vec<String>,
1414
pub l1_beacon_url: String,
1515
pub web3signer_l1_url: Option<String>,
1616
pub web3signer_l2_url: Option<String>,
@@ -366,7 +366,11 @@ impl Config {
366366
catalyst_node_ecdsa_private_key,
367367
mev_boost_url: std::env::var("MEV_BOOST_URL")
368368
.unwrap_or("http://127.0.0.1:8080".to_string()),
369-
l1_rpc_url: std::env::var("L1_RPC_URL").unwrap_or("wss://127.0.0.1".to_string()),
369+
l1_rpc_urls: std::env::var("L1_RPC_URLS")
370+
.unwrap_or("wss://127.0.0.1".to_string())
371+
.split(",")
372+
.map(|s| s.to_string())
373+
.collect(),
370374
l1_beacon_url: std::env::var("L1_BEACON_URL")
371375
.unwrap_or("http://127.0.0.1:4000".to_string()),
372376
web3signer_l1_url,
@@ -459,7 +463,14 @@ propose_forced_inclusion: {}
459463
config.taiko_geth_auth_rpc_url,
460464
config.taiko_driver_url,
461465
config.mev_boost_url,
462-
config.l1_rpc_url,
466+
match config.l1_rpc_urls.split_first() {
467+
Some((first, rest)) => {
468+
let mut urls = vec![format!("{} (main)", first)];
469+
urls.extend(rest.iter().cloned());
470+
urls.join(", ")
471+
}
472+
None => String::new(),
473+
},
463474
config.l1_beacon_url,
464475
config.web3signer_l1_url.as_deref().unwrap_or("not set"),
465476
config.web3signer_l2_url.as_deref().unwrap_or("not set"),

0 commit comments

Comments
 (0)