|
1 | | -use crate::{ |
2 | | - common::{self, BASE_RETRY_DELAY, MAX_RETRIES}, |
3 | | - message_relayer::{ |
4 | | - common::{AuthoritySetId, GearBlockNumber, RelayedMerkleRoot}, |
5 | | - eth_to_gear::api_provider::ApiProviderConnection, |
6 | | - }, |
| 1 | +use crate::message_relayer::{ |
| 2 | + common::{AuthoritySetId, GearBlockNumber, RelayedMerkleRoot}, |
| 3 | + eth_to_gear::api_provider::ApiProviderConnection, |
7 | 4 | }; |
8 | 5 | use alloy::{ |
9 | 6 | providers::{PendingTransactionBuilder, Provider}, |
10 | 7 | sol_types::SolEvent, |
11 | 8 | }; |
12 | 9 | use ethereum_client::{abi::IMessageQueue::MerkleRoot, EthApi}; |
13 | 10 | use futures::StreamExt; |
| 11 | +use gear_rpc_client::GearApi; |
| 12 | +use primitive_types::H256; |
14 | 13 | use prometheus::IntGauge; |
15 | 14 | use tokio::sync::mpsc::UnboundedSender; |
16 | 15 | use utils_prometheus::{impl_metered_service, MeteredService}; |
@@ -63,61 +62,73 @@ impl MerkleRootExtractor { |
63 | 62 | pub fn spawn(self) { |
64 | 63 | tokio::task::spawn(task(self)); |
65 | 64 | } |
66 | | -} |
67 | 65 |
|
68 | | -async fn task(mut this: MerkleRootExtractor) { |
69 | | - let mut attempts = 0; |
| 66 | + async fn fetch_hash_auth_id( |
| 67 | + &mut self, |
| 68 | + block_number_gear: u32, |
| 69 | + ) -> Option<(H256, AuthoritySetId)> { |
| 70 | + let gear_api = self.api_provider.client(); |
| 71 | + loop { |
| 72 | + match self::fetch_hash_auth_id(&gear_api, block_number_gear).await { |
| 73 | + Ok(result) => return Some(result), |
| 74 | + |
| 75 | + Err(e) => { |
| 76 | + log::error!(r#"Merkle root extractor failed to fetch block_hash: "{e:?}""#); |
| 77 | + log::trace!( |
| 78 | + r#"e.downcast_ref::<gsdk::Error>(): "{:?}""#, |
| 79 | + e.downcast_ref::<gsdk::Error>() |
| 80 | + ); |
| 81 | + log::trace!( |
| 82 | + r#"e.downcast_ref::<subxt::Error>(): "{:?}""#, |
| 83 | + e.downcast_ref::<subxt::Error>() |
| 84 | + ); |
| 85 | + for cause in e.chain() { |
| 86 | + log::trace!(r#"cause: "{cause:?}""#); |
| 87 | + } |
| 88 | + } |
| 89 | + } |
70 | 90 |
|
71 | | - loop { |
72 | | - let res = task_inner(&this).await; |
73 | | - if let Err(err) = res { |
74 | | - attempts += 1; |
75 | | - log::error!( |
76 | | - "Merkle root extractor failed (attempt {}/{}): {}. Retrying in {:?}...", |
77 | | - attempts, |
78 | | - MAX_RETRIES, |
79 | | - err, |
80 | | - BASE_RETRY_DELAY * 2u32.pow(attempts - 1), |
81 | | - ); |
82 | | - if attempts >= MAX_RETRIES { |
83 | | - log::error!("Merkle root extractor failed {attempts} times: {err}"); |
84 | | - break; |
| 91 | + if let Err(e) = self.api_provider.reconnect().await { |
| 92 | + log::error!(r#"Merkle root extractor unable to reconnect: "{e}""#); |
| 93 | + return None; |
85 | 94 | } |
86 | 95 |
|
87 | | - tokio::time::sleep(BASE_RETRY_DELAY * 2u32.pow(attempts - 1)).await; |
| 96 | + log::debug!("API provider reconnected"); |
| 97 | + } |
| 98 | + } |
| 99 | +} |
88 | 100 |
|
89 | | - match this.api_provider.reconnect().await { |
90 | | - Ok(()) => { |
91 | | - log::info!("API provider reconnected"); |
92 | | - } |
| 101 | +async fn fetch_hash_auth_id( |
| 102 | + gear_api: &GearApi, |
| 103 | + block_number_gear: u32, |
| 104 | +) -> anyhow::Result<(H256, AuthoritySetId)> { |
| 105 | + let block_hash = gear_api.block_number_to_hash(block_number_gear).await?; |
93 | 106 |
|
94 | | - Err(err) => { |
95 | | - log::error!("Merkle root extractor unable to reconnect: {err}"); |
96 | | - return; |
97 | | - } |
98 | | - } |
| 107 | + let authority_set_id = AuthoritySetId(gear_api.signed_by_authority_set_id(block_hash).await?); |
99 | 108 |
|
100 | | - if common::is_transport_error_recoverable(&err) { |
101 | | - this.eth_api = match this.eth_api.reconnect().await { |
102 | | - Ok(eth_api) => eth_api, |
103 | | - Err(err) => { |
104 | | - log::error!("Failed to reconnect to Ethereum: {err}"); |
105 | | - break; |
106 | | - } |
107 | | - }; |
108 | | - } else { |
109 | | - log::error!("Merkle root extractor failed: {err}"); |
110 | | - break; |
111 | | - } |
112 | | - } else { |
| 109 | + Ok((block_hash, authority_set_id)) |
| 110 | +} |
| 111 | + |
| 112 | +async fn task(mut this: MerkleRootExtractor) { |
| 113 | + loop { |
| 114 | + let Err(err) = task_inner(&mut this).await else { |
113 | 115 | log::info!("Exiting"); |
114 | 116 | break; |
115 | | - } |
| 117 | + }; |
| 118 | + |
| 119 | + log::error!(r#"Merkle root extractor failed: "{err:?}""#); |
| 120 | + |
| 121 | + this.eth_api = match this.eth_api.reconnect().await { |
| 122 | + Ok(eth_api) => eth_api, |
| 123 | + Err(err) => { |
| 124 | + log::error!(r#"Failed to reconnect to Ethereum: "{err}""#); |
| 125 | + break; |
| 126 | + } |
| 127 | + }; |
116 | 128 | } |
117 | 129 | } |
118 | 130 |
|
119 | | -async fn task_inner(this: &MerkleRootExtractor) -> anyhow::Result<()> { |
120 | | - let gear_api = this.api_provider.client(); |
| 131 | +async fn task_inner(this: &mut MerkleRootExtractor) -> anyhow::Result<()> { |
121 | 132 | let subscription = this.eth_api.subscribe_logs().await?; |
122 | 133 |
|
123 | 134 | let mut stream = subscription.into_result_stream(); |
@@ -185,24 +196,25 @@ async fn task_inner(this: &MerkleRootExtractor) -> anyhow::Result<()> { |
185 | 196 | .latest_merkle_root_for_block |
186 | 197 | .set(block_number_gear as i64); |
187 | 198 |
|
188 | | - let block_hash = gear_api.block_number_to_hash(block_number_gear).await?; |
189 | | - |
190 | | - let authority_set_id = |
191 | | - AuthoritySetId(gear_api.signed_by_authority_set_id(block_hash).await?); |
| 199 | + let Some((block_hash, authority_set_id)) = this.fetch_hash_auth_id(block_number_gear).await else { |
| 200 | + return Ok(()); |
| 201 | + }; |
192 | 202 |
|
193 | 203 | log::info!( |
194 | 204 | "Merkle root {:?} is for era #{authority_set_id}", |
195 | 205 | (root.blockNumber, root.merkleRoot), |
196 | 206 | ); |
197 | 207 |
|
198 | | - this.sender.send(RelayedMerkleRoot { |
| 208 | + if let Err(e) = this.sender.send(RelayedMerkleRoot { |
199 | 209 | block: GearBlockNumber(block_number_gear), |
200 | 210 | block_hash, |
201 | 211 | authority_set_id, |
202 | 212 | merkle_root: root.merkleRoot.0.into(), |
203 | 213 | timestamp: block_timestamp, |
204 | | - })?; |
205 | | - |
| 214 | + }) { |
| 215 | + log::error!(r#"Sender channel closed: "{e:?}"."#); |
| 216 | + return Ok(()); |
| 217 | + } |
206 | 218 | } |
207 | 219 | } |
208 | 220 | } |
|
0 commit comments