diff --git a/common/src/attestations/dispute_manager.rs b/common/src/attestations/dispute_manager.rs index 7c64c6e02..548ebffaf 100644 --- a/common/src/attestations/dispute_manager.rs +++ b/common/src/attestations/dispute_manager.rs @@ -2,11 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use std::time::Duration; - -use eventuals::{timer, Eventual, EventualExt}; use graphql_client::GraphQLQuery; use thegraph_core::Address; -use tokio::time::sleep; +use tokio::sync::watch::{self, Receiver}; +use tokio::time::{self, sleep}; use tracing::warn; use crate::subgraph_client::SubgraphClient; @@ -25,27 +24,43 @@ struct DisputeManager; pub fn dispute_manager( network_subgraph: &'static SubgraphClient, interval: Duration, -) -> Eventual
{ - timer(interval).map_with_retry( - move |_| async move { - let response = network_subgraph +) -> Receiver
{ + let (tx, rx) = watch::channel(Address::default()); + tokio::spawn(async move { + let mut time_interval = time::interval(interval); + + loop { + time_interval.tick().await; + + let result = async { + let response = network_subgraph .query::(dispute_manager::Variables {}) .await .map_err(|e| e.to_string())?; - response.map_err(|e| e.to_string()).and_then(|data| { - data.graph_network - .map(|network| network.dispute_manager) - .ok_or_else(|| "Network 1 not found in network subgraph".to_string()) - }) - }, - move |err: String| { - warn!("Failed to query dispute manager: {}", err); - - // Sleep for a bit before we retry - sleep(interval.div_f32(2.0)) - }, - ) + response.map_err(|e| e.to_string()).and_then(|data| { + data.graph_network + .map(|network| network.dispute_manager) + .ok_or_else(|| "Network 1 not found in network subgraph".to_string()) + }) + }.await; + + match result { + Ok(address) => { + if tx.send(address).is_err() { + // stopping + break; + } + } + Err(err) => { + warn!("Failed to query dispute manager for network: {}", err); + // Sleep for a bit before we retry + sleep(interval.div_f32(2.0)).await; + } + } + } + }); + rx } #[cfg(test)] @@ -100,9 +115,9 @@ mod test { let (network_subgraph, _mock_server) = setup_mock_network_subgraph().await; let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(60)); - + let result = dispute_manager.borrow().clone(); assert_eq!( - dispute_manager.value().await.unwrap(), + result, *DISPUTE_MANAGER_ADDRESS ); }