Skip to content

Commit c42f1d8

Browse files
taslimmuhammedgusinacio
authored andcommitted
Fn(T)->Fut
1 parent cf2eb15 commit c42f1d8

File tree

3 files changed

+32
-22
lines changed

3 files changed

+32
-22
lines changed

common/src/watcher.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,25 @@ where
9191
pub fn watch_pipe<T, F, Fut>(rx: watch::Receiver<T>, function: F) -> JoinHandle<()>
9292
where
9393
T: Clone + Send + Sync + 'static,
94-
F: Fn(watch::Receiver<T>) -> Fut + Send + Sync + 'static,
94+
F: Fn(T) -> Fut + Send + Sync + 'static,
9595
Fut: Future<Output = ()> + Send + 'static,
9696
{
9797
tokio::spawn(async move {
9898
let mut rx = rx;
99-
function(rx.clone()).await;
100-
while rx.changed().await.is_ok() {
101-
function(rx.clone()).await;
99+
let value = rx.borrow().clone();
100+
function(value).await;
101+
loop {
102+
let res = rx.changed().await;
103+
match res {
104+
Ok(_) => {
105+
let value = rx.borrow().clone();
106+
function(value).await;
107+
}
108+
Err(err) => {
109+
warn!("{err}");
110+
break;
111+
}
112+
};
102113
}
103114
})
104115
}

tap-agent/src/agent/sender_account.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -497,10 +497,9 @@ impl Actor for SenderAccount {
497497
}: Self::Arguments,
498498
) -> std::result::Result<Self::State, ActorProcessingErr> {
499499
let myself_clone = myself.clone();
500-
let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |rx| {
500+
let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |allocation_ids| {
501501
let myself = myself_clone.clone();
502502
async move {
503-
let allocation_ids = rx.borrow().clone();
504503
// Update the allocation_ids
505504
myself
506505
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
@@ -513,8 +512,7 @@ impl Actor for SenderAccount {
513512
let myself_clone = myself.clone();
514513
let pgpool_clone = pgpool.clone();
515514
let accounts_clone = escrow_accounts.clone();
516-
let _escrow_account_monitor = watch_pipe(accounts_clone, move |rx| {
517-
let escrow_account = rx.borrow().clone();
515+
let _escrow_account_monitor = watch_pipe(accounts_clone, move |escrow_account| {
518516
let myself = myself_clone.clone();
519517
let pgpool = pgpool_clone.clone();
520518
async move {

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,12 @@ impl Actor for SenderAccountsManager {
105105
}: Self::Arguments,
106106
) -> std::result::Result<Self::State, ActorProcessingErr> {
107107
let (allocations_tx, allocations_rx) = watch::channel(HashSet::<Address>::new());
108-
watch_pipe(indexer_allocations.clone(), move |rx| {
108+
watch_pipe(indexer_allocations.clone(), move |allocation_id| {
109109
let allocations_tx = allocations_tx.clone();
110+
let allocation_set = allocation_id.keys().cloned().collect::<HashSet<Address>>();
110111
async move {
111112
allocations_tx
112-
.send(rx.borrow().keys().cloned().collect::<HashSet<Address>>())
113+
.send(allocation_set)
113114
.expect("Failed to update indexer_allocations_set channel");
114115
}
115116
});
@@ -123,18 +124,18 @@ impl Actor for SenderAccountsManager {
123124
);
124125
let myself_clone = myself.clone();
125126
let accounts_clone = escrow_accounts.clone();
126-
let _eligible_allocations_senders_handle = watch_pipe(accounts_clone, move |rx| {
127-
let myself = myself_clone.clone();
128-
async move {
129-
myself
130-
.cast(SenderAccountsManagerMessage::UpdateSenderAccounts(
131-
rx.borrow().get_senders(),
132-
))
133-
.unwrap_or_else(|e| {
134-
error!("Error while updating sender_accounts: {:?}", e);
135-
});
136-
}
137-
});
127+
let _eligible_allocations_senders_handle =
128+
watch_pipe(accounts_clone, move |escrow_accounts| {
129+
let myself = myself_clone.clone();
130+
let senders = escrow_accounts.get_senders();
131+
async move {
132+
myself
133+
.cast(SenderAccountsManagerMessage::UpdateSenderAccounts(senders))
134+
.unwrap_or_else(|e| {
135+
error!("Error while updating sender_accounts: {:?}", e);
136+
});
137+
}
138+
});
138139

139140
let mut state = State {
140141
config,

0 commit comments

Comments
 (0)