Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use std::{
use super::Allocation;
use crate::prelude::SubgraphClient;
use alloy::primitives::{TxHash, B256, U256};
use eventuals::{timer, Eventual, EventualExt};
use graphql_client::GraphQLQuery;
use thegraph_core::{Address, DeploymentId};
use tokio::time::sleep;
use tokio::{
sync::watch::{self, Receiver},
time::{self, sleep},
};
use tracing::warn;

type BigInt = U256;
Expand Down Expand Up @@ -61,30 +63,39 @@ pub fn indexer_allocations(
indexer_address: Address,
interval: Duration,
recently_closed_allocation_buffer: Duration,
) -> Eventual<HashMap<Address, Allocation>> {
// Refresh indexer allocations every now and then
timer(interval).map_with_retry(
move |_| async move {
get_allocations(
) -> Receiver<HashMap<Address, Allocation>> {
let (tx, rx) = watch::channel(HashMap::new());
tokio::spawn(async move {
// Refresh indexer allocations every now and then
let mut time_interval = time::interval(interval);
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
loop {
time_interval.tick().await;
let result = get_allocations(
network_subgraph,
indexer_address,
recently_closed_allocation_buffer,
)
.await
.map_err(|e| e.to_string())
},
// Need to use string errors here because eventuals `map_with_retry` retries
// errors that can be cloned
move |err: String| {
warn!(
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
indexer_address, err
);

// Sleep for a bit before we retry
sleep(interval.div_f32(2.0))
},
)
.map_err(|e| e.to_string());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You dont need to map to strings here.

match result {
Ok(allocations) => {
tx.send(allocations)
.expect("Failed to update indexer_allocations channel");
}
Err(err) => {
warn!(
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
indexer_address, err
);

// Sleep for a bit before we retry
sleep(interval.div_f32(2.0)).await;
}
}
}
});
rx
}

pub async fn get_allocations(
Expand Down
31 changes: 9 additions & 22 deletions common/src/attestations/signers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use eventuals::{Eventual, EventualExt};
use std::collections::HashMap;
use std::sync::Arc;
use thegraph_core::{Address, ChainId};
Expand All @@ -18,31 +17,19 @@ use crate::prelude::{Allocation, AttestationSigner};

/// An always up-to-date list of attestation signers, one for each of the indexer's allocations.
pub async fn attestation_signers(
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
mut indexer_allocations_rx: Receiver<HashMap<Address, Allocation>>,
indexer_mnemonic: String,
chain_id: ChainId,
mut dispute_manager_rx: Receiver<Option<Address>>,
) -> Receiver<HashMap<Address, AttestationSigner>> {
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
Box::leak(Box::new(Mutex::new(HashMap::new())));

// Actively listening to indexer_allocations to update allocations channel
// Temporary fix until the indexer_allocations is migrated to tokio watch
let (allocations_tx, mut allocations_rx) =
watch::channel(indexer_allocations.value_immediate().unwrap_or_default());
indexer_allocations
.pipe(move |allocatons| {
allocations_tx
.send(allocatons)
.expect("Failed to update allocations channel");
})
.forever();

let starter_signers_map = modify_sigers(
Arc::new(indexer_mnemonic.clone()),
chain_id,
attestation_signers_map,
allocations_rx.clone(),
indexer_allocations_rx.clone(),
dispute_manager_rx.clone(),
)
.await;
Expand All @@ -53,12 +40,12 @@ pub async fn attestation_signers(
tokio::spawn(async move {
loop {
let updated_signers = select! {
Ok(())= allocations_rx.changed() =>{
Ok(())= indexer_allocations_rx.changed() =>{
modify_sigers(
Arc::new(indexer_mnemonic.clone()),
chain_id,
attestation_signers_map,
allocations_rx.clone(),
indexer_allocations_rx.clone(),
dispute_manager_rx.clone(),
).await
},
Expand All @@ -67,7 +54,7 @@ pub async fn attestation_signers(
Arc::new(indexer_mnemonic.clone()),
chain_id,
attestation_signers_map,
allocations_rx.clone(),
indexer_allocations_rx.clone(),
dispute_manager_rx.clone()
).await
},
Expand Down Expand Up @@ -129,27 +116,27 @@ mod tests {

#[tokio::test]
async fn test_attestation_signers_update_with_allocations() {
let (mut allocations_writer, allocations) = Eventual::<HashMap<Address, Allocation>>::new();
let (allocations_tx, allcoations_rx) = watch::channel(HashMap::new());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo allocations_rx

let (dispute_manager_tx, dispute_manager_rx) = watch::channel(None);
dispute_manager_tx
.send(Some(*DISPUTE_MANAGER_ADDRESS))
.unwrap();
let mut signers = attestation_signers(
allocations,
allcoations_rx,
(*INDEXER_OPERATOR_MNEMONIC).to_string(),
1,
dispute_manager_rx,
)
.await;

// Test that an empty set of allocations leads to an empty set of signers
allocations_writer.write(HashMap::new());
allocations_tx.send(HashMap::new()).unwrap();
signers.changed().await.unwrap();
let latest_signers = signers.borrow().clone();
assert_eq!(latest_signers, HashMap::new());

// Test that writing our set of test allocations results in corresponding signers for all of them
allocations_writer.write((*INDEXER_ALLOCATIONS).clone());
allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap();
signers.changed().await.unwrap();
let latest_signers = signers.borrow().clone();
assert_eq!(latest_signers.len(), INDEXER_ALLOCATIONS.len());
Expand Down
3 changes: 2 additions & 1 deletion common/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use tap_core::receipt::checks::ReceiptCheck;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::watch::Receiver;
use tokio_util::sync::CancellationToken;
use tracing::error;

Expand All @@ -38,7 +39,7 @@ pub enum AdapterError {
impl IndexerTapContext {
pub async fn get_checks(
pgpool: PgPool,
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<EscrowAccounts>,
domain_separator: Eip712Domain,
timestamp_error_tolerance: Duration,
Expand Down
12 changes: 5 additions & 7 deletions common/src/tap/checks/allocation_eligible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ use std::collections::HashMap;

use alloy::primitives::Address;
use anyhow::anyhow;
use eventuals::Eventual;

use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
};
use tokio::sync::watch::Receiver;

use crate::prelude::Allocation;
pub struct AllocationEligible {
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
}

impl AllocationEligible {
pub fn new(indexer_allocations: Eventual<HashMap<Address, Allocation>>) -> Self {
pub fn new(indexer_allocations: Receiver<HashMap<Address, Allocation>>) -> Self {
Self {
indexer_allocations,
}
Expand All @@ -31,10 +31,8 @@ impl Check for AllocationEligible {
let allocation_id = receipt.signed_receipt().message.allocation_id;
if !self
.indexer_allocations
.value()
.await
.map(|allocations| allocations.contains_key(&allocation_id))
.unwrap_or(false)
.borrow()
.contains_key(&allocation_id)
{
return Err(CheckError::Failed(anyhow!(
"Receipt allocation ID `{}` is not eligible for this indexer",
Expand Down
37 changes: 20 additions & 17 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeV
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::watch::Receiver;
use tokio::task::JoinHandle;

use alloy::dyn_abi::Eip712Domain;
Expand Down Expand Up @@ -119,7 +120,7 @@ pub struct SenderAccountArgs {
pub pgpool: PgPool,
pub sender_id: Address,
pub escrow_accounts: Eventual<EscrowAccounts>,
pub indexer_allocations: Eventual<HashSet<Address>>,
pub indexer_allocations: Receiver<HashSet<Address>>,
pub escrow_subgraph: &'static SubgraphClient,
pub domain_separator: Eip712Domain,
pub sender_aggregator_endpoint: String,
Expand All @@ -134,7 +135,7 @@ pub struct State {
rav_tracker: SenderFeeTracker,
invalid_receipts_tracker: SenderFeeTracker,
allocation_ids: HashSet<Address>,
_indexer_allocations_handle: PipeHandle,
_indexer_allocations_handle: JoinHandle<()>,
_escrow_account_monitor: PipeHandle,
scheduled_rav_request: Option<JoinHandle<Result<(), MessagingErr<SenderAccountMessage>>>>,

Expand Down Expand Up @@ -333,20 +334,21 @@ impl Actor for SenderAccount {
}: Self::Arguments,
) -> std::result::Result<Self::State, ActorProcessingErr> {
let myself_clone = myself.clone();
let _indexer_allocations_handle =
indexer_allocations
.clone()
.pipe_async(move |allocation_ids| {
let myself = myself_clone.clone();
async move {
// Update the allocation_ids
myself
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
.unwrap_or_else(|e| {
error!("Error while updating allocation_ids: {:?}", e);
});
}
});
let _indexer_allocations_handle = tokio::spawn(async move {
let mut indexer_allocations = indexer_allocations.clone();
loop {
let allocation_ids = indexer_allocations.borrow().clone();
// Update the allocation_ids
myself_clone
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
.unwrap_or_else(|e| {
error!("Error while updating allocation_ids: {:?}", e);
});
if indexer_allocations.changed().await.is_err() {
break;
}
}
});

let myself_clone = myself.clone();
let pgpool_clone = pgpool.clone();
Expand Down Expand Up @@ -938,6 +940,7 @@ pub mod tests {
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::watch;
use wiremock::matchers::{body_string_contains, method};
use wiremock::{Mock, MockServer, ResponseTemplate};

Expand Down Expand Up @@ -1039,7 +1042,7 @@ pub mod tests {
pgpool,
sender_id: SENDER.1,
escrow_accounts: escrow_accounts_eventual,
indexer_allocations: Eventual::from_value(initial_allocation),
indexer_allocations: watch::channel(initial_allocation).1,
escrow_subgraph,
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
sender_aggregator_endpoint: DUMMY_URL.to_string(),
Expand Down
Loading
Loading