Skip to content

Commit da112ca

Browse files
migrate_2_tokio_watch attestation_signers
1 parent 87da82d commit da112ca

File tree

3 files changed

+23
-20
lines changed

3 files changed

+23
-20
lines changed

common/src/attestations/signers.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use eventuals::{Eventual, EventualExt, EventualWriter};
4+
use eventuals::{Eventual, EventualExt};
55
use std::collections::HashMap;
66
use std::sync::Arc;
77
use thegraph_core::{Address, ChainId};
8-
use tokio::sync::watch;
98
use tokio::{
109
select,
11-
sync::{watch::Receiver, Mutex},
10+
sync::{
11+
watch::{self, Receiver, Sender},
12+
Mutex,
13+
},
1214
};
1315
use tracing::warn;
1416

@@ -20,14 +22,13 @@ pub fn attestation_signers(
2022
indexer_mnemonic: String,
2123
chain_id: ChainId,
2224
mut dispute_manager_rx: Receiver<Option<Address>>,
23-
) -> Eventual<HashMap<Address, AttestationSigner>> {
25+
) -> Receiver<HashMap<Address, AttestationSigner>> {
2426
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
2527
Box::leak(Box::new(Mutex::new(HashMap::new())));
2628

2729
// Whenever the indexer's active or recently closed allocations change, make sure
2830
// we have attestation signers for all of them.
29-
let (mut signers_writer, signers_reader) =
30-
Eventual::<HashMap<Address, AttestationSigner>>::new();
31+
let (signers_tx, signers_rx) = watch::channel(HashMap::new());
3132

3233
tokio::spawn(async move {
3334
// Listening to the allocation eventual and converting them to reciever.
@@ -48,15 +49,15 @@ pub fn attestation_signers(
4849
attestation_signers_map,
4950
allocations_rx.clone(),
5051
dispute_manager_rx.clone(),
51-
&mut signers_writer).await;
52+
signers_tx.clone()).await;
5253
},
5354
Ok(_)= dispute_manager_rx.changed() =>{
5455
modify_sigers(Arc::new(indexer_mnemonic.clone()),
5556
chain_id,
5657
attestation_signers_map,
5758
allocations_rx.clone(),
5859
dispute_manager_rx.clone(),
59-
&mut signers_writer).await;
60+
signers_tx.clone()).await;
6061
},
6162
else=>{
6263
// Something is wrong.
@@ -66,15 +67,15 @@ pub fn attestation_signers(
6667
}
6768
});
6869

69-
signers_reader
70+
signers_rx
7071
}
7172
async fn modify_sigers(
7273
indexer_mnemonic: Arc<String>,
7374
chain_id: ChainId,
7475
attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>>,
7576
allocations_rx: Receiver<HashMap<Address, Allocation>>,
7677
dispute_manager_rx: Receiver<Option<Address>>,
77-
signers_writer: &mut EventualWriter<HashMap<Address, AttestationSigner>>,
78+
signers_tx: Sender<HashMap<Address, AttestationSigner>>,
7879
) {
7980
let mut signers = attestation_signers_map.lock().await;
8081
let allocations = allocations_rx.borrow().clone();
@@ -101,7 +102,7 @@ async fn modify_sigers(
101102
}
102103
}
103104

104-
signers_writer.write(signers.clone());
105+
signers_tx.send(signers.clone()).unwrap();
105106
}
106107

107108
#[cfg(test)]
@@ -121,22 +122,23 @@ mod tests {
121122
.send(Some(*DISPUTE_MANAGER_ADDRESS))
122123
.unwrap();
123124

124-
let signers = attestation_signers(
125+
let mut signers = attestation_signers(
125126
allocations,
126127
(*INDEXER_OPERATOR_MNEMONIC).to_string(),
127128
1,
128129
dispute_manager,
129130
);
130-
let mut signers = signers.subscribe();
131131

132132
// Test that an empty set of allocations leads to an empty set of signers
133133
allocations_writer.write(HashMap::new());
134-
let latest_signers = signers.next().await.unwrap();
134+
signers.changed().await.unwrap();
135+
let latest_signers = signers.borrow().clone();
135136
assert_eq!(latest_signers, HashMap::new());
136137

137138
// Test that writing our set of test allocations results in corresponding signers for all of them
138139
allocations_writer.write((*INDEXER_ALLOCATIONS).clone());
139-
let latest_signers = signers.next().await.unwrap();
140+
signers.changed().await.unwrap();
141+
let latest_signers = signers.borrow().clone();
140142
assert_eq!(latest_signers.len(), INDEXER_ALLOCATIONS.len());
141143
for signer_allocation_id in latest_signers.keys() {
142144
assert!(INDEXER_ALLOCATIONS

common/src/indexer_service/http/indexer_service.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use thegraph_core::{Address, Attestation, DeploymentId};
3030
use thiserror::Error;
3131
use tokio::net::TcpListener;
3232
use tokio::signal;
33+
use tokio::sync::watch::Receiver;
3334
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
3435
use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer};
3536
use tracing::error;
@@ -184,7 +185,7 @@ where
184185
I: IndexerServiceImpl + Sync + Send + 'static,
185186
{
186187
pub config: IndexerServiceConfig,
187-
pub attestation_signers: Eventual<HashMap<Address, AttestationSigner>>,
188+
pub attestation_signers: Receiver<HashMap<Address, AttestationSigner>>,
188189
pub tap_manager: Manager<IndexerTapContext>,
189190
pub service_impl: Arc<I>,
190191

common/src/indexer_service/http/request_handler.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,10 @@ where
155155
.map_err(IndexerServiceError::ReceiptError)?;
156156

157157
// Check if we have an attestation signer for the allocation the receipt was created for
158-
let signers = state
159-
.attestation_signers
160-
.value_immediate()
161-
.ok_or_else(|| IndexerServiceError::ServiceNotReady)?;
158+
// Removing check for empty value since channel was created with an empty HashMap
159+
//IndexerServiceError::ServiceNotReady
160+
161+
let signers = state.attestation_signers.borrow().clone();
162162

163163
let signer = signers
164164
.get(&allocation_id)

0 commit comments

Comments
 (0)