diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index 8252f59ae..0a2ea3556 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -10,10 +10,12 @@ use std::{ use super::Allocation; use crate::prelude::SubgraphClient; use alloy::primitives::{TxHash, B256, U256}; -use eventuals::{timer, Eventual, EventualExt}; use graphql_client::GraphQLQuery; use thegraph_core::{Address, DeploymentId}; -use tokio::time::sleep; +use tokio::{ + sync::watch::{self, Receiver}, + time::{self, sleep}, +}; use tracing::warn; type BigInt = U256; @@ -61,30 +63,38 @@ pub fn indexer_allocations( indexer_address: Address, interval: Duration, recently_closed_allocation_buffer: Duration, -) -> Eventual> { - // Refresh indexer allocations every now and then - timer(interval).map_with_retry( - move |_| async move { - get_allocations( +) -> Receiver> { + let (tx, rx) = watch::channel(HashMap::new()); + tokio::spawn(async move { + // Refresh indexer allocations every now and then + let mut time_interval = time::interval(interval); + time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + loop { + time_interval.tick().await; + let result = get_allocations( network_subgraph, indexer_address, recently_closed_allocation_buffer, ) - .await - .map_err(|e| e.to_string()) - }, - // Need to use string errors here because eventuals `map_with_retry` retries - // errors that can be cloned - move |err: String| { - warn!( - "Failed to fetch active or recently closed allocations for indexer {:?}: {}", - indexer_address, err - ); - - // Sleep for a bit before we retry - sleep(interval.div_f32(2.0)) - }, - ) + .await; + match result { + Ok(allocations) => { + tx.send(allocations) + .expect("Failed to update indexer_allocations channel"); + } + Err(err) => { + warn!( + "Failed to fetch active or recently closed allocations for indexer {:?}: {}", + indexer_address, err + ); + + // Sleep for a bit before we retry + sleep(interval.div_f32(2.0)).await; + } + } + } + }); + rx } pub async fn get_allocations( diff --git a/common/src/attestations/signers.rs b/common/src/attestations/signers.rs index 15eb0aec8..d3e777746 100644 --- a/common/src/attestations/signers.rs +++ b/common/src/attestations/signers.rs @@ -1,7 +1,6 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use eventuals::{Eventual, EventualExt}; use std::collections::HashMap; use std::sync::Arc; use thegraph_core::{Address, ChainId}; @@ -18,7 +17,7 @@ use crate::prelude::{Allocation, AttestationSigner}; /// An always up-to-date list of attestation signers, one for each of the indexer's allocations. pub async fn attestation_signers( - indexer_allocations: Eventual>, + mut indexer_allocations_rx: Receiver>, indexer_mnemonic: String, chain_id: ChainId, mut dispute_manager_rx: Receiver>, @@ -26,23 +25,11 @@ pub async fn attestation_signers( let attestation_signers_map: &'static Mutex> = Box::leak(Box::new(Mutex::new(HashMap::new()))); - // Actively listening to indexer_allocations to update allocations channel - // Temporary fix until the indexer_allocations is migrated to tokio watch - let (allocations_tx, mut allocations_rx) = - watch::channel(indexer_allocations.value_immediate().unwrap_or_default()); - indexer_allocations - .pipe(move |allocatons| { - allocations_tx - .send(allocatons) - .expect("Failed to update allocations channel"); - }) - .forever(); - let starter_signers_map = modify_sigers( Arc::new(indexer_mnemonic.clone()), chain_id, attestation_signers_map, - allocations_rx.clone(), + indexer_allocations_rx.clone(), dispute_manager_rx.clone(), ) .await; @@ -53,12 +40,12 @@ pub async fn attestation_signers( tokio::spawn(async move { loop { let updated_signers = select! { - Ok(())= allocations_rx.changed() =>{ + Ok(())= indexer_allocations_rx.changed() =>{ modify_sigers( Arc::new(indexer_mnemonic.clone()), chain_id, attestation_signers_map, - allocations_rx.clone(), + indexer_allocations_rx.clone(), dispute_manager_rx.clone(), ).await }, @@ -67,7 +54,7 @@ pub async fn attestation_signers( Arc::new(indexer_mnemonic.clone()), chain_id, attestation_signers_map, - allocations_rx.clone(), + indexer_allocations_rx.clone(), dispute_manager_rx.clone() ).await }, @@ -129,13 +116,13 @@ mod tests { #[tokio::test] async fn test_attestation_signers_update_with_allocations() { - let (mut allocations_writer, allocations) = Eventual::>::new(); + let (allocations_tx, allocations_rx) = watch::channel(HashMap::new()); let (dispute_manager_tx, dispute_manager_rx) = watch::channel(None); dispute_manager_tx .send(Some(*DISPUTE_MANAGER_ADDRESS)) .unwrap(); let mut signers = attestation_signers( - allocations, + allocations_rx, (*INDEXER_OPERATOR_MNEMONIC).to_string(), 1, dispute_manager_rx, @@ -143,13 +130,13 @@ mod tests { .await; // Test that an empty set of allocations leads to an empty set of signers - allocations_writer.write(HashMap::new()); + allocations_tx.send(HashMap::new()).unwrap(); signers.changed().await.unwrap(); let latest_signers = signers.borrow().clone(); assert_eq!(latest_signers, HashMap::new()); // Test that writing our set of test allocations results in corresponding signers for all of them - allocations_writer.write((*INDEXER_ALLOCATIONS).clone()); + allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap(); signers.changed().await.unwrap(); let latest_signers = signers.borrow().clone(); assert_eq!(latest_signers.len(), INDEXER_ALLOCATIONS.len()); diff --git a/common/src/tap.rs b/common/src/tap.rs index f413f86b1..2d23ca922 100644 --- a/common/src/tap.rs +++ b/common/src/tap.rs @@ -17,6 +17,7 @@ use std::time::Duration; use std::{collections::HashMap, sync::Arc}; use tap_core::receipt::checks::ReceiptCheck; use tokio::sync::mpsc::{self, Sender}; +use tokio::sync::watch::Receiver; use tokio_util::sync::CancellationToken; use tracing::error; @@ -38,7 +39,7 @@ pub enum AdapterError { impl IndexerTapContext { pub async fn get_checks( pgpool: PgPool, - indexer_allocations: Eventual>, + indexer_allocations: Receiver>, escrow_accounts: Eventual, domain_separator: Eip712Domain, timestamp_error_tolerance: Duration, diff --git a/common/src/tap/checks/allocation_eligible.rs b/common/src/tap/checks/allocation_eligible.rs index fca26e62b..ee4e752c7 100644 --- a/common/src/tap/checks/allocation_eligible.rs +++ b/common/src/tap/checks/allocation_eligible.rs @@ -5,21 +5,21 @@ use std::collections::HashMap; use alloy::primitives::Address; use anyhow::anyhow; -use eventuals::Eventual; use tap_core::receipt::{ checks::{Check, CheckError, CheckResult}, state::Checking, ReceiptWithState, }; +use tokio::sync::watch::Receiver; use crate::prelude::Allocation; pub struct AllocationEligible { - indexer_allocations: Eventual>, + indexer_allocations: Receiver>, } impl AllocationEligible { - pub fn new(indexer_allocations: Eventual>) -> Self { + pub fn new(indexer_allocations: Receiver>) -> Self { Self { indexer_allocations, } @@ -31,10 +31,8 @@ impl Check for AllocationEligible { let allocation_id = receipt.signed_receipt().message.allocation_id; if !self .indexer_allocations - .value() - .await - .map(|allocations| allocations.contains_key(&allocation_id)) - .unwrap_or(false) + .borrow() + .contains_key(&allocation_id) { return Err(CheckError::Failed(anyhow!( "Receipt allocation ID `{}` is not eligible for this indexer", diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index ae4400fe5..dd31c3392 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -13,6 +13,7 @@ use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeV use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::time::Duration; +use tokio::sync::watch::Receiver; use tokio::task::JoinHandle; use alloy::dyn_abi::Eip712Domain; @@ -119,7 +120,7 @@ pub struct SenderAccountArgs { pub pgpool: PgPool, pub sender_id: Address, pub escrow_accounts: Eventual, - pub indexer_allocations: Eventual>, + pub indexer_allocations: Receiver>, pub escrow_subgraph: &'static SubgraphClient, pub domain_separator: Eip712Domain, pub sender_aggregator_endpoint: String, @@ -134,7 +135,7 @@ pub struct State { rav_tracker: SenderFeeTracker, invalid_receipts_tracker: SenderFeeTracker, allocation_ids: HashSet
, - _indexer_allocations_handle: PipeHandle, + _indexer_allocations_handle: JoinHandle<()>, _escrow_account_monitor: PipeHandle, scheduled_rav_request: Option>>>, @@ -333,20 +334,21 @@ impl Actor for SenderAccount { }: Self::Arguments, ) -> std::result::Result { let myself_clone = myself.clone(); - let _indexer_allocations_handle = - indexer_allocations - .clone() - .pipe_async(move |allocation_ids| { - let myself = myself_clone.clone(); - async move { - // Update the allocation_ids - myself - .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) - .unwrap_or_else(|e| { - error!("Error while updating allocation_ids: {:?}", e); - }); - } - }); + let _indexer_allocations_handle = tokio::spawn(async move { + let mut indexer_allocations = indexer_allocations.clone(); + loop { + let allocation_ids = indexer_allocations.borrow().clone(); + // Update the allocation_ids + myself_clone + .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) + .unwrap_or_else(|e| { + error!("Error while updating allocation_ids: {:?}", e); + }); + if indexer_allocations.changed().await.is_err() { + break; + } + } + }); let myself_clone = myself.clone(); let pgpool_clone = pgpool.clone(); @@ -938,6 +940,7 @@ pub mod tests { use std::sync::atomic::AtomicU32; use std::sync::{Arc, Mutex}; use std::time::Duration; + use tokio::sync::watch; use wiremock::matchers::{body_string_contains, method}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -1039,7 +1042,7 @@ pub mod tests { pgpool, sender_id: SENDER.1, escrow_accounts: escrow_accounts_eventual, - indexer_allocations: Eventual::from_value(initial_allocation), + indexer_allocations: watch::channel(initial_allocation).1, escrow_subgraph, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), sender_aggregator_endpoint: DUMMY_URL.to_string(), diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index 3543d74f6..aff314ec7 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -18,6 +18,7 @@ use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; use tokio::select; +use tokio::sync::watch::{self, Receiver}; use tracing::{error, warn}; use prometheus::{register_counter_vec, CounterVec}; @@ -55,7 +56,7 @@ pub struct SenderAccountsManagerArgs { pub domain_separator: Eip712Domain, pub pgpool: PgPool, - pub indexer_allocations: Eventual>, + pub indexer_allocations: Receiver>, pub escrow_accounts: Eventual, pub escrow_subgraph: &'static SubgraphClient, pub sender_aggregator_endpoints: HashMap, @@ -71,7 +72,7 @@ pub struct State { config: &'static config::Config, domain_separator: Eip712Domain, pgpool: PgPool, - indexer_allocations: Eventual>, + indexer_allocations: Receiver>, escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, sender_aggregator_endpoints: HashMap, @@ -98,8 +99,20 @@ impl Actor for SenderAccountsManager { prefix, }: Self::Arguments, ) -> std::result::Result { - let indexer_allocations = indexer_allocations.map(|allocations| async move { - allocations.keys().cloned().collect::>() + let (allocations_tx, allocations_rx) = watch::channel(HashSet::
::new()); + tokio::spawn(async move { + let mut indexer_allocations = indexer_allocations.clone(); + while indexer_allocations.changed().await.is_ok() { + allocations_tx + .send( + indexer_allocations + .borrow() + .keys() + .cloned() + .collect::>(), + ) + .expect("Failed to update indexer_allocations_set channel"); + } }); let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); pglistener @@ -132,7 +145,7 @@ impl Actor for SenderAccountsManager { new_receipts_watcher_handle: None, _eligible_allocations_senders_pipe, pgpool, - indexer_allocations, + indexer_allocations: allocations_rx, escrow_accounts: escrow_accounts.clone(), escrow_subgraph, sender_aggregator_endpoints, @@ -587,9 +600,7 @@ mod tests { ALLOCATION_ID_1, INDEXER, SENDER, SENDER_2, SENDER_3, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; use alloy::hex::ToHexExt; - use alloy::primitives::Address; use eventuals::{Eventual, EventualExt}; - use indexer_common::allocations::Allocation; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{DeploymentDetails, SubgraphClient}; use ractor::concurrency::JoinHandle; @@ -599,7 +610,7 @@ mod tests { use sqlx::PgPool; use std::collections::{HashMap, HashSet}; use std::time::Duration; - use tokio::sync::mpsc; + use tokio::sync::{mpsc, watch}; const DUMMY_URL: &str = "http://localhost:1234"; @@ -634,9 +645,7 @@ mod tests { ) { let config = get_config(); - let (mut indexer_allocations_writer, indexer_allocations_eventual) = - Eventual::>::new(); - indexer_allocations_writer.write(HashMap::new()); + let (_allocations_tx, allocations_rx) = watch::channel(HashMap::new()); let escrow_subgraph = get_subgraph_client(); let (mut escrow_accounts_writer, escrow_accounts_eventual) = @@ -651,7 +660,7 @@ mod tests { config, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), pgpool, - indexer_allocations: indexer_allocations_eventual, + indexer_allocations: allocations_rx, escrow_accounts: escrow_accounts_eventual, escrow_subgraph, sender_aggregator_endpoints: HashMap::from([ @@ -694,7 +703,7 @@ mod tests { _eligible_allocations_senders_pipe: Eventual::from_value(()) .pipe_async(|_| async {}), pgpool, - indexer_allocations: Eventual::from_value(HashSet::new()), + indexer_allocations: watch::channel(HashSet::new()).1, escrow_accounts: Eventual::from_value(escrow_accounts), escrow_subgraph: get_subgraph_client(), sender_aggregator_endpoints: HashMap::from([