diff --git a/common/src/attestations/dispute_manager.rs b/common/src/attestations/dispute_manager.rs index 2cdc0fa34..3ddb2f5ea 100644 --- a/common/src/attestations/dispute_manager.rs +++ b/common/src/attestations/dispute_manager.rs @@ -1,16 +1,15 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::time::Duration; - +use crate::subgraph_client::SubgraphClient; use alloy::primitives::Address; -use eventuals::{timer, Eventual, EventualExt}; +use anyhow::Error; use graphql_client::GraphQLQuery; -use tokio::time::sleep; +use std::time::Duration; +use tokio::sync::watch::{self, Receiver}; +use tokio::time::{self, sleep}; use tracing::warn; -use crate::subgraph_client::SubgraphClient; - type Bytes = Address; #[derive(GraphQLQuery)] @@ -25,27 +24,41 @@ struct DisputeManager; pub fn dispute_manager( network_subgraph: &'static SubgraphClient, interval: Duration, -) -> Eventual
{ - timer(interval).map_with_retry( - move |_| async move { - let response = network_subgraph - .query::(dispute_manager::Variables {}) - .await - .map_err(|e| e.to_string())?; - - response.map_err(|e| e.to_string()).and_then(|data| { - data.graph_network +) -> Receiver> { + let (tx, rx) = watch::channel(None); + tokio::spawn(async move { + let mut time_interval = time::interval(interval); + time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + loop { + time_interval.tick().await; + + let result = async { + let response = network_subgraph + .query::(dispute_manager::Variables {}) + .await?; + response? + .graph_network .map(|network| network.dispute_manager) - .ok_or_else(|| "Network 1 not found in network subgraph".to_string()) - }) - }, - move |err: String| { - warn!("Failed to query dispute manager: {}", err); + .ok_or_else(|| Error::msg("Network 1 not found in network subgraph")) + } + .await; - // Sleep for a bit before we retry - sleep(interval.div_f32(2.0)) - }, - ) + match result { + Ok(address) => { + if tx.send(Some(address)).is_err() { + // stopping + break; + } + } + Err(err) => { + warn!("Failed to query dispute manager for network: {}", err); + // Sleep for a bit before we retry + sleep(interval.div_f32(2.0)).await; + } + } + } + }); + rx } #[cfg(test)] @@ -100,10 +113,8 @@ mod test { let (network_subgraph, _mock_server) = setup_mock_network_subgraph().await; let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(60)); - - assert_eq!( - dispute_manager.value().await.unwrap(), - *DISPUTE_MANAGER_ADDRESS - ); + sleep(Duration::from_millis(50)).await; + let result = *dispute_manager.borrow(); + assert_eq!(result.unwrap(), *DISPUTE_MANAGER_ADDRESS); } } diff --git a/common/src/attestations/signers.rs b/common/src/attestations/signers.rs index e8780aa8b..bceefa1a6 100644 --- a/common/src/attestations/signers.rs +++ b/common/src/attestations/signers.rs @@ -1,11 +1,15 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use eventuals::{join, Eventual, EventualExt}; +use eventuals::{Eventual, EventualExt, EventualWriter}; use std::collections::HashMap; use std::sync::Arc; use thegraph_core::{Address, ChainId}; -use tokio::sync::Mutex; +use tokio::sync::watch; +use tokio::{ + select, + sync::{watch::Receiver, Mutex}, +}; use tracing::warn; use crate::prelude::{Allocation, AttestationSigner}; @@ -15,47 +19,89 @@ pub fn attestation_signers( indexer_allocations: Eventual>, indexer_mnemonic: String, chain_id: ChainId, - dispute_manager: Eventual
, + mut dispute_manager_rx: Receiver>, ) -> Eventual> { let attestation_signers_map: &'static Mutex> = Box::leak(Box::new(Mutex::new(HashMap::new()))); - let indexer_mnemonic = Arc::new(indexer_mnemonic); - // Whenever the indexer's active or recently closed allocations change, make sure - // we have attestation signers for all of them - join((indexer_allocations, dispute_manager)).map(move |(allocations, dispute_manager)| { - let indexer_mnemonic = indexer_mnemonic.clone(); - async move { - let mut signers = attestation_signers_map.lock().await; - - // Remove signers for allocations that are no longer active or recently closed - signers.retain(|id, _| allocations.contains_key(id)); - - // Create signers for new allocations - for (id, allocation) in allocations.iter() { - if !signers.contains_key(id) { - let signer = AttestationSigner::new( - &indexer_mnemonic, - allocation, + // we have attestation signers for all of them. + let (mut signers_writer, signers_reader) = + Eventual::>::new(); + + tokio::spawn(async move { + // Listening to the allocation eventual and converting them to reciever. + // Using pipe for updation. + // For temporary pupose only. + let (allocations_tx, mut allocations_rx) = + watch::channel(indexer_allocations.value().await.unwrap()); + let _p1 = indexer_allocations.pipe(move |allocatons| { + let _ = allocations_tx.send(allocatons); + }); + + loop { + select! { + Ok(_)= allocations_rx.changed() =>{ + modify_sigers( + Arc::new(indexer_mnemonic.clone()), chain_id, - dispute_manager, - ); - if let Err(e) = signer { - warn!( - "Failed to establish signer for allocation {}, deployment {}, createdAtEpoch {}: {}", - allocation.id, allocation.subgraph_deployment.id, - allocation.created_at_epoch, e - ); - } else { - signers.insert(*id, signer.unwrap()); - } + attestation_signers_map, + allocations_rx.clone(), + dispute_manager_rx.clone(), + &mut signers_writer).await; + }, + Ok(_)= dispute_manager_rx.changed() =>{ + modify_sigers(Arc::new(indexer_mnemonic.clone()), + chain_id, + attestation_signers_map, + allocations_rx.clone(), + dispute_manager_rx.clone(), + &mut signers_writer).await; + }, + else=>{ + // Something is wrong. + panic!("dispute_manager_rx or allocations_rx was dropped"); } } + } + }); - signers.clone() + signers_reader +} +async fn modify_sigers( + indexer_mnemonic: Arc, + chain_id: ChainId, + attestation_signers_map: &'static Mutex>, + allocations_rx: Receiver>, + dispute_manager_rx: Receiver>, + signers_writer: &mut EventualWriter>, +) { + let mut signers = attestation_signers_map.lock().await; + let allocations = allocations_rx.borrow().clone(); + let Some(dispute_manager) = *dispute_manager_rx.borrow() else { + return; + }; + // Remove signers for allocations that are no longer active or recently closed + signers.retain(|id, _| allocations.contains_key(id)); + + // Create signers for new allocations + for (id, allocation) in allocations.iter() { + if !signers.contains_key(id) { + let signer = + AttestationSigner::new(&indexer_mnemonic, allocation, chain_id, dispute_manager); + if let Err(e) = signer { + warn!( + "Failed to establish signer for allocation {}, deployment {}, createdAtEpoch {}: {}", + allocation.id, allocation.subgraph_deployment.id, + allocation.created_at_epoch, e + ); + } else { + signers.insert(*id, signer.unwrap()); + } } - }) + } + + signers_writer.write(signers.clone()); } #[cfg(test)] @@ -69,9 +115,11 @@ mod tests { #[tokio::test] async fn test_attestation_signers_update_with_allocations() { let (mut allocations_writer, allocations) = Eventual::>::new(); - let (mut dispute_manager_writer, dispute_manager) = Eventual::
::new(); + let (dispute_manager_writer, dispute_manager) = watch::channel(None); - dispute_manager_writer.write(*DISPUTE_MANAGER_ADDRESS); + dispute_manager_writer + .send(Some(*DISPUTE_MANAGER_ADDRESS)) + .unwrap(); let signers = attestation_signers( allocations,