Skip to content

Commit cf2eb15

Browse files
taslimmuhammedgusinacio
authored andcommitted
pipe function for watcher.rs and replaced usage
1 parent fcf9e33 commit cf2eb15

File tree

3 files changed

+45
-35
lines changed

3 files changed

+45
-35
lines changed

common/src/watcher.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::{future::Future, time::Duration};
1111
use tokio::{
1212
select,
1313
sync::watch,
14+
task::JoinHandle,
1415
time::{self, sleep},
1516
};
1617
use tracing::warn;
@@ -84,3 +85,20 @@ where
8485
});
8586
rx
8687
}
88+
89+
// Replacement for pipe_async function in eventuals
90+
// Listen to the changes in a receiver and runs parametric function
91+
pub fn watch_pipe<T, F, Fut>(rx: watch::Receiver<T>, function: F) -> JoinHandle<()>
92+
where
93+
T: Clone + Send + Sync + 'static,
94+
F: Fn(watch::Receiver<T>) -> Fut + Send + Sync + 'static,
95+
Fut: Future<Output = ()> + Send + 'static,
96+
{
97+
tokio::spawn(async move {
98+
let mut rx = rx;
99+
function(rx.clone()).await;
100+
while rx.changed().await.is_ok() {
101+
function(rx.clone()).await;
102+
}
103+
})
104+
}

tap-agent/src/agent/sender_account.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use bigdecimal::ToPrimitive;
99

1010
use futures::{stream, StreamExt};
1111
use graphql_client::GraphQLQuery;
12+
use indexer_common::watcher::watch_pipe;
1213
use jsonrpsee::http_client::HttpClientBuilder;
1314
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
1415
use reqwest::Url;
@@ -496,30 +497,27 @@ impl Actor for SenderAccount {
496497
}: Self::Arguments,
497498
) -> std::result::Result<Self::State, ActorProcessingErr> {
498499
let myself_clone = myself.clone();
499-
let _indexer_allocations_handle = tokio::spawn(async move {
500-
let mut indexer_allocations = indexer_allocations.clone();
501-
loop {
502-
let allocation_ids = indexer_allocations.borrow().clone();
500+
let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |rx| {
501+
let myself = myself_clone.clone();
502+
async move {
503+
let allocation_ids = rx.borrow().clone();
503504
// Update the allocation_ids
504-
myself_clone
505+
myself
505506
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
506507
.unwrap_or_else(|e| {
507508
error!("Error while updating allocation_ids: {:?}", e);
508509
});
509-
if indexer_allocations.changed().await.is_err() {
510-
break;
511-
}
512510
}
513511
});
514512

515513
let myself_clone = myself.clone();
516514
let pgpool_clone = pgpool.clone();
517-
let mut accounts_clone = escrow_accounts.clone();
518-
let _escrow_account_monitor = tokio::spawn(async move {
519-
while accounts_clone.changed().await.is_ok() {
520-
let escrow_account = accounts_clone.borrow().clone();
521-
let myself = myself_clone.clone();
522-
let pgpool = pgpool_clone.clone();
515+
let accounts_clone = escrow_accounts.clone();
516+
let _escrow_account_monitor = watch_pipe(accounts_clone, move |rx| {
517+
let escrow_account = rx.borrow().clone();
518+
let myself = myself_clone.clone();
519+
let pgpool = pgpool_clone.clone();
520+
async move {
523521
// Get balance or default value for sender
524522
// this balance already takes into account thawing
525523
let balance = escrow_account

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ use std::collections::HashSet;
55
use std::time::Duration;
66
use std::{collections::HashMap, str::FromStr};
77

8+
use super::sender_account::{
9+
SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage,
10+
};
811
use crate::agent::sender_allocation::SenderAllocationMessage;
912
use crate::lazy_static;
1013
use alloy::dyn_abi::Eip712Domain;
@@ -14,6 +17,8 @@ use anyhow::{anyhow, bail};
1417
use futures::{stream, StreamExt};
1518
use indexer_common::escrow_accounts::EscrowAccounts;
1619
use indexer_common::prelude::{Allocation, SubgraphClient};
20+
use indexer_common::watcher::watch_pipe;
21+
use prometheus::{register_counter_vec, CounterVec};
1722
use ractor::concurrency::JoinHandle;
1823
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent};
1924
use reqwest::Url;
@@ -23,12 +28,6 @@ use tokio::select;
2328
use tokio::sync::watch::{self, Receiver};
2429
use tracing::{error, warn};
2530

26-
use prometheus::{register_counter_vec, CounterVec};
27-
28-
use super::sender_account::{
29-
SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage,
30-
};
31-
3231
lazy_static! {
3332
static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!(
3433
"tap_receipts_received_total",
@@ -106,17 +105,11 @@ impl Actor for SenderAccountsManager {
106105
}: Self::Arguments,
107106
) -> std::result::Result<Self::State, ActorProcessingErr> {
108107
let (allocations_tx, allocations_rx) = watch::channel(HashSet::<Address>::new());
109-
tokio::spawn(async move {
110-
let mut indexer_allocations = indexer_allocations.clone();
111-
while indexer_allocations.changed().await.is_ok() {
108+
watch_pipe(indexer_allocations.clone(), move |rx| {
109+
let allocations_tx = allocations_tx.clone();
110+
async move {
112111
allocations_tx
113-
.send(
114-
indexer_allocations
115-
.borrow()
116-
.keys()
117-
.cloned()
118-
.collect::<HashSet<Address>>(),
119-
)
112+
.send(rx.borrow().keys().cloned().collect::<HashSet<Address>>())
120113
.expect("Failed to update indexer_allocations_set channel");
121114
}
122115
});
@@ -129,12 +122,13 @@ impl Actor for SenderAccountsManager {
129122
'scalar_tap_receipt_notification'",
130123
);
131124
let myself_clone = myself.clone();
132-
let mut accounts_clone = escrow_accounts.clone();
133-
let _eligible_allocations_senders_handle = tokio::spawn(async move {
134-
while accounts_clone.changed().await.is_ok() {
135-
myself_clone
125+
let accounts_clone = escrow_accounts.clone();
126+
let _eligible_allocations_senders_handle = watch_pipe(accounts_clone, move |rx| {
127+
let myself = myself_clone.clone();
128+
async move {
129+
myself
136130
.cast(SenderAccountsManagerMessage::UpdateSenderAccounts(
137-
accounts_clone.borrow().get_senders(),
131+
rx.borrow().get_senders(),
138132
))
139133
.unwrap_or_else(|e| {
140134
error!("Error while updating sender_accounts: {:?}", e);

0 commit comments

Comments
 (0)