Skip to content

Commit f0c7d03

Browse files
authored
fix: initialize allocations monitor (#428)
1 parent eb31981 commit f0c7d03

File tree

7 files changed

+162
-155
lines changed

7 files changed

+162
-155
lines changed

common/src/allocations/monitor.rs

Lines changed: 13 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,11 @@ use std::{
88
};
99

1010
use super::Allocation;
11-
use crate::prelude::SubgraphClient;
11+
use crate::{prelude::SubgraphClient, watcher::new_watcher};
1212
use alloy::primitives::{TxHash, B256, U256};
1313
use graphql_client::GraphQLQuery;
1414
use thegraph_core::{Address, DeploymentId};
15-
use tokio::{
16-
sync::watch::{self, Receiver},
17-
time::{self, sleep},
18-
};
19-
use tracing::warn;
15+
use tokio::sync::watch::Receiver;
2016

2117
type BigInt = U256;
2218
type Bytes = B256;
@@ -58,43 +54,21 @@ impl TryFrom<allocations_query::AllocationFragment> for Allocation {
5854
}
5955

6056
/// An always up-to-date list of an indexer's active and recently closed allocations.
61-
pub fn indexer_allocations(
57+
pub async fn indexer_allocations(
6258
network_subgraph: &'static SubgraphClient,
6359
indexer_address: Address,
6460
interval: Duration,
6561
recently_closed_allocation_buffer: Duration,
66-
) -> Receiver<HashMap<Address, Allocation>> {
67-
let (tx, rx) = watch::channel(HashMap::new());
68-
tokio::spawn(async move {
69-
// Refresh indexer allocations every now and then
70-
let mut time_interval = time::interval(interval);
71-
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
72-
loop {
73-
time_interval.tick().await;
74-
let result = get_allocations(
75-
network_subgraph,
76-
indexer_address,
77-
recently_closed_allocation_buffer,
78-
)
79-
.await;
80-
match result {
81-
Ok(allocations) => {
82-
tx.send(allocations)
83-
.expect("Failed to update indexer_allocations channel");
84-
}
85-
Err(err) => {
86-
warn!(
87-
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
88-
indexer_address, err
89-
);
90-
91-
// Sleep for a bit before we retry
92-
sleep(interval.div_f32(2.0)).await;
93-
}
94-
}
95-
}
96-
});
97-
rx
62+
) -> anyhow::Result<Receiver<HashMap<Address, Allocation>>> {
63+
new_watcher(interval, move || async move {
64+
get_allocations(
65+
network_subgraph,
66+
indexer_address,
67+
recently_closed_allocation_buffer,
68+
)
69+
.await
70+
})
71+
.await
9872
}
9973

10074
pub async fn get_allocations(

common/src/attestations/dispute_manager.rs

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use crate::subgraph_client::SubgraphClient;
5+
use crate::watcher::new_watcher;
56
use alloy::primitives::Address;
67
use anyhow::Error;
78
use graphql_client::GraphQLQuery;
89
use std::time::Duration;
9-
use tokio::sync::watch::{self, Receiver};
10-
use tokio::time::{self, sleep};
11-
use tracing::warn;
10+
use tokio::sync::watch::Receiver;
1211

1312
type Bytes = Address;
1413

@@ -21,46 +20,26 @@ type Bytes = Address;
2120
)]
2221
struct DisputeManager;
2322

24-
pub fn dispute_manager(
23+
pub async fn dispute_manager(
2524
network_subgraph: &'static SubgraphClient,
2625
interval: Duration,
27-
) -> Receiver<Option<Address>> {
28-
let (tx, rx) = watch::channel(None);
29-
tokio::spawn(async move {
30-
let mut time_interval = time::interval(interval);
31-
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
32-
loop {
33-
time_interval.tick().await;
34-
35-
let result = async {
36-
let response = network_subgraph
37-
.query::<DisputeManager, _>(dispute_manager::Variables {})
38-
.await?;
39-
response?
40-
.graph_network
41-
.map(|network| network.dispute_manager)
42-
.ok_or_else(|| Error::msg("Network 1 not found in network subgraph"))
43-
}
44-
.await;
45-
46-
match result {
47-
Ok(address) => tx
48-
.send(Some(address))
49-
.expect("Failed to update dispute_manager channel"),
50-
Err(err) => {
51-
warn!("Failed to query dispute manager for network: {}", err);
52-
// Sleep for a bit before we retry
53-
sleep(interval.div_f32(2.0)).await;
54-
}
55-
}
56-
}
57-
});
58-
rx
26+
) -> anyhow::Result<Receiver<Address>> {
27+
new_watcher(interval, move || async move {
28+
let response = network_subgraph
29+
.query::<DisputeManager, _>(dispute_manager::Variables {})
30+
.await?;
31+
response?
32+
.graph_network
33+
.map(|network| network.dispute_manager)
34+
.ok_or_else(|| Error::msg("Network 1 not found in network subgraph"))
35+
})
36+
.await
5937
}
6038

6139
#[cfg(test)]
6240
mod test {
6341
use serde_json::json;
42+
use tokio::time::sleep;
6443
use wiremock::{
6544
matchers::{method, path},
6645
Mock, MockServer, ResponseTemplate,
@@ -109,9 +88,11 @@ mod test {
10988
async fn test_parses_dispute_manager_from_network_subgraph_correctly() {
11089
let (network_subgraph, _mock_server) = setup_mock_network_subgraph().await;
11190

112-
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(60));
91+
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(60))
92+
.await
93+
.unwrap();
11394
sleep(Duration::from_millis(50)).await;
11495
let result = *dispute_manager.borrow();
115-
assert_eq!(result.unwrap(), *DISPUTE_MANAGER_ADDRESS);
96+
assert_eq!(result, *DISPUTE_MANAGER_ADDRESS);
11697
}
11798
}

common/src/attestations/signers.rs

Lines changed: 33 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,97 +2,59 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use bip39::Mnemonic;
5-
use std::collections::HashMap;
65
use std::sync::Arc;
6+
use std::{collections::HashMap, sync::Mutex};
77
use thegraph_core::{Address, ChainId};
8-
use tokio::{
9-
select,
10-
sync::{
11-
watch::{self, Receiver},
12-
Mutex,
13-
},
14-
};
8+
use tokio::sync::watch::Receiver;
159
use tracing::warn;
1610

17-
use crate::prelude::{Allocation, AttestationSigner};
11+
use crate::{
12+
prelude::{Allocation, AttestationSigner},
13+
watcher::join_and_map_watcher,
14+
};
1815

1916
/// An always up-to-date list of attestation signers, one for each of the indexer's allocations.
20-
pub async fn attestation_signers(
21-
mut indexer_allocations_rx: Receiver<HashMap<Address, Allocation>>,
17+
pub fn attestation_signers(
18+
indexer_allocations_rx: Receiver<HashMap<Address, Allocation>>,
2219
indexer_mnemonic: Mnemonic,
2320
chain_id: ChainId,
24-
mut dispute_manager_rx: Receiver<Option<Address>>,
21+
dispute_manager_rx: Receiver<Address>,
2522
) -> Receiver<HashMap<Address, AttestationSigner>> {
2623
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
2724
Box::leak(Box::new(Mutex::new(HashMap::new())));
28-
let indexer_mnemonic = indexer_mnemonic.to_string();
25+
let indexer_mnemonic = Arc::new(indexer_mnemonic.to_string());
2926

30-
let starter_signers_map = modify_sigers(
31-
Arc::new(indexer_mnemonic.clone()),
32-
chain_id,
33-
attestation_signers_map,
34-
indexer_allocations_rx.clone(),
35-
dispute_manager_rx.clone(),
27+
join_and_map_watcher(
28+
indexer_allocations_rx,
29+
dispute_manager_rx,
30+
move |(allocation, dispute)| {
31+
let indexer_mnemonic = indexer_mnemonic.clone();
32+
modify_sigers(
33+
&indexer_mnemonic,
34+
chain_id,
35+
attestation_signers_map,
36+
&allocation,
37+
&dispute,
38+
)
39+
},
3640
)
37-
.await;
38-
39-
// Whenever the indexer's active or recently closed allocations change, make sure
40-
// we have attestation signers for all of them.
41-
let (signers_tx, signers_rx) = watch::channel(starter_signers_map);
42-
tokio::spawn(async move {
43-
loop {
44-
let updated_signers = select! {
45-
Ok(())= indexer_allocations_rx.changed() =>{
46-
modify_sigers(
47-
Arc::new(indexer_mnemonic.clone()),
48-
chain_id,
49-
attestation_signers_map,
50-
indexer_allocations_rx.clone(),
51-
dispute_manager_rx.clone(),
52-
).await
53-
},
54-
Ok(())= dispute_manager_rx.changed() =>{
55-
modify_sigers(
56-
Arc::new(indexer_mnemonic.clone()),
57-
chain_id,
58-
attestation_signers_map,
59-
indexer_allocations_rx.clone(),
60-
dispute_manager_rx.clone()
61-
).await
62-
},
63-
else=>{
64-
// Something is wrong.
65-
panic!("dispute_manager_rx or allocations_rx was dropped");
66-
}
67-
};
68-
signers_tx
69-
.send(updated_signers)
70-
.expect("Failed to update signers channel");
71-
}
72-
});
73-
74-
signers_rx
7541
}
76-
async fn modify_sigers(
77-
indexer_mnemonic: Arc<String>,
42+
fn modify_sigers(
43+
indexer_mnemonic: &str,
7844
chain_id: ChainId,
7945
attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>>,
80-
allocations_rx: Receiver<HashMap<Address, Allocation>>,
81-
dispute_manager_rx: Receiver<Option<Address>>,
46+
allocations: &HashMap<Address, Allocation>,
47+
dispute_manager: &Address,
8248
) -> HashMap<thegraph_core::Address, AttestationSigner> {
83-
let mut signers = attestation_signers_map.lock().await;
84-
let allocations = allocations_rx.borrow().clone();
85-
let Some(dispute_manager) = *dispute_manager_rx.borrow() else {
86-
return signers.clone();
87-
};
49+
let mut signers = attestation_signers_map.lock().unwrap();
8850
// Remove signers for allocations that are no longer active or recently closed
8951
signers.retain(|id, _| allocations.contains_key(id));
9052

9153
// Create signers for new allocations
9254
for (id, allocation) in allocations.iter() {
9355
if !signers.contains_key(id) {
9456
let signer =
95-
AttestationSigner::new(&indexer_mnemonic, allocation, chain_id, dispute_manager);
57+
AttestationSigner::new(indexer_mnemonic, allocation, chain_id, *dispute_manager);
9658
match signer {
9759
Ok(signer) => {
9860
signers.insert(*id, signer);
@@ -113,24 +75,22 @@ async fn modify_sigers(
11375

11476
#[cfg(test)]
11577
mod tests {
78+
use tokio::sync::watch;
79+
11680
use crate::test_vectors::{DISPUTE_MANAGER_ADDRESS, INDEXER_ALLOCATIONS, INDEXER_MNEMONIC};
11781

11882
use super::*;
11983

12084
#[tokio::test]
12185
async fn test_attestation_signers_update_with_allocations() {
12286
let (allocations_tx, allocations_rx) = watch::channel(HashMap::new());
123-
let (dispute_manager_tx, dispute_manager_rx) = watch::channel(None);
124-
dispute_manager_tx
125-
.send(Some(*DISPUTE_MANAGER_ADDRESS))
126-
.unwrap();
87+
let (_, dispute_manager_rx) = watch::channel(*DISPUTE_MANAGER_ADDRESS);
12788
let mut signers = attestation_signers(
12889
allocations_rx,
12990
INDEXER_MNEMONIC.clone(),
13091
1,
13192
dispute_manager_rx,
132-
)
133-
.await;
93+
);
13494

13595
// Test that an empty set of allocations leads to an empty set of signers
13696
allocations_tx.send(HashMap::new()).unwrap();

common/src/indexer_service/http/indexer_service.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,9 @@ impl IndexerService {
226226
)));
227227

228228
// Identify the dispute manager for the configured network
229-
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(3600));
229+
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(3600))
230+
.await
231+
.expect("Failed to initialize dispute manager");
230232

231233
// Monitor the indexer's own allocations
232234
let allocations = indexer_allocations(
@@ -243,7 +245,9 @@ impl IndexerService {
243245
.subgraphs
244246
.network
245247
.recently_closed_allocation_buffer_secs,
246-
);
248+
)
249+
.await
250+
.expect("Failed to initialize indexer_allocations watcher");
247251

248252
// Maintain an up-to-date set of attestation signers, one for each
249253
// allocation
@@ -252,8 +256,7 @@ impl IndexerService {
252256
options.config.indexer.operator_mnemonic.clone(),
253257
options.config.blockchain.chain_id as u64,
254258
dispute_manager,
255-
)
256-
.await;
259+
);
257260

258261
let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
259262
http_client,

common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod graphql;
1010
pub mod indexer_service;
1111
pub mod subgraph_client;
1212
pub mod tap;
13+
pub mod watcher;
1314

1415
#[cfg(test)]
1516
mod test_vectors;

0 commit comments

Comments
 (0)