Skip to content

Commit aba5ac7

Browse files
taslimmuhammedgusinacio
authored andcommitted
watch_pipe
1 parent fee3baa commit aba5ac7

File tree

3 files changed

+4
-7
lines changed

3 files changed

+4
-7
lines changed

common/src/watcher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio::{
1414
task::JoinHandle,
1515
time::{self, sleep},
1616
};
17-
use tracing::warn;
17+
use tracing::{error, warn};
1818

1919
/// Creates a new watcher that auto initializes it with initial_value
2020
/// and updates it given an interval
@@ -106,7 +106,7 @@ where
106106
function(value).await;
107107
}
108108
Err(err) => {
109-
warn!("{err}");
109+
error!("There was an error piping the watcher results: {err}");
110110
break;
111111
}
112112
};

tap-agent/src/agent/sender_account.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,10 +498,9 @@ impl Actor for SenderAccount {
498498
) -> std::result::Result<Self::State, ActorProcessingErr> {
499499
let myself_clone = myself.clone();
500500
let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |allocation_ids| {
501-
let myself = myself_clone.clone();
502501
let allocation_ids = allocation_ids.clone();
503502
// Update the allocation_ids
504-
myself
503+
myself_clone
505504
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
506505
.unwrap_or_else(|e| {
507506
error!("Error while updating allocation_ids: {:?}", e);

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ impl Actor for SenderAccountsManager {
106106
) -> std::result::Result<Self::State, ActorProcessingErr> {
107107
let (allocations_tx, allocations_rx) = watch::channel(HashSet::<Address>::new());
108108
watch_pipe(indexer_allocations.clone(), move |allocation_id| {
109-
let allocations_tx = allocations_tx.clone();
110109
let allocation_set = allocation_id.keys().cloned().collect::<HashSet<Address>>();
111110
allocations_tx
112111
.send(allocation_set)
@@ -125,9 +124,8 @@ impl Actor for SenderAccountsManager {
125124
let accounts_clone = escrow_accounts.clone();
126125
let _eligible_allocations_senders_handle =
127126
watch_pipe(accounts_clone, move |escrow_accounts| {
128-
let myself = myself_clone.clone();
129127
let senders = escrow_accounts.get_senders();
130-
myself
128+
myself_clone
131129
.cast(SenderAccountsManagerMessage::UpdateSenderAccounts(senders))
132130
.unwrap_or_else(|e| {
133131
error!("Error while updating sender_accounts: {:?}", e);

0 commit comments

Comments
 (0)