11// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22// SPDX-License-Identifier: Apache-2.0
33
4- use eventuals:: { Eventual , EventualExt } ;
4+ use eventuals:: { Eventual , EventualExt , EventualWriter } ;
55use std:: collections:: HashMap ;
66use std:: sync:: Arc ;
77use thegraph_core:: { Address , ChainId } ;
@@ -24,8 +24,6 @@ pub fn attestation_signers(
2424 let attestation_signers_map: & ' static Mutex < HashMap < Address , AttestationSigner > > =
2525 Box :: leak ( Box :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ) ;
2626
27- let indexer_mnemonic = Arc :: new ( indexer_mnemonic) ;
28-
2927 // Whenever the indexer's active or recently closed allocations change, make sure
3028 // we have attestation signers for all of them
3129 let ( mut signers_writer, signers_reader) =
@@ -40,49 +38,74 @@ pub fn attestation_signers(
4038 let _p1 = indexer_allocations. pipe ( move |allocatons| {
4139 let _ = allocations_tx. send ( allocatons) ;
4240 } ) ;
41+
4342 loop {
4443 select ! {
45- Ok ( _) = allocations_rx. changed( ) =>{ } ,
46- Ok ( _) = dispute_manager_rx. changed( ) =>{ } ,
47- }
48- let mut signers = attestation_signers_map. lock ( ) . await ;
49-
50- let allocations = allocations_rx. borrow ( ) . clone ( ) ;
51- let dispute_manager = * dispute_manager_rx. borrow ( ) ;
52- if dispute_manager. is_none ( ) {
53- continue ;
54- }
55- let dispute_manager = dispute_manager. unwrap ( ) ;
56- // Remove signers for allocations that are no longer active or recently closed
57- signers. retain ( |id, _| allocations. contains_key ( id) ) ;
58-
59- // Create signers for new allocations
60- for ( id, allocation) in allocations. iter ( ) {
61- if !signers. contains_key ( id) {
62- let signer = AttestationSigner :: new (
63- & indexer_mnemonic,
64- allocation,
44+ Ok ( _) = allocations_rx. changed( ) =>{
45+ signers_writer = modify_sigers(
46+ Arc :: new( indexer_mnemonic. clone( ) ) ,
6547 chain_id,
66- dispute_manager,
67- ) ;
68- if let Err ( e) = signer {
69- warn ! (
70- "Failed to establish signer for allocation {}, deployment {}, createdAtEpoch {}: {}" ,
71- allocation. id, allocation. subgraph_deployment. id,
72- allocation. created_at_epoch, e
73- ) ;
74- } else {
75- signers. insert ( * id, signer. unwrap ( ) ) ;
76- }
48+ attestation_signers_map,
49+ allocations_rx. clone( ) ,
50+ dispute_manager_rx. clone( ) ,
51+ signers_writer) . await ;
52+ } ,
53+ Ok ( _) = dispute_manager_rx. changed( ) =>{
54+ signers_writer = modify_sigers( Arc :: new( indexer_mnemonic. clone( ) ) ,
55+ chain_id,
56+ attestation_signers_map,
57+ allocations_rx. clone( ) ,
58+ dispute_manager_rx. clone( ) ,
59+ signers_writer) . await ;
60+ } ,
61+ else=>{
62+ //something is wrong
63+ break ;
7764 }
7865 }
79-
80- signers_writer. write ( signers. clone ( ) ) ;
8166 }
8267 } ) ;
8368
8469 signers_reader
8570}
71+ async fn modify_sigers (
72+ indexer_mnemonic : Arc < String > ,
73+ chain_id : ChainId ,
74+ attestation_signers_map : & ' static Mutex < HashMap < Address , AttestationSigner > > ,
75+ allocations_rx : Receiver < HashMap < Address , Allocation > > ,
76+ dispute_manager_rx : Receiver < Option < Address > > ,
77+ mut signers_writer : EventualWriter < HashMap < Address , AttestationSigner > > ,
78+ ) -> EventualWriter < HashMap < Address , AttestationSigner > > {
79+ let mut signers = attestation_signers_map. lock ( ) . await ;
80+ let allocations = allocations_rx. borrow ( ) . clone ( ) ;
81+ let dispute_manager = * dispute_manager_rx. borrow ( ) ;
82+ if dispute_manager. is_none ( ) {
83+ return signers_writer;
84+ }
85+ let dispute_manager = dispute_manager. unwrap ( ) ;
86+ // Remove signers for allocations that are no longer active or recently closed
87+ signers. retain ( |id, _| allocations. contains_key ( id) ) ;
88+
89+ // Create signers for new allocations
90+ for ( id, allocation) in allocations. iter ( ) {
91+ if !signers. contains_key ( id) {
92+ let signer =
93+ AttestationSigner :: new ( & indexer_mnemonic, allocation, chain_id, dispute_manager) ;
94+ if let Err ( e) = signer {
95+ warn ! (
96+ "Failed to establish signer for allocation {}, deployment {}, createdAtEpoch {}: {}" ,
97+ allocation. id, allocation. subgraph_deployment. id,
98+ allocation. created_at_epoch, e
99+ ) ;
100+ } else {
101+ signers. insert ( * id, signer. unwrap ( ) ) ;
102+ }
103+ }
104+ }
105+
106+ signers_writer. write ( signers. clone ( ) ) ;
107+ signers_writer
108+ }
86109
87110#[ cfg( test) ]
88111mod tests {
0 commit comments