From bda40433435d1ba76def1b32f1656add0a3e76ea Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Tue, 5 Nov 2024 17:56:10 +0530 Subject: [PATCH 1/5] pipe function for watcher.rs and replaced usage --- common/src/watcher.rs | 18 ++++++++++ tap-agent/src/agent/sender_account.rs | 26 +++++++------- .../src/agent/sender_accounts_manager.rs | 36 ++++++++----------- 3 files changed, 45 insertions(+), 35 deletions(-) diff --git a/common/src/watcher.rs b/common/src/watcher.rs index 95e57fd83..20d0a94d3 100644 --- a/common/src/watcher.rs +++ b/common/src/watcher.rs @@ -11,6 +11,7 @@ use std::{future::Future, time::Duration}; use tokio::{ select, sync::watch, + task::JoinHandle, time::{self, sleep}, }; use tracing::warn; @@ -84,3 +85,20 @@ where }); rx } + +// Replacement for pipe_async function in eventuals +// Listen to the changes in a receiver and runs parametric function +pub fn watch_pipe(rx: watch::Receiver, function: F) -> JoinHandle<()> +where + T: Clone + Send + Sync + 'static, + F: Fn(watch::Receiver) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, +{ + tokio::spawn(async move { + let mut rx = rx; + function(rx.clone()).await; + while rx.changed().await.is_ok() { + function(rx.clone()).await; + } + }) +} diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 675aae38d..fc458ef0c 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -9,6 +9,7 @@ use bigdecimal::ToPrimitive; use futures::{stream, StreamExt}; use graphql_client::GraphQLQuery; +use indexer_common::watcher::watch_pipe; use jsonrpsee::http_client::HttpClientBuilder; use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; use reqwest::Url; @@ -496,30 +497,27 @@ impl Actor for SenderAccount { }: Self::Arguments, ) -> std::result::Result { let myself_clone = myself.clone(); - let _indexer_allocations_handle = tokio::spawn(async move { - let mut indexer_allocations = indexer_allocations.clone(); - loop { - let allocation_ids = indexer_allocations.borrow().clone(); + let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |rx| { + let myself = myself_clone.clone(); + async move { + let allocation_ids = rx.borrow().clone(); // Update the allocation_ids - myself_clone + myself .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) .unwrap_or_else(|e| { error!("Error while updating allocation_ids: {:?}", e); }); - if indexer_allocations.changed().await.is_err() { - break; - } } }); let myself_clone = myself.clone(); let pgpool_clone = pgpool.clone(); - let mut accounts_clone = escrow_accounts.clone(); - let _escrow_account_monitor = tokio::spawn(async move { - while accounts_clone.changed().await.is_ok() { - let escrow_account = accounts_clone.borrow().clone(); - let myself = myself_clone.clone(); - let pgpool = pgpool_clone.clone(); + let accounts_clone = escrow_accounts.clone(); + let _escrow_account_monitor = watch_pipe(accounts_clone, move |rx| { + let escrow_account = rx.borrow().clone(); + let myself = myself_clone.clone(); + let pgpool = pgpool_clone.clone(); + async move { // Get balance or default value for sender // this balance already takes into account thawing let balance = escrow_account diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index af6c38e2d..75b543a27 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -5,6 +5,9 @@ use std::collections::HashSet; use std::time::Duration; use std::{collections::HashMap, str::FromStr}; +use super::sender_account::{ + SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage, +}; use crate::agent::sender_allocation::SenderAllocationMessage; use crate::lazy_static; use alloy::dyn_abi::Eip712Domain; @@ -14,6 +17,8 @@ use anyhow::{anyhow, bail}; use futures::{stream, StreamExt}; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; +use indexer_common::watcher::watch_pipe; +use prometheus::{register_counter_vec, CounterVec}; use ractor::concurrency::JoinHandle; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; use reqwest::Url; @@ -23,12 +28,6 @@ use tokio::select; use tokio::sync::watch::{self, Receiver}; use tracing::{error, warn}; -use prometheus::{register_counter_vec, CounterVec}; - -use super::sender_account::{ - SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage, -}; - lazy_static! { static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!( "tap_receipts_received_total", @@ -106,17 +105,11 @@ impl Actor for SenderAccountsManager { }: Self::Arguments, ) -> std::result::Result { let (allocations_tx, allocations_rx) = watch::channel(HashSet::
::new()); - tokio::spawn(async move { - let mut indexer_allocations = indexer_allocations.clone(); - while indexer_allocations.changed().await.is_ok() { + watch_pipe(indexer_allocations.clone(), move |rx| { + let allocations_tx = allocations_tx.clone(); + async move { allocations_tx - .send( - indexer_allocations - .borrow() - .keys() - .cloned() - .collect::>(), - ) + .send(rx.borrow().keys().cloned().collect::>()) .expect("Failed to update indexer_allocations_set channel"); } }); @@ -129,12 +122,13 @@ impl Actor for SenderAccountsManager { 'scalar_tap_receipt_notification'", ); let myself_clone = myself.clone(); - let mut accounts_clone = escrow_accounts.clone(); - let _eligible_allocations_senders_handle = tokio::spawn(async move { - while accounts_clone.changed().await.is_ok() { - myself_clone + let accounts_clone = escrow_accounts.clone(); + let _eligible_allocations_senders_handle = watch_pipe(accounts_clone, move |rx| { + let myself = myself_clone.clone(); + async move { + myself .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( - accounts_clone.borrow().get_senders(), + rx.borrow().get_senders(), )) .unwrap_or_else(|e| { error!("Error while updating sender_accounts: {:?}", e); From cac2dbafc451d85bcfe89aa83e103101b47fe3c2 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Thu, 7 Nov 2024 10:48:47 +0530 Subject: [PATCH 2/5] Fn(T)->Fut --- common/src/watcher.rs | 19 +++++++++--- tap-agent/src/agent/sender_account.rs | 6 ++-- .../src/agent/sender_accounts_manager.rs | 29 ++++++++++--------- 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/common/src/watcher.rs b/common/src/watcher.rs index 20d0a94d3..213d383a4 100644 --- a/common/src/watcher.rs +++ b/common/src/watcher.rs @@ -91,14 +91,25 @@ where pub fn watch_pipe(rx: watch::Receiver, function: F) -> JoinHandle<()> where T: Clone + Send + Sync + 'static, - F: Fn(watch::Receiver) -> Fut + Send + Sync + 'static, + F: Fn(T) -> Fut + Send + Sync + 'static, Fut: Future + Send + 'static, { tokio::spawn(async move { let mut rx = rx; - function(rx.clone()).await; - while rx.changed().await.is_ok() { - function(rx.clone()).await; + let value = rx.borrow().clone(); + function(value).await; + loop { + let res = rx.changed().await; + match res { + Ok(_) => { + let value = rx.borrow().clone(); + function(value).await; + } + Err(err) => { + warn!("{err}"); + break; + } + }; } }) } diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index fc458ef0c..f06a76e0e 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -497,10 +497,9 @@ impl Actor for SenderAccount { }: Self::Arguments, ) -> std::result::Result { let myself_clone = myself.clone(); - let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |rx| { + let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |allocation_ids| { let myself = myself_clone.clone(); async move { - let allocation_ids = rx.borrow().clone(); // Update the allocation_ids myself .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) @@ -513,8 +512,7 @@ impl Actor for SenderAccount { let myself_clone = myself.clone(); let pgpool_clone = pgpool.clone(); let accounts_clone = escrow_accounts.clone(); - let _escrow_account_monitor = watch_pipe(accounts_clone, move |rx| { - let escrow_account = rx.borrow().clone(); + let _escrow_account_monitor = watch_pipe(accounts_clone, move |escrow_account| { let myself = myself_clone.clone(); let pgpool = pgpool_clone.clone(); async move { diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index 75b543a27..9d63c2c27 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -105,11 +105,12 @@ impl Actor for SenderAccountsManager { }: Self::Arguments, ) -> std::result::Result { let (allocations_tx, allocations_rx) = watch::channel(HashSet::
::new()); - watch_pipe(indexer_allocations.clone(), move |rx| { + watch_pipe(indexer_allocations.clone(), move |allocation_id| { let allocations_tx = allocations_tx.clone(); + let allocation_set = allocation_id.keys().cloned().collect::>(); async move { allocations_tx - .send(rx.borrow().keys().cloned().collect::>()) + .send(allocation_set) .expect("Failed to update indexer_allocations_set channel"); } }); @@ -123,18 +124,18 @@ impl Actor for SenderAccountsManager { ); let myself_clone = myself.clone(); let accounts_clone = escrow_accounts.clone(); - let _eligible_allocations_senders_handle = watch_pipe(accounts_clone, move |rx| { - let myself = myself_clone.clone(); - async move { - myself - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( - rx.borrow().get_senders(), - )) - .unwrap_or_else(|e| { - error!("Error while updating sender_accounts: {:?}", e); - }); - } - }); + let _eligible_allocations_senders_handle = + watch_pipe(accounts_clone, move |escrow_accounts| { + let myself = myself_clone.clone(); + let senders = escrow_accounts.get_senders(); + async move { + myself + .cast(SenderAccountsManagerMessage::UpdateSenderAccounts(senders)) + .unwrap_or_else(|e| { + error!("Error while updating sender_accounts: {:?}", e); + }); + } + }); let mut state = State { config, From 9ed5fb014457e386f9192dc45dd262cac6dd201f Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Thu, 7 Nov 2024 11:04:59 +0530 Subject: [PATCH 3/5] Fn(Ref<'_,T>)->Fut --- common/src/watcher.rs | 8 ++++---- tap-agent/src/agent/sender_account.rs | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/common/src/watcher.rs b/common/src/watcher.rs index 213d383a4..6d715510f 100644 --- a/common/src/watcher.rs +++ b/common/src/watcher.rs @@ -10,7 +10,7 @@ use std::{future::Future, time::Duration}; use tokio::{ select, - sync::watch, + sync::watch::{self, Ref}, task::JoinHandle, time::{self, sleep}, }; @@ -91,18 +91,18 @@ where pub fn watch_pipe(rx: watch::Receiver, function: F) -> JoinHandle<()> where T: Clone + Send + Sync + 'static, - F: Fn(T) -> Fut + Send + Sync + 'static, + F: Fn(Ref<'_, T>) -> Fut + Send + Sync + 'static, Fut: Future + Send + 'static, { tokio::spawn(async move { let mut rx = rx; - let value = rx.borrow().clone(); + let value = rx.borrow(); function(value).await; loop { let res = rx.changed().await; match res { Ok(_) => { - let value = rx.borrow().clone(); + let value = rx.borrow(); function(value).await; } Err(err) => { diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index f06a76e0e..ed6075604 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -499,6 +499,7 @@ impl Actor for SenderAccount { let myself_clone = myself.clone(); let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |allocation_ids| { let myself = myself_clone.clone(); + let allocation_ids = allocation_ids.clone(); async move { // Update the allocation_ids myself @@ -515,6 +516,7 @@ impl Actor for SenderAccount { let _escrow_account_monitor = watch_pipe(accounts_clone, move |escrow_account| { let myself = myself_clone.clone(); let pgpool = pgpool_clone.clone(); + let escrow_account = escrow_account.clone(); async move { // Get balance or default value for sender // this balance already takes into account thawing From 0ca19b02625de27899ccd43a3e5858b94a85c202 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Fri, 8 Nov 2024 23:11:14 +0530 Subject: [PATCH 4/5] optimise --- tap-agent/src/agent/sender_account.rs | 26 +++++++++---------- .../src/agent/sender_accounts_manager.rs | 22 +++++++--------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index ed6075604..2a80f0ea7 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -500,14 +500,13 @@ impl Actor for SenderAccount { let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |allocation_ids| { let myself = myself_clone.clone(); let allocation_ids = allocation_ids.clone(); - async move { - // Update the allocation_ids - myself - .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) - .unwrap_or_else(|e| { - error!("Error while updating allocation_ids: {:?}", e); - }); - } + // Update the allocation_ids + myself + .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) + .unwrap_or_else(|e| { + error!("Error while updating allocation_ids: {:?}", e); + }); + async {} }); let myself_clone = myself.clone(); @@ -516,13 +515,12 @@ impl Actor for SenderAccount { let _escrow_account_monitor = watch_pipe(accounts_clone, move |escrow_account| { let myself = myself_clone.clone(); let pgpool = pgpool_clone.clone(); - let escrow_account = escrow_account.clone(); + // Get balance or default value for sender + // this balance already takes into account thawing + let balance = escrow_account + .get_balance_for_sender(&sender_id) + .unwrap_or_default(); async move { - // Get balance or default value for sender - // this balance already takes into account thawing - let balance = escrow_account - .get_balance_for_sender(&sender_id) - .unwrap_or_default(); let last_non_final_ravs = sqlx::query!( r#" SELECT allocation_id, value_aggregate diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index 9d63c2c27..cf170ea65 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -108,11 +108,10 @@ impl Actor for SenderAccountsManager { watch_pipe(indexer_allocations.clone(), move |allocation_id| { let allocations_tx = allocations_tx.clone(); let allocation_set = allocation_id.keys().cloned().collect::>(); - async move { - allocations_tx - .send(allocation_set) - .expect("Failed to update indexer_allocations_set channel"); - } + allocations_tx + .send(allocation_set) + .expect("Failed to update indexer_allocations_set channel"); + async {} }); let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); pglistener @@ -128,13 +127,12 @@ impl Actor for SenderAccountsManager { watch_pipe(accounts_clone, move |escrow_accounts| { let myself = myself_clone.clone(); let senders = escrow_accounts.get_senders(); - async move { - myself - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts(senders)) - .unwrap_or_else(|e| { - error!("Error while updating sender_accounts: {:?}", e); - }); - } + myself + .cast(SenderAccountsManagerMessage::UpdateSenderAccounts(senders)) + .unwrap_or_else(|e| { + error!("Error while updating sender_accounts: {:?}", e); + }); + async {} }); let mut state = State { From 0cbe9b132f3dff49860bfce2569da8fbbba1fbf8 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Sat, 9 Nov 2024 06:08:59 +0530 Subject: [PATCH 5/5] watch_pipe --- common/src/watcher.rs | 4 ++-- tap-agent/src/agent/sender_account.rs | 3 +-- tap-agent/src/agent/sender_accounts_manager.rs | 4 +--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/common/src/watcher.rs b/common/src/watcher.rs index 6d715510f..b011b2796 100644 --- a/common/src/watcher.rs +++ b/common/src/watcher.rs @@ -14,7 +14,7 @@ use tokio::{ task::JoinHandle, time::{self, sleep}, }; -use tracing::warn; +use tracing::{error, warn}; /// Creates a new watcher that auto initializes it with initial_value /// and updates it given an interval @@ -106,7 +106,7 @@ where function(value).await; } Err(err) => { - warn!("{err}"); + error!("There was an error piping the watcher results: {err}"); break; } }; diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 2a80f0ea7..3140eddce 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -498,10 +498,9 @@ impl Actor for SenderAccount { ) -> std::result::Result { let myself_clone = myself.clone(); let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |allocation_ids| { - let myself = myself_clone.clone(); let allocation_ids = allocation_ids.clone(); // Update the allocation_ids - myself + myself_clone .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) .unwrap_or_else(|e| { error!("Error while updating allocation_ids: {:?}", e); diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index cf170ea65..e194609c7 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -106,7 +106,6 @@ impl Actor for SenderAccountsManager { ) -> std::result::Result { let (allocations_tx, allocations_rx) = watch::channel(HashSet::
::new()); watch_pipe(indexer_allocations.clone(), move |allocation_id| { - let allocations_tx = allocations_tx.clone(); let allocation_set = allocation_id.keys().cloned().collect::>(); allocations_tx .send(allocation_set) @@ -125,9 +124,8 @@ impl Actor for SenderAccountsManager { let accounts_clone = escrow_accounts.clone(); let _eligible_allocations_senders_handle = watch_pipe(accounts_clone, move |escrow_accounts| { - let myself = myself_clone.clone(); let senders = escrow_accounts.get_senders(); - myself + myself_clone .cast(SenderAccountsManagerMessage::UpdateSenderAccounts(senders)) .unwrap_or_else(|e| { error!("Error while updating sender_accounts: {:?}", e);