@@ -8,7 +8,7 @@ use thegraph_core::{Address, ChainId};
88use tokio:: {
99 select,
1010 sync:: {
11- watch:: { self , Receiver , Sender } ,
11+ watch:: { self , Receiver } ,
1212 Mutex ,
1313 } ,
1414} ;
@@ -39,31 +39,40 @@ pub fn attestation_signers(
3939 let _p1 = indexer_allocations. pipe ( move |allocatons| {
4040 let _ = allocations_tx. send ( allocatons) ;
4141 } ) ;
42-
42+ modify_sigers (
43+ Arc :: new ( indexer_mnemonic. clone ( ) ) ,
44+ chain_id,
45+ attestation_signers_map,
46+ allocations_rx. clone ( ) ,
47+ dispute_manager_rx. clone ( ) ,
48+ )
49+ . await ;
4350 loop {
44- select ! {
45- Ok ( _ ) = allocations_rx. changed( ) =>{
51+ let updated_signers = select ! {
52+ Ok ( ( ) ) = allocations_rx. changed( ) =>{
4653 modify_sigers(
4754 Arc :: new( indexer_mnemonic. clone( ) ) ,
4855 chain_id,
4956 attestation_signers_map,
5057 allocations_rx. clone( ) ,
5158 dispute_manager_rx. clone( ) ,
52- signers_tx . clone ( ) ) . await ;
59+ ) . await
5360 } ,
54- Ok ( _) = dispute_manager_rx. changed( ) =>{
55- modify_sigers( Arc :: new( indexer_mnemonic. clone( ) ) ,
56- chain_id,
57- attestation_signers_map,
58- allocations_rx. clone( ) ,
59- dispute_manager_rx. clone( ) ,
60- signers_tx. clone( ) ) . await ;
61+ Ok ( ( ) ) = dispute_manager_rx. changed( ) =>{
62+ modify_sigers(
63+ Arc :: new( indexer_mnemonic. clone( ) ) ,
64+ chain_id,
65+ attestation_signers_map,
66+ allocations_rx. clone( ) ,
67+ dispute_manager_rx. clone( )
68+ ) . await
6169 } ,
6270 else=>{
6371 // Something is wrong.
6472 panic!( "dispute_manager_rx or allocations_rx was dropped" ) ;
6573 }
66- }
74+ } ;
75+ signers_tx. send ( updated_signers) . unwrap ( ) ;
6776 }
6877 } ) ;
6978
@@ -75,12 +84,11 @@ async fn modify_sigers(
7584 attestation_signers_map : & ' static Mutex < HashMap < Address , AttestationSigner > > ,
7685 allocations_rx : Receiver < HashMap < Address , Allocation > > ,
7786 dispute_manager_rx : Receiver < Option < Address > > ,
78- signers_tx : Sender < HashMap < Address , AttestationSigner > > ,
79- ) {
87+ ) -> HashMap < thegraph_core:: Address , AttestationSigner > {
8088 let mut signers = attestation_signers_map. lock ( ) . await ;
8189 let allocations = allocations_rx. borrow ( ) . clone ( ) ;
8290 let Some ( dispute_manager) = * dispute_manager_rx. borrow ( ) else {
83- return ;
91+ return signers . clone ( ) ;
8492 } ;
8593 // Remove signers for allocations that are no longer active or recently closed
8694 signers. retain ( |id, _| allocations. contains_key ( id) ) ;
@@ -102,7 +110,7 @@ async fn modify_sigers(
102110 }
103111 }
104112
105- signers_tx . send ( signers. clone ( ) ) . unwrap ( ) ;
113+ signers. clone ( )
106114}
107115
108116#[ cfg( test) ]
0 commit comments