Skip to content

Commit 817703e

Browse files
refactor: replace eventuals with Tokio watch for disputes (#369)
1 parent f30b210 commit 817703e

File tree

2 files changed

+124
-65
lines changed

2 files changed

+124
-65
lines changed

common/src/attestations/dispute_manager.rs

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::time::Duration;
5-
4+
use crate::subgraph_client::SubgraphClient;
65
use alloy::primitives::Address;
7-
use eventuals::{timer, Eventual, EventualExt};
6+
use anyhow::Error;
87
use graphql_client::GraphQLQuery;
9-
use tokio::time::sleep;
8+
use std::time::Duration;
9+
use tokio::sync::watch::{self, Receiver};
10+
use tokio::time::{self, sleep};
1011
use tracing::warn;
1112

12-
use crate::subgraph_client::SubgraphClient;
13-
1413
type Bytes = Address;
1514

1615
#[derive(GraphQLQuery)]
@@ -25,27 +24,41 @@ struct DisputeManager;
2524
pub fn dispute_manager(
2625
network_subgraph: &'static SubgraphClient,
2726
interval: Duration,
28-
) -> Eventual<Address> {
29-
timer(interval).map_with_retry(
30-
move |_| async move {
31-
let response = network_subgraph
32-
.query::<DisputeManager, _>(dispute_manager::Variables {})
33-
.await
34-
.map_err(|e| e.to_string())?;
35-
36-
response.map_err(|e| e.to_string()).and_then(|data| {
37-
data.graph_network
27+
) -> Receiver<Option<Address>> {
28+
let (tx, rx) = watch::channel(None);
29+
tokio::spawn(async move {
30+
let mut time_interval = time::interval(interval);
31+
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
32+
loop {
33+
time_interval.tick().await;
34+
35+
let result = async {
36+
let response = network_subgraph
37+
.query::<DisputeManager, _>(dispute_manager::Variables {})
38+
.await?;
39+
response?
40+
.graph_network
3841
.map(|network| network.dispute_manager)
39-
.ok_or_else(|| "Network 1 not found in network subgraph".to_string())
40-
})
41-
},
42-
move |err: String| {
43-
warn!("Failed to query dispute manager: {}", err);
42+
.ok_or_else(|| Error::msg("Network 1 not found in network subgraph"))
43+
}
44+
.await;
4445

45-
// Sleep for a bit before we retry
46-
sleep(interval.div_f32(2.0))
47-
},
48-
)
46+
match result {
47+
Ok(address) => {
48+
if tx.send(Some(address)).is_err() {
49+
// stopping
50+
break;
51+
}
52+
}
53+
Err(err) => {
54+
warn!("Failed to query dispute manager for network: {}", err);
55+
// Sleep for a bit before we retry
56+
sleep(interval.div_f32(2.0)).await;
57+
}
58+
}
59+
}
60+
});
61+
rx
4962
}
5063

5164
#[cfg(test)]
@@ -100,10 +113,8 @@ mod test {
100113
let (network_subgraph, _mock_server) = setup_mock_network_subgraph().await;
101114

102115
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(60));
103-
104-
assert_eq!(
105-
dispute_manager.value().await.unwrap(),
106-
*DISPUTE_MANAGER_ADDRESS
107-
);
116+
sleep(Duration::from_millis(50)).await;
117+
let result = *dispute_manager.borrow();
118+
assert_eq!(result.unwrap(), *DISPUTE_MANAGER_ADDRESS);
108119
}
109120
}

common/src/attestations/signers.rs

Lines changed: 83 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use eventuals::{join, Eventual, EventualExt};
4+
use eventuals::{Eventual, EventualExt, EventualWriter};
55
use std::collections::HashMap;
66
use std::sync::Arc;
77
use thegraph_core::{Address, ChainId};
8-
use tokio::sync::Mutex;
8+
use tokio::sync::watch;
9+
use tokio::{
10+
select,
11+
sync::{watch::Receiver, Mutex},
12+
};
913
use tracing::warn;
1014

