Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ethereum/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ $ forge script script/upgrades/WrappedVara.s.sol:WrappedVaraScript --rpc-url $HO
4. generate an update message with the help of `governance-tool` (see `tools/governance/README.md`):

```bash
NEW_IMPLEMENTATION="0x0000000000000000000000000000000000000000" # must exist on https://etherscan.io
cargo run --package governance-tool --release -- --rpc-url $MAINNET_RPC_URL GovernanceAdmin UpgradeProxy MessageQueue $NEW_IMPLEMENTATION $(cast calldata "function reinitialize()")
# must exist on https://etherscan.io
NEW_IMPLEMENTATION="0x0000000000000000000000000000000000000000"
cargo run --package governance-tool --release -- --ethereum-endpoint $MAINNET_RPC_URL GovernanceAdmin UpgradeProxy MessageQueue $NEW_IMPLEMENTATION $(cast calldata "function reinitialize()")
```

5. send the extrinsic `gearEthBridge::sendEthMessage` in behalf of `governance admin`
Expand Down
124 changes: 68 additions & 56 deletions relayer/src/message_relayer/common/ethereum/merkle_root_extractor.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::{
common::{self, BASE_RETRY_DELAY, MAX_RETRIES},
message_relayer::{
common::{AuthoritySetId, GearBlockNumber, RelayedMerkleRoot},
eth_to_gear::api_provider::ApiProviderConnection,
},
use crate::message_relayer::{
common::{AuthoritySetId, GearBlockNumber, RelayedMerkleRoot},
eth_to_gear::api_provider::ApiProviderConnection,
};
use alloy::{
providers::{PendingTransactionBuilder, Provider},
sol_types::SolEvent,
};
use ethereum_client::{abi::IMessageQueue::MerkleRoot, EthApi};
use futures::StreamExt;
use gear_rpc_client::GearApi;
use primitive_types::H256;
use prometheus::IntGauge;
use tokio::sync::mpsc::UnboundedSender;
use utils_prometheus::{impl_metered_service, MeteredService};
Expand Down Expand Up @@ -63,61 +62,73 @@ impl MerkleRootExtractor {
pub fn spawn(self) {
tokio::task::spawn(task(self));
}
}

async fn task(mut this: MerkleRootExtractor) {
let mut attempts = 0;
async fn fetch_hash_auth_id(
&mut self,
block_number_gear: u32,
) -> Option<(H256, AuthoritySetId)> {
let gear_api = self.api_provider.client();
loop {
match self::fetch_hash_auth_id(&gear_api, block_number_gear).await {
Ok(result) => return Some(result),

Err(e) => {
log::error!(r#"Merkle root extractor failed to fetch block_hash: "{e:?}""#);
log::trace!(
r#"e.downcast_ref::<gsdk::Error>(): "{:?}""#,
e.downcast_ref::<gsdk::Error>()
);
log::trace!(
r#"e.downcast_ref::<subxt::Error>(): "{:?}""#,
e.downcast_ref::<subxt::Error>()
);
for cause in e.chain() {
log::trace!(r#"cause: "{cause:?}""#);
}
}
}

loop {
let res = task_inner(&this).await;
if let Err(err) = res {
attempts += 1;
log::error!(
"Merkle root extractor failed (attempt {}/{}): {}. Retrying in {:?}...",
attempts,
MAX_RETRIES,
err,
BASE_RETRY_DELAY * 2u32.pow(attempts - 1),
);
if attempts >= MAX_RETRIES {
log::error!("Merkle root extractor failed {attempts} times: {err}");
break;
if let Err(e) = self.api_provider.reconnect().await {
log::error!(r#"Merkle root extractor unable to reconnect: "{e}""#);
return None;
}

tokio::time::sleep(BASE_RETRY_DELAY * 2u32.pow(attempts - 1)).await;
log::debug!("API provider reconnected");
}
}
}

match this.api_provider.reconnect().await {
Ok(()) => {
log::info!("API provider reconnected");
}
async fn fetch_hash_auth_id(
gear_api: &GearApi,
block_number_gear: u32,
) -> anyhow::Result<(H256, AuthoritySetId)> {
let block_hash = gear_api.block_number_to_hash(block_number_gear).await?;

Err(err) => {
log::error!("Merkle root extractor unable to reconnect: {err}");
return;
}
}
let authority_set_id = AuthoritySetId(gear_api.signed_by_authority_set_id(block_hash).await?);

if common::is_transport_error_recoverable(&err) {
this.eth_api = match this.eth_api.reconnect().await {
Ok(eth_api) => eth_api,
Err(err) => {
log::error!("Failed to reconnect to Ethereum: {err}");
break;
}
};
} else {
log::error!("Merkle root extractor failed: {err}");
break;
}
} else {
Ok((block_hash, authority_set_id))
}

async fn task(mut this: MerkleRootExtractor) {
loop {
let Err(err) = task_inner(&mut this).await else {
log::info!("Exiting");
break;
}
};

log::error!(r#"Merkle root extractor failed: "{err:?}""#);

this.eth_api = match this.eth_api.reconnect().await {
Ok(eth_api) => eth_api,
Err(err) => {
log::error!(r#"Failed to reconnect to Ethereum: "{err}""#);
break;
}
};
}
}

async fn task_inner(this: &MerkleRootExtractor) -> anyhow::Result<()> {
let gear_api = this.api_provider.client();
async fn task_inner(this: &mut MerkleRootExtractor) -> anyhow::Result<()> {
let subscription = this.eth_api.subscribe_logs().await?;

let mut stream = subscription.into_result_stream();
Expand Down Expand Up @@ -185,24 +196,25 @@ async fn task_inner(this: &MerkleRootExtractor) -> anyhow::Result<()> {
.latest_merkle_root_for_block
.set(block_number_gear as i64);

let block_hash = gear_api.block_number_to_hash(block_number_gear).await?;

let authority_set_id =
AuthoritySetId(gear_api.signed_by_authority_set_id(block_hash).await?);
let Some((block_hash, authority_set_id)) = this.fetch_hash_auth_id(block_number_gear).await else {
return Ok(());
};

log::info!(
"Merkle root {:?} is for era #{authority_set_id}",
(root.blockNumber, root.merkleRoot),
);

this.sender.send(RelayedMerkleRoot {
if let Err(e) = this.sender.send(RelayedMerkleRoot {
block: GearBlockNumber(block_number_gear),
block_hash,
authority_set_id,
merkle_root: root.merkleRoot.0.into(),
timestamp: block_timestamp,
})?;

}) {
log::error!(r#"Sender channel closed: "{e:?}"."#);
return Ok(());
}
}
}
}
Expand Down
72 changes: 45 additions & 27 deletions relayer/src/message_relayer/gear_to_eth/paid_token_transfers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
common::MAX_RETRIES,
common::{BlockRange, MAX_RETRIES},
message_relayer::{
common::{
ethereum::{
Expand Down Expand Up @@ -203,44 +203,62 @@ async fn fetch_merkle_roots_inner(
sender: UnboundedSender<RelayedMerkleRoot>,
) -> AnyResult<()> {
const COUNT: u64 = 2_000;
const COUNT_STEP: u64 = 50;

let block_finalized = eth_api.finalized_block_number().await?;
let block_latest = eth_api.latest_block_number().await?;
let gear_api = api_provider.client();

for i in 0..50 {
for i in 0..COUNT_STEP {
let block_range = crate::common::create_range(
(block_finalized - (i + 1) * COUNT).into(),
block_finalized - i * COUNT,
);
let merkle_roots = eth_api
.fetch_merkle_roots_in_range(block_range.from, block_range.to)
.await?;

let len = merkle_roots.len();
log::trace!("Found {len} entry(ies) with merkle roots (i = {i})");
for (root, _block_number_eth) in merkle_roots
.into_iter()
.filter_map(|(root, block)| block.map(|block| (root, block)))
{
let timestamp = eth_api.get_block_timestamp(_block_number_eth).await?;
let block_hash = gear_api
.block_number_to_hash(root.block_number as u32)
.await?;
let authority_set_id = gear_api.signed_by_authority_set_id(block_hash).await?;

sender.send(RelayedMerkleRoot {
block: GearBlockNumber(root.block_number as u32),
block_hash,
authority_set_id: AuthoritySetId(authority_set_id),
merkle_root: root.merkle_root,
timestamp,
})?;
}

log::trace!("Successfuly sent {len} merkle root entry(ies) (i = {i})");
fetch_merkle_roots_in_range(&eth_api, &gear_api, &sender, i, block_range).await?;

time::sleep(time::Duration::from_secs(5)).await;
}

// to that moment block_latest should have the required number of confirmations
let block_range = crate::common::create_range((block_finalized + 1).into(), block_latest);

fetch_merkle_roots_in_range(&eth_api, &gear_api, &sender, COUNT_STEP, block_range).await
}

async fn fetch_merkle_roots_in_range(
eth_api: &EthApi,
gear_api: &gear_rpc_client::GearApi,
sender: &UnboundedSender<RelayedMerkleRoot>,
i: u64,
block_range: BlockRange,
) -> AnyResult<()> {
let merkle_roots = eth_api
.fetch_merkle_roots_in_range(block_range.from, block_range.to)
.await?;

let len = merkle_roots.len();
log::trace!("Found {len} entry(ies) with merkle roots (i = {i})");
for (root, _block_number_eth) in merkle_roots
.into_iter()
.filter_map(|(root, block)| block.map(|block| (root, block)))
{
let timestamp = eth_api.get_block_timestamp(_block_number_eth).await?;
let block_hash = gear_api
.block_number_to_hash(root.block_number as u32)
.await?;
let authority_set_id = gear_api.signed_by_authority_set_id(block_hash).await?;

sender.send(RelayedMerkleRoot {
block: GearBlockNumber(root.block_number as u32),
block_hash,
authority_set_id: AuthoritySetId(authority_set_id),
merkle_root: root.merkle_root,
timestamp,
})?;
}

log::trace!("Successfuly sent {len} merkle root entry(ies) (i = {i})");

Ok(())
}
Loading