diff --git a/ethereum/README.md b/ethereum/README.md index 2389a8437..3ff32a0a0 100644 --- a/ethereum/README.md +++ b/ethereum/README.md @@ -102,8 +102,9 @@ $ forge script script/upgrades/WrappedVara.s.sol:WrappedVaraScript --rpc-url $HO 4. generate an update message with the help of `governance-tool` (see `tools/governance/README.md`): ```bash - NEW_IMPLEMENTATION="0x0000000000000000000000000000000000000000" # must exist on https://etherscan.io - cargo run --package governance-tool --release -- --rpc-url $MAINNET_RPC_URL GovernanceAdmin UpgradeProxy MessageQueue $NEW_IMPLEMENTATION $(cast calldata "function reinitialize()") + # must exist on https://etherscan.io + NEW_IMPLEMENTATION="0x0000000000000000000000000000000000000000" + cargo run --package governance-tool --release -- --ethereum-endpoint $MAINNET_RPC_URL GovernanceAdmin UpgradeProxy MessageQueue $NEW_IMPLEMENTATION $(cast calldata "function reinitialize()") ``` 5. send the extrinsic `gearEthBridge::sendEthMessage` in behalf of `governance admin` diff --git a/relayer/src/message_relayer/common/ethereum/merkle_root_extractor.rs b/relayer/src/message_relayer/common/ethereum/merkle_root_extractor.rs index 558940844..94581ada7 100644 --- a/relayer/src/message_relayer/common/ethereum/merkle_root_extractor.rs +++ b/relayer/src/message_relayer/common/ethereum/merkle_root_extractor.rs @@ -1,9 +1,6 @@ -use crate::{ - common::{self, BASE_RETRY_DELAY, MAX_RETRIES}, - message_relayer::{ - common::{AuthoritySetId, GearBlockNumber, RelayedMerkleRoot}, - eth_to_gear::api_provider::ApiProviderConnection, - }, +use crate::message_relayer::{ + common::{AuthoritySetId, GearBlockNumber, RelayedMerkleRoot}, + eth_to_gear::api_provider::ApiProviderConnection, }; use alloy::{ providers::{PendingTransactionBuilder, Provider}, @@ -11,6 +8,8 @@ use alloy::{ }; use ethereum_client::{abi::IMessageQueue::MerkleRoot, EthApi}; use futures::StreamExt; +use gear_rpc_client::GearApi; +use primitive_types::H256; use prometheus::IntGauge; use tokio::sync::mpsc::UnboundedSender; use utils_prometheus::{impl_metered_service, MeteredService}; @@ -63,61 +62,73 @@ impl MerkleRootExtractor { pub fn spawn(self) { tokio::task::spawn(task(self)); } -} -async fn task(mut this: MerkleRootExtractor) { - let mut attempts = 0; + async fn fetch_hash_auth_id( + &mut self, + block_number_gear: u32, + ) -> Option<(H256, AuthoritySetId)> { + let gear_api = self.api_provider.client(); + loop { + match self::fetch_hash_auth_id(&gear_api, block_number_gear).await { + Ok(result) => return Some(result), + + Err(e) => { + log::error!(r#"Merkle root extractor failed to fetch block_hash: "{e:?}""#); + log::trace!( + r#"e.downcast_ref::(): "{:?}""#, + e.downcast_ref::() + ); + log::trace!( + r#"e.downcast_ref::(): "{:?}""#, + e.downcast_ref::() + ); + for cause in e.chain() { + log::trace!(r#"cause: "{cause:?}""#); + } + } + } - loop { - let res = task_inner(&this).await; - if let Err(err) = res { - attempts += 1; - log::error!( - "Merkle root extractor failed (attempt {}/{}): {}. Retrying in {:?}...", - attempts, - MAX_RETRIES, - err, - BASE_RETRY_DELAY * 2u32.pow(attempts - 1), - ); - if attempts >= MAX_RETRIES { - log::error!("Merkle root extractor failed {attempts} times: {err}"); - break; + if let Err(e) = self.api_provider.reconnect().await { + log::error!(r#"Merkle root extractor unable to reconnect: "{e}""#); + return None; } - tokio::time::sleep(BASE_RETRY_DELAY * 2u32.pow(attempts - 1)).await; + log::debug!("API provider reconnected"); + } + } +} - match this.api_provider.reconnect().await { - Ok(()) => { - log::info!("API provider reconnected"); - } +async fn fetch_hash_auth_id( + gear_api: &GearApi, + block_number_gear: u32, +) -> anyhow::Result<(H256, AuthoritySetId)> { + let block_hash = gear_api.block_number_to_hash(block_number_gear).await?; - Err(err) => { - log::error!("Merkle root extractor unable to reconnect: {err}"); - return; - } - } + let authority_set_id = AuthoritySetId(gear_api.signed_by_authority_set_id(block_hash).await?); - if common::is_transport_error_recoverable(&err) { - this.eth_api = match this.eth_api.reconnect().await { - Ok(eth_api) => eth_api, - Err(err) => { - log::error!("Failed to reconnect to Ethereum: {err}"); - break; - } - }; - } else { - log::error!("Merkle root extractor failed: {err}"); - break; - } - } else { + Ok((block_hash, authority_set_id)) +} + +async fn task(mut this: MerkleRootExtractor) { + loop { + let Err(err) = task_inner(&mut this).await else { log::info!("Exiting"); break; - } + }; + + log::error!(r#"Merkle root extractor failed: "{err:?}""#); + + this.eth_api = match this.eth_api.reconnect().await { + Ok(eth_api) => eth_api, + Err(err) => { + log::error!(r#"Failed to reconnect to Ethereum: "{err}""#); + break; + } + }; } } -async fn task_inner(this: &MerkleRootExtractor) -> anyhow::Result<()> { - let gear_api = this.api_provider.client(); +async fn task_inner(this: &mut MerkleRootExtractor) -> anyhow::Result<()> { let subscription = this.eth_api.subscribe_logs().await?; let mut stream = subscription.into_result_stream(); @@ -185,24 +196,25 @@ async fn task_inner(this: &MerkleRootExtractor) -> anyhow::Result<()> { .latest_merkle_root_for_block .set(block_number_gear as i64); - let block_hash = gear_api.block_number_to_hash(block_number_gear).await?; - - let authority_set_id = - AuthoritySetId(gear_api.signed_by_authority_set_id(block_hash).await?); + let Some((block_hash, authority_set_id)) = this.fetch_hash_auth_id(block_number_gear).await else { + return Ok(()); + }; log::info!( "Merkle root {:?} is for era #{authority_set_id}", (root.blockNumber, root.merkleRoot), ); - this.sender.send(RelayedMerkleRoot { + if let Err(e) = this.sender.send(RelayedMerkleRoot { block: GearBlockNumber(block_number_gear), block_hash, authority_set_id, merkle_root: root.merkleRoot.0.into(), timestamp: block_timestamp, - })?; - + }) { + log::error!(r#"Sender channel closed: "{e:?}"."#); + return Ok(()); + } } } } diff --git a/relayer/src/message_relayer/gear_to_eth/paid_token_transfers.rs b/relayer/src/message_relayer/gear_to_eth/paid_token_transfers.rs index af1b87518..dff3116b6 100644 --- a/relayer/src/message_relayer/gear_to_eth/paid_token_transfers.rs +++ b/relayer/src/message_relayer/gear_to_eth/paid_token_transfers.rs @@ -1,5 +1,5 @@ use crate::{ - common::MAX_RETRIES, + common::{BlockRange, MAX_RETRIES}, message_relayer::{ common::{ ethereum::{ @@ -203,44 +203,62 @@ async fn fetch_merkle_roots_inner( sender: UnboundedSender, ) -> AnyResult<()> { const COUNT: u64 = 2_000; + const COUNT_STEP: u64 = 50; let block_finalized = eth_api.finalized_block_number().await?; + let block_latest = eth_api.latest_block_number().await?; let gear_api = api_provider.client(); - for i in 0..50 { + for i in 0..COUNT_STEP { let block_range = crate::common::create_range( (block_finalized - (i + 1) * COUNT).into(), block_finalized - i * COUNT, ); - let merkle_roots = eth_api - .fetch_merkle_roots_in_range(block_range.from, block_range.to) - .await?; - - let len = merkle_roots.len(); - log::trace!("Found {len} entry(ies) with merkle roots (i = {i})"); - for (root, _block_number_eth) in merkle_roots - .into_iter() - .filter_map(|(root, block)| block.map(|block| (root, block))) - { - let timestamp = eth_api.get_block_timestamp(_block_number_eth).await?; - let block_hash = gear_api - .block_number_to_hash(root.block_number as u32) - .await?; - let authority_set_id = gear_api.signed_by_authority_set_id(block_hash).await?; - - sender.send(RelayedMerkleRoot { - block: GearBlockNumber(root.block_number as u32), - block_hash, - authority_set_id: AuthoritySetId(authority_set_id), - merkle_root: root.merkle_root, - timestamp, - })?; - } - log::trace!("Successfuly sent {len} merkle root entry(ies) (i = {i})"); + fetch_merkle_roots_in_range(ð_api, &gear_api, &sender, i, block_range).await?; time::sleep(time::Duration::from_secs(5)).await; } + // to that moment block_latest should have the required number of confirmations + let block_range = crate::common::create_range((block_finalized + 1).into(), block_latest); + + fetch_merkle_roots_in_range(ð_api, &gear_api, &sender, COUNT_STEP, block_range).await +} + +async fn fetch_merkle_roots_in_range( + eth_api: &EthApi, + gear_api: &gear_rpc_client::GearApi, + sender: &UnboundedSender, + i: u64, + block_range: BlockRange, +) -> AnyResult<()> { + let merkle_roots = eth_api + .fetch_merkle_roots_in_range(block_range.from, block_range.to) + .await?; + + let len = merkle_roots.len(); + log::trace!("Found {len} entry(ies) with merkle roots (i = {i})"); + for (root, _block_number_eth) in merkle_roots + .into_iter() + .filter_map(|(root, block)| block.map(|block| (root, block))) + { + let timestamp = eth_api.get_block_timestamp(_block_number_eth).await?; + let block_hash = gear_api + .block_number_to_hash(root.block_number as u32) + .await?; + let authority_set_id = gear_api.signed_by_authority_set_id(block_hash).await?; + + sender.send(RelayedMerkleRoot { + block: GearBlockNumber(root.block_number as u32), + block_hash, + authority_set_id: AuthoritySetId(authority_set_id), + merkle_root: root.merkle_root, + timestamp, + })?; + } + + log::trace!("Successfuly sent {len} merkle root entry(ies) (i = {i})"); + Ok(()) }