Merged

Changes from 9 commits
67 changes: 41 additions & 26 deletions common/src/allocations/monitor.rs
@@ -10,10 +10,12 @@ use std::{
use super::Allocation;
use crate::prelude::SubgraphClient;
use alloy::primitives::{TxHash, B256, U256};
use eventuals::{timer, Eventual, EventualExt};
use graphql_client::GraphQLQuery;
use thegraph_core::{Address, DeploymentId};
use tokio::time::sleep;
use tokio::{
sync::watch::{self, Receiver},
time::{self, sleep},
};
use tracing::warn;

type BigInt = U256;
@@ -61,30 +63,43 @@ pub fn indexer_allocations(
indexer_address: Address,
interval: Duration,
recently_closed_allocation_buffer: Duration,
) -> Eventual<HashMap<Address, Allocation>> {
// Refresh indexer allocations every now and then
timer(interval).map_with_retry(
move |_| async move {
get_allocations(
network_subgraph,
indexer_address,
recently_closed_allocation_buffer,
)
.await
.map_err(|e| e.to_string())
},
// Need to use string errors here because eventuals `map_with_retry` retries
// errors that can be cloned
move |err: String| {
warn!(
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
indexer_address, err
);

// Sleep for a bit before we retry
sleep(interval.div_f32(2.0))
},
)
) -> Receiver<HashMap<Address, Allocation>> {
let (tx, rx) = watch::channel(HashMap::new());
tokio::spawn(async move {
let mut time_interval = time::interval(interval);
// Refresh indexer allocations every now and then
loop {
time_interval.tick().await;
let result = async {
get_allocations(
network_subgraph,
indexer_address,
recently_closed_allocation_buffer,
)
.await
.map_err(|e| e.to_string())
}
.await;
match result {
Ok(res) => {
if tx.send(res).is_err() {
// receiver dropped; stop the task
break;
}
}
Err(err) => {
warn!(
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
indexer_address, err
);

// Sleep for a bit before we retry
sleep(interval.div_f32(2.0)).await;
}
}
}
});
rx
}

pub async fn get_allocations(
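Note that `indexer_allocations` now returns a `tokio::sync::watch::Receiver` instead of an `Eventual`. A minimal consumer sketch follows; only the receiver type and the imports mirror the diff, the function name and logging are illustrative:

```rust
use std::collections::HashMap;

use thegraph_core::Address;
use tokio::sync::watch::Receiver;

use crate::prelude::Allocation;

// Hypothetical consumer of the receiver returned by `indexer_allocations`:
// wake up whenever the monitor task publishes a new map and take a snapshot.
async fn log_allocation_updates(mut rx: Receiver<HashMap<Address, Allocation>>) {
    while rx.changed().await.is_ok() {
        // `borrow` holds a read lock on the channel, so clone the map and
        // drop the guard before doing any longer-running work.
        let allocations = rx.borrow().clone();
        tracing::info!("indexer has {} active allocations", allocations.len());
    }
    // `changed()` returns Err once the monitor task drops the sender.
}
```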
74 changes: 44 additions & 30 deletions common/src/attestations/dispute_manager.rs
@@ -1,12 +1,11 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use eventuals::{timer, Eventual, EventualExt};
use graphql_client::GraphQLQuery;
use std::time::Duration;
use thegraph_core::Address;
use tokio::time::sleep;
use tokio::sync::watch::{self, Receiver};
use tokio::time::{self, sleep};
use tracing::warn;

use crate::subgraph_client::SubgraphClient;
@@ -25,27 +24,44 @@ struct DisputeManager;
pub fn dispute_manager(
network_subgraph: &'static SubgraphClient,
interval: Duration,
) -> Eventual<Address> {
timer(interval).map_with_retry(
move |_| async move {
let response = network_subgraph
.query::<DisputeManager, _>(dispute_manager::Variables {})
.await
.map_err(|e| e.to_string())?;

response.map_err(|e| e.to_string()).and_then(|data| {
data.graph_network
.map(|network| network.dispute_manager)
.ok_or_else(|| "Network 1 not found in network subgraph".to_string())
})
},
move |err: String| {
warn!("Failed to query dispute manager: {}", err);

// Sleep for a bit before we retry
sleep(interval.div_f32(2.0))
},
)
) -> Receiver<Address> {
let (tx, rx) = watch::channel(Address::default());
tokio::spawn(async move {
let mut time_interval = time::interval(interval);

loop {
time_interval.tick().await;

let result = async {
let response = network_subgraph
.query::<DisputeManager, _>(dispute_manager::Variables {})
.await
.map_err(|e| e.to_string())?;

response.map_err(|e| e.to_string()).and_then(|data| {
data.graph_network
.map(|network| network.dispute_manager)
.ok_or_else(|| "Network 1 not found in network subgraph".to_string())
})
}
.await;

match result {
Ok(address) => {
if tx.send(address).is_err() {
// receiver dropped; stop the task
break;
}
}
Err(err) => {
warn!("Failed to query dispute manager for network: {}", err);
// Sleep for a bit before we retry
sleep(interval.div_f32(2.0)).await;
}
}
}
});
rx
}

#[cfg(test)]
@@ -100,10 +116,8 @@ mod test {
let (network_subgraph, _mock_server) = setup_mock_network_subgraph().await;

let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(60));

assert_eq!(
dispute_manager.value().await.unwrap(),
*DISPUTE_MANAGER_ADDRESS
);
sleep(Duration::from_millis(50)).await;
let result = *dispute_manager.borrow();
assert_eq!(result, *DISPUTE_MANAGER_ADDRESS);
}
}
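The updated test sleeps for 50 ms and then reads the receiver. If the fixed sleep ever proves flaky, one alternative is to wait on the channel itself; a small sketch (the helper name is illustrative and not part of this PR):

```rust
use tokio::sync::watch::Receiver;

// Hypothetical test helper: block until the background task publishes its
// first update, then return a snapshot of the value.
async fn wait_for_first_update<T: Clone>(rx: &mut Receiver<T>) -> T {
    rx.changed()
        .await
        .expect("sender dropped before publishing a value");
    rx.borrow().clone()
}
```

The dispute manager test could then do something like `wait_for_first_update(&mut dispute_manager).await` (with the receiver bound as `mut`) instead of sleeping for an arbitrary duration.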
62 changes: 38 additions & 24 deletions common/src/attestations/signers.rs
@@ -1,32 +1,39 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use eventuals::{join, Eventual, EventualExt};
use std::collections::HashMap;
use std::sync::Arc;
use thegraph_core::{Address, ChainId};
use tokio::sync::Mutex;
use tokio::sync::{watch, watch::Receiver, Mutex};
use tracing::warn;

use crate::prelude::{Allocation, AttestationSigner};

/// An always up-to-date list of attestation signers, one for each of the indexer's allocations.
pub fn attestation_signers(
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
mut indexer_allocations: Receiver<HashMap<Address, Allocation>>,
indexer_mnemonic: String,
chain_id: ChainId,
dispute_manager: Eventual<Address>,
) -> Eventual<HashMap<Address, AttestationSigner>> {
mut dispute_manager: Receiver<Address>,
) -> Receiver<HashMap<Address, AttestationSigner>> {
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
Box::leak(Box::new(Mutex::new(HashMap::new())));

let indexer_mnemonic = Arc::new(indexer_mnemonic);

// Whenever the indexer's active or recently closed allocations change, make sure
// we have attestation signers for all of them
join((indexer_allocations, dispute_manager)).map(move |(allocations, dispute_manager)| {
let indexer_mnemonic = indexer_mnemonic.clone();
async move {
let (tx, rx) = watch::channel(HashMap::new());

tokio::spawn(async move {
loop {
tokio::select! {
Ok(_) = indexer_allocations.changed() => {},
Ok(_) = dispute_manager.changed() => {},
else => break,
}

let allocations = indexer_allocations.borrow().clone();
let dispute_manager = *dispute_manager.borrow();
let indexer_mnemonic = indexer_mnemonic.clone();

let mut signers = attestation_signers_map.lock().await;

// Remove signers for allocations that are no longer active or recently closed
@@ -53,13 +60,18 @@ pub fn attestation_signers(
}
}

signers.clone()
// sending updated signers map
tx.send(signers.clone()).unwrap();
}
})
});

rx
}

#[cfg(test)]
mod tests {
use tokio::time::sleep;

use crate::test_vectors::{
DISPUTE_MANAGER_ADDRESS, INDEXER_ALLOCATIONS, INDEXER_OPERATOR_MNEMONIC,
};
@@ -68,27 +80,29 @@ mod tests {

#[tokio::test]
async fn test_attestation_signers_update_with_allocations() {
let (mut allocations_writer, allocations) = Eventual::<HashMap<Address, Allocation>>::new();
let (mut dispute_manager_writer, dispute_manager) = Eventual::<Address>::new();
let (allocations_tx, allocations_rx) = watch::channel(HashMap::new());
let (dispute_manager_tx, dispute_manager_rx) = watch::channel(Address::default());

dispute_manager_writer.write(*DISPUTE_MANAGER_ADDRESS);
dispute_manager_tx.send(*DISPUTE_MANAGER_ADDRESS).unwrap();

let signers = attestation_signers(
allocations,
let signers_rx = attestation_signers(
allocations_rx,
(*INDEXER_OPERATOR_MNEMONIC).to_string(),
1,
dispute_manager,
dispute_manager_rx,
);
let mut signers = signers.subscribe();

// Test that an empty set of allocations leads to an empty set of signers
allocations_writer.write(HashMap::new());
let latest_signers = signers.next().await.unwrap();
allocations_tx.send(HashMap::new()).unwrap();
sleep(std::time::Duration::from_millis(50)).await; // wait for the update to propagate (adjust if needed)
let latest_signers = signers_rx.borrow().clone();
assert_eq!(latest_signers, HashMap::new());

// Test that writing our set of test allocations results in corresponding signers for all of them
allocations_writer.write((*INDEXER_ALLOCATIONS).clone());
let latest_signers = signers.next().await.unwrap();
allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap();
sleep(std::time::Duration::from_millis(50)).await; // wait for the update to propagate
let latest_signers = signers_rx.borrow().clone();
assert_eq!(latest_signers.len(), INDEXER_ALLOCATIONS.len());
for signer_allocation_id in latest_signers.keys() {
assert!(INDEXER_ALLOCATIONS
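The manual `tokio::select!` loop above replaces eventuals' `join(...).map(...)`. The same shape could in principle be factored into a generic helper; a hedged sketch follows (it is not part of the PR, and the PR's version additionally keeps a leaked, shared signer cache and a fallible signer-creation step, both omitted here):

```rust
use tokio::sync::watch;

// Hypothetical helper mirroring what `attestation_signers` does by hand:
// whenever either input channel changes, recompute a derived value and
// publish it to a new watch channel.
fn join_map<A, B, T, F>(
    mut a: watch::Receiver<A>,
    mut b: watch::Receiver<B>,
    f: F,
) -> watch::Receiver<T>
where
    A: Clone + Send + Sync + 'static,
    B: Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: Fn(A, B) -> T + Send + 'static,
{
    // Seed the output channel from the inputs' current values.
    let (tx, rx) = watch::channel(f(a.borrow().clone(), b.borrow().clone()));
    tokio::spawn(async move {
        loop {
            tokio::select! {
                Ok(_) = a.changed() => {}
                Ok(_) = b.changed() => {}
                else => break, // both senders are gone
            }
            let next = f(a.borrow().clone(), b.borrow().clone());
            if tx.send(next).is_err() {
                break; // all receivers were dropped
            }
        }
    });
    rx
}
```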
54 changes: 34 additions & 20 deletions common/src/escrow_accounts.rs
@@ -9,11 +9,13 @@ use std::{

use alloy::primitives::U256;
use anyhow::{anyhow, Result};
use eventuals::{timer, Eventual, EventualExt};
use graphql_client::GraphQLQuery;
use thegraph_core::Address;
use thiserror::Error;
use tokio::time::sleep;
use tokio::{
sync::watch::{self, Receiver},
time::{self, sleep},
};
use tracing::{error, warn};

use crate::prelude::SubgraphClient;
@@ -105,22 +107,34 @@ pub fn escrow_accounts(
indexer_address: Address,
interval: Duration,
reject_thawing_signers: bool,
) -> Eventual<EscrowAccounts> {
timer(interval).map_with_retry(
move |_| async move {
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
.await
.map_err(|e| e.to_string())
},
move |err: String| {
error!(
"Failed to fetch escrow accounts for indexer {:?}: {}",
indexer_address, err
);
) -> Receiver<EscrowAccounts> {
let (tx, rx) = watch::channel(EscrowAccounts::default());
tokio::spawn(async move {
let mut time_interval = time::interval(interval);

sleep(interval.div_f32(2.0))
},
)
loop {
time_interval.tick().await;
let result =
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
.await
.map_err(|e| e.to_string());
match result {
Ok(escrow_accounts) => {
if tx.send(escrow_accounts).is_err() {
break; // receiver dropped; stop the task
}
}
Err(err) => {
error!(
"Failed to fetch escrow accounts for indexer {:?}: {}",
indexer_address, err
);
sleep(interval.div_f32(2.0)).await;
}
}
}
});
rx
}

async fn get_escrow_accounts(
@@ -236,15 +250,15 @@ mod tests {
);
mock_server.register(mock).await;

let accounts = escrow_accounts(
let accounts_rx = escrow_accounts(
escrow_subgraph,
*test_vectors::INDEXER_ADDRESS,
Duration::from_secs(60),
true,
);

sleep(Duration::from_millis(50)).await;
assert_eq!(
accounts.value().await.unwrap(),
accounts_rx.borrow().clone(),
EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
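escrow_accounts.rs follows the same structure as monitor.rs and dispute_manager.rs: tick on an interval, run a fallible fetch, publish successes to a watch channel, and back off for half the interval on errors. As an aside, that shared shape could be captured in one helper; a sketch under the assumption that a default value is acceptable before the first successful fetch (none of this is in the PR):

```rust
use std::{future::Future, time::Duration};

use tokio::{
    sync::watch::{self, Receiver},
    time::{self, sleep},
};
use tracing::warn;

// Hypothetical generalisation of the polling loops in this PR: fetch on an
// interval, publish each successful result, and retry sooner after an error.
fn poll_into_watch<T, Fut, F>(seed: T, interval: Duration, fetch: F) -> Receiver<T>
where
    T: Send + Sync + 'static,
    Fut: Future<Output = Result<T, String>> + Send + 'static,
    F: Fn() -> Fut + Send + 'static,
{
    let (tx, rx) = watch::channel(seed);
    tokio::spawn(async move {
        let mut ticker = time::interval(interval);
        loop {
            ticker.tick().await;
            match fetch().await {
                Ok(value) => {
                    if tx.send(value).is_err() {
                        break; // all receivers were dropped
                    }
                }
                Err(err) => {
                    warn!("periodic fetch failed: {}", err);
                    // Retry sooner than the next regular tick.
                    sleep(interval.div_f32(2.0)).await;
                }
            }
        }
    });
    rx
}
```

Each of the three polling loops in this PR could then collapse to a single `poll_into_watch(...)` call, though keeping them inline as merged here is equally valid.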