1+ use crate :: message_relayer:: eth_to_gear:: api_provider:: ApiProviderConnection ;
12use alloy_primitives:: FixedBytes ;
23use eth_events_electra_client:: EthToVaraEvent ;
34use futures:: executor:: block_on;
45use gclient:: GearApi ;
6+ use gear_common:: UNITS ;
57use historical_proxy_client:: { traits:: HistoricalProxy as _, HistoricalProxy } ;
68use primitive_types:: H256 ;
79use prometheus:: {
@@ -14,15 +16,13 @@ use sails_rs::{
1416 Encode ,
1517} ;
1618use tokio:: {
17- sync:: mpsc:: { unbounded_channel, UnboundedReceiver , UnboundedSender } ,
19+ sync:: mpsc:: { error :: TryRecvError , unbounded_channel, UnboundedReceiver , UnboundedSender } ,
1820 task:: spawn_blocking,
1921} ;
2022use utils_prometheus:: { impl_metered_service, MeteredService } ;
2123use uuid:: Uuid ;
2224use vft_manager_client:: vft_manager:: io:: SubmitReceipt ;
2325
24- use crate :: message_relayer:: eth_to_gear:: api_provider:: ApiProviderConnection ;
25-
2626pub struct MessageSenderIo {
2727 requests_channel : UnboundedSender < Request > ,
2828 responses_channel : UnboundedReceiver < Response > ,
@@ -55,8 +55,8 @@ impl MessageSenderIo {
5555 . is_ok ( )
5656 }
5757
58- pub async fn recv ( & mut self ) -> Option < Response > {
59- self . responses_channel . recv ( ) . await
58+ pub fn try_recv ( & mut self ) -> Result < Response , TryRecvError > {
59+ self . responses_channel . try_recv ( )
6060 }
6161}
6262
@@ -161,6 +161,7 @@ impl MessageSender {
161161 responses : & mut UnboundedSender < Response > ,
162162 ) -> anyhow:: Result < ( ) > {
163163 let gear_api = self . api_provider . gclient_client ( & self . suri ) ?;
164+ self . update_balance_metric ( & gear_api) . await ?;
164165
165166 if let Some ( request) = self . last_request . take ( ) {
166167 match self . process ( responses, & gear_api, & request) . await {
@@ -329,6 +330,7 @@ impl MessageSender {
329330 }
330331 }
331332 }
333+
332334 Ok ( true )
333335 }
334336
@@ -338,7 +340,7 @@ impl MessageSender {
338340 . await
339341 . map_err ( |e| anyhow:: anyhow!( "Unable to get total balance: {e:?}" ) ) ?;
340342
341- let balance = balance / 1_000_000_000_000 ;
343+ let balance = balance / UNITS ;
342344 let balance: i64 = balance. try_into ( ) . unwrap_or ( i64:: MAX ) ;
343345
344346 self . metrics . fee_payer_balance . set ( balance) ;
@@ -358,24 +360,20 @@ async fn task(
358360 break ;
359361 }
360362
361- match this. run_inner ( & mut requests, & mut responses) . await {
362- Ok ( ( ) ) => {
363- log:: warn!( "Transaction manager connection terminated, exiting..." ) ;
364- break ;
363+ let Err ( err) = this. run_inner ( & mut requests, & mut responses) . await else {
364+ log:: warn!( "Transaction manager connection terminated, exiting..." ) ;
365+ break ;
366+ } ;
367+
368+ log:: error!( "Gear message sender got an error: {err:?}" ) ;
369+ match this. api_provider . reconnect ( ) . await {
370+ Ok ( _) => {
371+ log:: info!( "Reconnected to Gear API" ) ;
365372 }
366373
367374 Err ( err) => {
368- log:: error!( "Gear message sender got an error: {err:?}" ) ;
369- match this. api_provider . reconnect ( ) . await {
370- Ok ( ( ) ) => {
371- log:: info!( "Reconnected to Gear API" ) ;
372- }
373-
374- Err ( err) => {
375- log:: error!( "Failed to reconnect to Gear API: {err:?}" ) ;
376- break ;
377- }
378- }
375+ log:: error!( "Failed to reconnect to Gear API: {err:?}" ) ;
376+ break ;
379377 }
380378 }
381379 }
0 commit comments