22// SPDX-License-Identifier: Apache-2.0
33
44use std:: time:: Duration ;
5-
6- use eventuals:: { timer, Eventual , EventualExt } ;
75use graphql_client:: GraphQLQuery ;
86use thegraph_core:: Address ;
9- use tokio:: time:: sleep;
7+ use tokio:: sync:: watch:: { self , Receiver } ;
8+ use tokio:: time:: { self , sleep} ;
109use tracing:: warn;
1110
1211use crate :: subgraph_client:: SubgraphClient ;
@@ -25,27 +24,43 @@ struct DisputeManager;
2524pub fn dispute_manager (
2625 network_subgraph : & ' static SubgraphClient ,
2726 interval : Duration ,
28- ) -> Eventual < Address > {
29- timer ( interval) . map_with_retry (
30- move |_| async move {
31- let response = network_subgraph
27+ ) -> Receiver < Address > {
28+ let ( tx, rx) = watch:: channel ( Address :: default ( ) ) ;
29+ tokio:: spawn ( async move {
30+ let mut time_interval = time:: interval ( interval) ;
31+
32+ loop {
33+ time_interval. tick ( ) . await ;
34+
35+ let result = async {
36+ let response = network_subgraph
3237 . query :: < DisputeManager , _ > ( dispute_manager:: Variables { } )
3338 . await
3439 . map_err ( |e| e. to_string ( ) ) ?;
3540
36- response. map_err ( |e| e. to_string ( ) ) . and_then ( |data| {
37- data. graph_network
38- . map ( |network| network. dispute_manager )
39- . ok_or_else ( || "Network 1 not found in network subgraph" . to_string ( ) )
40- } )
41- } ,
42- move |err : String | {
43- warn ! ( "Failed to query dispute manager: {}" , err) ;
44-
45- // Sleep for a bit before we retry
46- sleep ( interval. div_f32 ( 2.0 ) )
47- } ,
48- )
41+ response. map_err ( |e| e. to_string ( ) ) . and_then ( |data| {
42+ data. graph_network
43+ . map ( |network| network. dispute_manager )
44+ . ok_or_else ( || "Network 1 not found in network subgraph" . to_string ( ) )
45+ } )
46+ } . await ;
47+
48+ match result {
49+ Ok ( address) => {
50+ if tx. send ( address) . is_err ( ) {
51+ // stopping
52+ break ;
53+ }
54+ }
55+ Err ( err) => {
56+ warn ! ( "Failed to query dispute manager for network: {}" , err) ;
57+ // Sleep for a bit before we retry
58+ sleep ( interval. div_f32 ( 2.0 ) ) . await ;
59+ }
60+ }
61+ }
62+ } ) ;
63+ rx
4964}
5065
5166#[ cfg( test) ]
@@ -100,9 +115,9 @@ mod test {
100115 let ( network_subgraph, _mock_server) = setup_mock_network_subgraph ( ) . await ;
101116
102117 let dispute_manager = dispute_manager ( network_subgraph, Duration :: from_secs ( 60 ) ) ;
103-
118+ let result = dispute_manager . borrow ( ) . clone ( ) ;
104119 assert_eq ! (
105- dispute_manager . value ( ) . await . unwrap ( ) ,
120+ result ,
106121 * DISPUTE_MANAGER_ADDRESS
107122 ) ;
108123 }
0 commit comments