Skip to content

Commit d6ff24c

Browse files
refactor: use tokio::watch for attestation_signers (#387)
1 parent 1eab5ab commit d6ff24c

File tree

4 files changed

+68
-56
lines changed

4 files changed

+68
-56
lines changed

common/src/attestations/dispute_manager.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,9 @@ pub fn dispute_manager(
4444
.await;
4545

4646
match result {
47-
Ok(address) => {
48-
if tx.send(Some(address)).is_err() {
49-
// stopping
50-
break;
51-
}
52-
}
47+
Ok(address) => tx
48+
.send(Some(address))
49+
.expect("Failed to update dispute_manager channel"),
5350
Err(err) => {
5451
warn!("Failed to query dispute manager for network: {}", err);
5552
// Sleep for a bit before we retry

common/src/attestations/signers.rs

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,85 +1,100 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use eventuals::{Eventual, EventualExt, EventualWriter};
4+
use eventuals::{Eventual, EventualExt};
55
use std::collections::HashMap;
66
use std::sync::Arc;
77
use thegraph_core::{Address, ChainId};
8-
use tokio::sync::watch;
98
use tokio::{
109
select,
11-
sync::{watch::Receiver, Mutex},
10+
sync::{
11+
watch::{self, Receiver},
12+
Mutex,
13+
},
1214
};
1315
use tracing::warn;
1416

1517
use crate::prelude::{Allocation, AttestationSigner};
1618

1719
/// An always up-to-date list of attestation signers, one for each of the indexer's allocations.
18-
pub fn attestation_signers(
20+
pub async fn attestation_signers(
1921
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
2022
indexer_mnemonic: String,
2123
chain_id: ChainId,
2224
mut dispute_manager_rx: Receiver<Option<Address>>,
23-
) -> Eventual<HashMap<Address, AttestationSigner>> {
25+
) -> Receiver<HashMap<Address, AttestationSigner>> {
2426
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
2527
Box::leak(Box::new(Mutex::new(HashMap::new())));
2628

29+
// Actively listening to indexer_allocations to update allocations channel
30+
// Temporary fix until the indexer_allocations is migrated to tokio watch
31+
let (allocations_tx, mut allocations_rx) =
32+
watch::channel(indexer_allocations.value_immediate().unwrap_or_default());
33+
indexer_allocations
34+
.pipe(move |allocatons| {
35+
allocations_tx
36+
.send(allocatons)
37+
.expect("Failed to update allocations channel");
38+
})
39+
.forever();
40+
41+
let starter_signers_map = modify_sigers(
42+
Arc::new(indexer_mnemonic.clone()),
43+
chain_id,
44+
attestation_signers_map,
45+
allocations_rx.clone(),
46+
dispute_manager_rx.clone(),
47+
)
48+
.await;
49+
2750
// Whenever the indexer's active or recently closed allocations change, make sure
2851
// we have attestation signers for all of them.
29-
let (mut signers_writer, signers_reader) =
30-
Eventual::<HashMap<Address, AttestationSigner>>::new();
31-
52+
let (signers_tx, signers_rx) = watch::channel(starter_signers_map);
3253
tokio::spawn(async move {
33-
// Listening to the allocation eventual and converting them to reciever.
34-
// Using pipe for updation.
35-
// For temporary pupose only.
36-
let (allocations_tx, mut allocations_rx) =
37-
watch::channel(indexer_allocations.value().await.unwrap());
38-
let _p1 = indexer_allocations.pipe(move |allocatons| {
39-
let _ = allocations_tx.send(allocatons);
40-
});
41-
4254
loop {
43-
select! {
44-
Ok(_)= allocations_rx.changed() =>{
55+
let updated_signers = select! {
56+
Ok(())= allocations_rx.changed() =>{
4557
modify_sigers(
4658
Arc::new(indexer_mnemonic.clone()),
4759
chain_id,
4860
attestation_signers_map,
4961
allocations_rx.clone(),
5062
dispute_manager_rx.clone(),
51-
&mut signers_writer).await;
63+
).await
5264
},
53-
Ok(_)= dispute_manager_rx.changed() =>{
54-
modify_sigers(Arc::new(indexer_mnemonic.clone()),
55-
chain_id,
56-
attestation_signers_map,
57-
allocations_rx.clone(),
58-
dispute_manager_rx.clone(),
59-
&mut signers_writer).await;
65+
Ok(())= dispute_manager_rx.changed() =>{
66+
modify_sigers(
67+
Arc::new(indexer_mnemonic.clone()),
68+
chain_id,
69+
attestation_signers_map,
70+
allocations_rx.clone(),
71+
dispute_manager_rx.clone()
72+
).await
6073
},
6174
else=>{
6275
// Something is wrong.
6376
panic!("dispute_manager_rx or allocations_rx was dropped");
6477
}
65-
}
78+
};
79+
signers_tx
80+
.send(updated_signers)
81+
.expect("Failed to update signers channel");
6682
}
6783
});
6884

69-
signers_reader
85+
signers_rx
7086
}
7187
async fn modify_sigers(
7288
indexer_mnemonic: Arc<String>,
7389
chain_id: ChainId,
7490
attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>>,
7591
allocations_rx: Receiver<HashMap<Address, Allocation>>,
7692
dispute_manager_rx: Receiver<Option<Address>>,
77-
signers_writer: &mut EventualWriter<HashMap<Address, AttestationSigner>>,
78-
) {
93+
) -> HashMap<thegraph_core::Address, AttestationSigner> {
7994
let mut signers = attestation_signers_map.lock().await;
8095
let allocations = allocations_rx.borrow().clone();
8196
let Some(dispute_manager) = *dispute_manager_rx.borrow() else {
82-
return;
97+
return signers.clone();
8398
};
8499
// Remove signers for allocations that are no longer active or recently closed
85100
signers.retain(|id, _| allocations.contains_key(id));
@@ -101,7 +116,7 @@ async fn modify_sigers(
101116
}
102117
}
103118

104-
signers_writer.write(signers.clone());
119+
signers.clone()
105120
}
106121

107122
#[cfg(test)]
@@ -115,29 +130,30 @@ mod tests {
115130
#[tokio::test]
116131
async fn test_attestation_signers_update_with_allocations() {
117132
let (mut allocations_writer, allocations) = Eventual::<HashMap<Address, Allocation>>::new();
118-
let (dispute_manager_writer, dispute_manager) = watch::channel(None);
119-
120-
dispute_manager_writer
133+
let (dispute_manager_tx, dispute_manager_rx) = watch::channel(None);
134+
dispute_manager_tx
121135
.send(Some(*DISPUTE_MANAGER_ADDRESS))
122136
.unwrap();
123-
124-
let signers = attestation_signers(
137+
let mut signers = attestation_signers(
125138
allocations,
126139
(*INDEXER_OPERATOR_MNEMONIC).to_string(),
127140
1,
128-
dispute_manager,
129-
);
130-
let mut signers = signers.subscribe();
141+
dispute_manager_rx,
142+
)
143+
.await;
131144

132145
// Test that an empty set of allocations leads to an empty set of signers
133146
allocations_writer.write(HashMap::new());
134-
let latest_signers = signers.next().await.unwrap();
147+
signers.changed().await.unwrap();
148+
let latest_signers = signers.borrow().clone();
135149
assert_eq!(latest_signers, HashMap::new());
136150

137151
// Test that writing our set of test allocations results in corresponding signers for all of them
138152
allocations_writer.write((*INDEXER_ALLOCATIONS).clone());
139-
let latest_signers = signers.next().await.unwrap();
153+
signers.changed().await.unwrap();
154+
let latest_signers = signers.borrow().clone();
140155
assert_eq!(latest_signers.len(), INDEXER_ALLOCATIONS.len());
156+
141157
for signer_allocation_id in latest_signers.keys() {
142158
assert!(INDEXER_ALLOCATIONS
143159
.keys()

common/src/indexer_service/http/indexer_service.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use thegraph_core::{Address, Attestation, DeploymentId};
2828
use thiserror::Error;
2929
use tokio::net::TcpListener;
3030
use tokio::signal;
31+
use tokio::sync::watch::Receiver;
3132
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
3233
use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer};
3334
use tracing::error;
@@ -182,7 +183,7 @@ where
182183
I: IndexerServiceImpl + Sync + Send + 'static,
183184
{
184185
pub config: IndexerServiceConfig,
185-
pub attestation_signers: Eventual<HashMap<Address, AttestationSigner>>,
186+
pub attestation_signers: Receiver<HashMap<Address, AttestationSigner>>,
186187
pub tap_manager: Manager<IndexerTapContext>,
187188
pub service_impl: Arc<I>,
188189

@@ -248,7 +249,8 @@ impl IndexerService {
248249
options.config.indexer.operator_mnemonic.clone(),
249250
options.config.graph_network.chain_id,
250251
dispute_manager,
251-
);
252+
)
253+
.await;
252254

253255
let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
254256
http_client,

common/src/indexer_service/http/request_handler.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,9 @@ where
155155
.map_err(IndexerServiceError::ReceiptError)?;
156156

157157
// Check if we have an attestation signer for the allocation the receipt was created for
158-
let signers = state
158+
let signer = state
159159
.attestation_signers
160-
.value_immediate()
161-
.ok_or_else(|| IndexerServiceError::ServiceNotReady)?;
162-
163-
let signer = signers
160+
.borrow()
164161
.get(&allocation_id)
165162
.cloned()
166163
.ok_or_else(|| (IndexerServiceError::NoSignerForAllocation(allocation_id)))?;

0 commit comments

Comments
 (0)