1115
use crate::prelude::{Allocation, AttestationSigner};
@@ -15,47 +19,89 @@ pub fn attestation_signers(
1519
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
1620
indexer_mnemonic: String,
1721
chain_id: ChainId,
18-
dispute_manager: Eventual<Address>,
22+
mut dispute_manager_rx: Receiver<Option<Address>>,
1923
) -> Eventual<HashMap<Address, AttestationSigner>> {
2024
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
2125
Box::leak(Box::new(Mutex::new(HashMap::new())));
2226

23-
let indexer_mnemonic = Arc::new(indexer_mnemonic);
24-
2527
// Whenever the indexer's active or recently closed allocations change, make sure
26-
// we have attestation signers for all of them
27-
join((indexer_allocations, dispute_manager)).map(move |(allocations, dispute_manager)| {
28-
let indexer_mnemonic = indexer_mnemonic.clone();
29-
async move {
30-
let mut signers = attestation_signers_map.lock().await;
31-
32-
// Remove signers for allocations that are no longer active or recently closed
33-
signers.retain(|id, _| allocations.contains_key(id));
34-
35-
// Create signers for new allocations
36-
for (id, allocation) in allocations.iter() {
37-
if !signers.contains_key(id) {
38-
let signer = AttestationSigner::new(
39-
&indexer_mnemonic,
40-
allocation,
28+
// we have attestation signers for all of them.
29+
let (mut signers_writer, signers_reader) =
30+
Eventual::<HashMap<Address, AttestationSigner>>::new();
31+
32+
tokio::spawn(async move {
33+
// Listening to the allocation eventual and converting them to reciever.
34+
// Using pipe for updation.
35+
// For temporary pupose only.
36+
let (allocations_tx, mut allocations_rx) =
37+
watch::channel(indexer_allocations.value().await.unwrap());
38+
let _p1 = indexer_allocations.pipe(move |allocatons| {
39+
let _ = allocations_tx.send(allocatons);
40+
});
41+
42+
loop {
43+
select! {
44+
Ok(_)= allocations_rx.changed() =>{
45+
modify_sigers(
46+
Arc::new(indexer_mnemonic.clone()),
4147
chain_id,
42-
dispute_manager,
43-
);
44-
if let Err(e) = signer {
45-
warn!(
46-
"Failed to establish signer for allocation {}, deployment {}, createdAtEpoch {}: {}",
47-
allocation.id, allocation.subgraph_deployment.id,
48-
allocation.created_at_epoch, e
49-
);
50-
} else {
51-
signers.insert(*id, signer.unwrap());
52-
}
48+
attestation_signers_map,
49+
allocations_rx.clone(),
50+
dispute_manager_rx.clone(),
51+
&mut signers_writer).await;
52+
},
53+
Ok(_)= dispute_manager_rx.changed() =>{
54+
modify_sigers(Arc::new(indexer_mnemonic.clone()),
55+
chain_id,
56+
attestation_signers_map,
57+
allocations_rx.clone(),
58+
dispute_manager_rx.clone(),
59+
&mut signers_writer).await;
60+
},
61+
else=>{
62+
// Something is wrong.
63+
panic!("dispute_manager_rx or allocations_rx was dropped");
5364
}
5465
}
66+
}
67+
});
5568

56-
signers.clone()
69+
signers_reader
70+
}
71+
async fn modify_sigers(
72+
indexer_mnemonic: Arc<String>,
73+
chain_id: ChainId,
74+
attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>>,
75+
allocations_rx: Receiver<HashMap<Address, Allocation>>,
76+
dispute_manager_rx: Receiver<Option<Address>>,
77+
signers_writer: &mut EventualWriter<HashMap<Address, AttestationSigner>>,
78+
) {
79+
let mut signers = attestation_signers_map.lock().await;
80+
let allocations = allocations_rx.borrow().clone();
81+
let Some(dispute_manager) = *dispute_manager_rx.borrow() else {
82+
return;
83+
};
84+
// Remove signers for allocations that are no longer active or recently closed
85+
signers.retain(|id, _| allocations.contains_key(id));
86+
87+
// Create signers for new allocations
88+
for (id, allocation) in allocations.iter() {
89+
if !signers.contains_key(id) {
90+
let signer =
91+
AttestationSigner::new(&indexer_mnemonic, allocation, chain_id, dispute_manager);
92+
if let Err(e) = signer {
93+
warn!(
94+
"Failed to establish signer for allocation {}, deployment {}, createdAtEpoch {}: {}",
95+
allocation.id, allocation.subgraph_deployment.id,
96+
allocation.created_at_epoch, e
97+
);
98+
} else {
99+
signers.insert(*id, signer.unwrap());
100+
}
57101
}
58-
})
102+
}
103+
104+
signers_writer.write(signers.clone());
59105
}
60106

61107
#[cfg(test)]
@@ -69,9 +115,11 @@ mod tests {
69115
#[tokio::test]
70116
async fn test_attestation_signers_update_with_allocations() {
71117
let (mut allocations_writer, allocations) = Eventual::<HashMap<Address, Allocation>>::new();
72-
let (mut dispute_manager_writer, dispute_manager) = Eventual::<Address>::new();
118+
let (dispute_manager_writer, dispute_manager) = watch::channel(None);
73119

74-
dispute_manager_writer.write(*DISPUTE_MANAGER_ADDRESS);
120+
dispute_manager_writer
121+
.send(Some(*DISPUTE_MANAGER_ADDRESS))
122+
.unwrap();
75123

76124
let signers = attestation_signers(
77125
allocations,

0 commit comments

Comments
 (0)