Skip to content

Commit 24984ad

Browse files
reversed tokio watch for all except disputes
1 parent 2be0657 commit 24984ad

20 files changed

+379
-392
lines changed

.sqlx/query-23cabe6cd63488809ca71db63f59e194653d2acaa6a74404c8675614759db032.json renamed to .sqlx/query-b1d4dfcd202af310df032edc34d83dabb56d7b947b023ee3a8b32b24b07bcd18.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/src/allocations/monitor.rs

Lines changed: 26 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@ use std::{
1010
use super::Allocation;
1111
use crate::prelude::SubgraphClient;
1212
use alloy::primitives::{TxHash, B256, U256};
13+
use eventuals::{timer, Eventual, EventualExt};
1314
use graphql_client::GraphQLQuery;
1415
use thegraph_core::{Address, DeploymentId};
15-
use tokio::{
16-
sync::watch::{self, Receiver},
17-
time::{self, sleep},
18-
};
16+
use tokio::time::sleep;
1917
use tracing::warn;
2018

2119
type BigInt = U256;
@@ -63,43 +61,30 @@ pub fn indexer_allocations(
6361
indexer_address: Address,
6462
interval: Duration,
6563
recently_closed_allocation_buffer: Duration,
66-
) -> Receiver<HashMap<Address, Allocation>> {
67-
let (tx, rx) = watch::channel(HashMap::new());
68-
tokio::spawn(async move {
69-
let mut time_interval = time::interval(interval);
70-
// Refresh indexer allocations every now and then
71-
loop {
72-
time_interval.tick().await;
73-
let result = async {
74-
get_allocations(
75-
network_subgraph,
76-
indexer_address,
77-
recently_closed_allocation_buffer,
78-
)
79-
.await
80-
.map_err(|e| e.to_string())
81-
}
82-
.await;
83-
match result {
84-
Ok(res) => {
85-
if tx.send(res).is_err() {
86-
//stopping[something gone wrong with channel]
87-
break;
88-
}
89-
}
90-
Err(err) => {
91-
warn!(
92-
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
93-
indexer_address, err
94-
);
95-
96-
// Sleep for a bit before we retry
97-
sleep(interval.div_f32(2.0)).await;
98-
}
99-
}
100-
}
101-
});
102-
rx
64+
) -> Eventual<HashMap<Address, Allocation>> {
65+
// Refresh indexer allocations every now and then
66+
timer(interval).map_with_retry(
67+
move |_| async move {
68+
get_allocations(
69+
network_subgraph,
70+
indexer_address,
71+
recently_closed_allocation_buffer,
72+
)
73+
.await
74+
.map_err(|e| e.to_string())
75+
},
76+
// Need to use string errors here because eventuals `map_with_retry` retries
77+
// errors that can be cloned
78+
move |err: String| {
79+
warn!(
80+
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
81+
indexer_address, err
82+
);
83+
84+
// Sleep for a bit before we retry
85+
sleep(interval.div_f32(2.0))
86+
},
87+
)
10388
}
10489

10590
pub async fn get_allocations(

common/src/attestations/dispute_manager.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ struct DisputeManager;
2424
pub fn dispute_manager(
2525
network_subgraph: &'static SubgraphClient,
2626
interval: Duration,
27-
) -> Receiver<Address> {
28-
let (tx, rx) = watch::channel(Address::default());
27+
) -> Receiver<Option<Address>> {
28+
let (tx, rx) = watch::channel(None);
2929
tokio::spawn(async move {
3030
let mut time_interval = time::interval(interval);
31-
31+
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
3232
loop {
3333
time_interval.tick().await;
3434

@@ -48,7 +48,7 @@ pub fn dispute_manager(
4848

4949
match result {
5050
Ok(address) => {
51-
if tx.send(address).is_err() {
51+
if tx.send(Some(address)).is_err() {
5252
// stopping
5353
break;
5454
}
@@ -118,6 +118,6 @@ mod test {
118118
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(60));
119119
sleep(Duration::from_millis(50)).await;
120120
let result = *dispute_manager.borrow();
121-
assert_eq!(result, *DISPUTE_MANAGER_ADDRESS);
121+
assert_eq!(result.unwrap(), *DISPUTE_MANAGER_ADDRESS);
122122
}
123123
}

common/src/attestations/signers.rs

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,58 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use eventuals::{Eventual, EventualExt};
45
use std::collections::HashMap;
56
use std::sync::Arc;
67
use thegraph_core::{Address, ChainId};
7-
use tokio::sync::{watch, watch::Receiver, Mutex};
8+
use tokio::sync::watch;
9+
use tokio::{
10+
select,
11+
sync::{watch::Receiver, Mutex},
12+
};
813
use tracing::warn;
914

1015
use crate::prelude::{Allocation, AttestationSigner};
1116

1217
/// An always up-to-date list of attestation signers, one for each of the indexer's allocations.
1318
pub fn attestation_signers(
14-
mut indexer_allocations: Receiver<HashMap<Address, Allocation>>,
19+
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
1520
indexer_mnemonic: String,
1621
chain_id: ChainId,
17-
mut dispute_manager: Receiver<Address>,
18-
) -> Receiver<HashMap<Address, AttestationSigner>> {
22+
mut dispute_manager_rx: Receiver<Option<Address>>,
23+
) -> Eventual<HashMap<Address, AttestationSigner>> {
1924
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
2025
Box::leak(Box::new(Mutex::new(HashMap::new())));
26+
2127
let indexer_mnemonic = Arc::new(indexer_mnemonic);
2228

23-
let (tx, rx) = watch::channel(HashMap::new());
29+
// Whenever the indexer's active or recently closed allocations change, make sure
30+
// we have attestation signers for all of them
31+
let (mut signers_writer, signers_reader) =
32+
Eventual::<HashMap<Address, AttestationSigner>>::new();
2433

2534
tokio::spawn(async move {
35+
//listening to the allocation eventual and converting them to reciever
36+
//using pipe for updation
37+
//for temporary pupose only
38+
let (allocations_tx, mut allocations_rx) =
39+
watch::channel(indexer_allocations.value().await.unwrap());
40+
let _p1 = indexer_allocations.pipe(move |allocatons| {
41+
let _ = allocations_tx.send(allocatons);
42+
});
2643
loop {
27-
tokio::select! {
28-
Ok(_) = indexer_allocations.changed() => {},
29-
Ok(_) = dispute_manager.changed() => {},
30-
else => break,
44+
select! {
45+
Ok(_)= allocations_rx.changed() =>{},
46+
Ok(_)= dispute_manager_rx.changed() =>{},
3147
}
32-
33-
let allocations = indexer_allocations.borrow().clone();
34-
let dispute_manager = *dispute_manager.borrow();
35-
let indexer_mnemonic = indexer_mnemonic.clone();
36-
3748
let mut signers = attestation_signers_map.lock().await;
3849

50+
let allocations = allocations_rx.borrow().clone();
51+
let dispute_manager = *dispute_manager_rx.borrow();
52+
if dispute_manager.is_none() {
53+
continue;
54+
}
55+
let dispute_manager = dispute_manager.unwrap();
3956
// Remove signers for allocations that are no longer active or recently closed
4057
signers.retain(|id, _| allocations.contains_key(id));
4158

@@ -60,18 +77,15 @@ pub fn attestation_signers(
6077
}
6178
}
6279

63-
// sending updated signers map
64-
tx.send(signers.clone()).unwrap();
80+
signers_writer.write(signers.clone());
6581
}
6682
});
6783

68-
rx
84+
signers_reader
6985
}
7086

7187
#[cfg(test)]
7288
mod tests {
73-
use tokio::time::sleep;
74-
7589
use crate::test_vectors::{
7690
DISPUTE_MANAGER_ADDRESS, INDEXER_ALLOCATIONS, INDEXER_OPERATOR_MNEMONIC,
7791
};
@@ -80,29 +94,29 @@ mod tests {
8094

8195
#[tokio::test]
8296
async fn test_attestation_signers_update_with_allocations() {
83-
let (allocations_tx, allocations_rx) = watch::channel(HashMap::new());
84-
let (dispute_manager_tx, dispute_manager_rx) = watch::channel(Address::default());
97+
let (mut allocations_writer, allocations) = Eventual::<HashMap<Address, Allocation>>::new();
98+
let (dispute_manager_writer, dispute_manager) = watch::channel(None);
8599

86-
dispute_manager_tx.send(*DISPUTE_MANAGER_ADDRESS).unwrap();
100+
dispute_manager_writer
101+
.send(Some(*DISPUTE_MANAGER_ADDRESS))
102+
.unwrap();
87103

88-
let signers_rx = attestation_signers(
89-
allocations_rx,
104+
let signers = attestation_signers(
105+
allocations,
90106
(*INDEXER_OPERATOR_MNEMONIC).to_string(),
91107
1,
92-
dispute_manager_rx,
108+
dispute_manager,
93109
);
110+
let mut signers = signers.subscribe();
94111

95112
// Test that an empty set of allocations leads to an empty set of signers
96-
allocations_tx.send(HashMap::new()).unwrap();
97-
//change wait if required
98-
sleep(std::time::Duration::from_millis(50)).await; // waiting for propegation
99-
let latest_signers = signers_rx.borrow().clone();
113+
allocations_writer.write(HashMap::new());
114+
let latest_signers = signers.next().await.unwrap();
100115
assert_eq!(latest_signers, HashMap::new());
101116

102117
// Test that writing our set of test allocations results in corresponding signers for all of them
103-
allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap();
104-
sleep(std::time::Duration::from_millis(50)).await; // waiting for propegation
105-
let latest_signers = signers_rx.borrow().clone();
118+
allocations_writer.write((*INDEXER_ALLOCATIONS).clone());
119+
let latest_signers = signers.next().await.unwrap();
106120
assert_eq!(latest_signers.len(), INDEXER_ALLOCATIONS.len());
107121
for signer_allocation_id in latest_signers.keys() {
108122
assert!(INDEXER_ALLOCATIONS

common/src/escrow_accounts.rs

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@ use std::{
99

1010
use alloy::primitives::U256;
1111
use anyhow::{anyhow, Result};
12+
use eventuals::{timer, Eventual, EventualExt};
1213
use graphql_client::GraphQLQuery;
1314
use thegraph_core::Address;
1415
use thiserror::Error;
15-
use tokio::{
16-
sync::watch::{self, Receiver},
17-
time::{self, sleep},
18-
};
16+
use tokio::time::sleep;
1917
use tracing::{error, warn};
2018

2119
use crate::prelude::SubgraphClient;
@@ -107,34 +105,22 @@ pub fn escrow_accounts(
107105
indexer_address: Address,
108106
interval: Duration,
109107
reject_thawing_signers: bool,
110-
) -> Receiver<EscrowAccounts> {
111-
let (tx, rx) = watch::channel(EscrowAccounts::default());
112-
tokio::spawn(async move {
113-
let mut time_interval = time::interval(interval);
108+
) -> Eventual<EscrowAccounts> {
109+
timer(interval).map_with_retry(
110+
move |_| async move {
111+
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
112+
.await
113+
.map_err(|e| e.to_string())
114+
},
115+
move |err: String| {
116+
error!(
117+
"Failed to fetch escrow accounts for indexer {:?}: {}",
118+
indexer_address, err
119+
);
114120

115-
loop {
116-
time_interval.tick().await;
117-
let result =
118-
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
119-
.await
120-
.map_err(|e| e.to_string());
121-
match result {
122-
Ok(escrow_accounts) => {
123-
if tx.send(escrow_accounts).is_err() {
124-
break; // something wrong with channel
125-
}
126-
}
127-
Err(err) => {
128-
error!(
129-
"Failed to fetch escrow accounts for indexer {:?}: {}",
130-
indexer_address, err
131-
);
132-
sleep(interval.div_f32(2.0)).await;
133-
}
134-
}
135-
}
136-
});
137-
rx
121+
sleep(interval.div_f32(2.0))
122+
},
123+
)
138124
}
139125

140126
async fn get_escrow_accounts(
@@ -250,15 +236,15 @@ mod tests {
250236
);
251237
mock_server.register(mock).await;
252238

253-
let accounts_rx = escrow_accounts(
239+
let accounts = escrow_accounts(
254240
escrow_subgraph,
255241
*test_vectors::INDEXER_ADDRESS,
256242
Duration::from_secs(60),
257243
true,
258244
);
259-
sleep(Duration::from_millis(50)).await;
245+
260246
assert_eq!(
261-
accounts_rx.borrow().clone(),
247+
accounts.value().await.unwrap(),
262248
EscrowAccounts::new(
263249
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
264250
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),

common/src/indexer_service/http/indexer_service.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use axum::{
2020
};
2121
use axum::{serve, ServiceExt};
2222
use build_info::BuildInfo;
23+
use eventuals::Eventual;
2324
use prometheus::TextEncoder;
2425
use reqwest::StatusCode;
2526
use serde::{de::DeserializeOwned, Serialize};
@@ -29,7 +30,6 @@ use thegraph_core::{Address, Attestation, DeploymentId};
2930
use thiserror::Error;
3031
use tokio::net::TcpListener;
3132
use tokio::signal;
32-
use tokio::sync::watch::Receiver;
3333
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
3434
use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer};
3535
use tracing::error;
@@ -184,12 +184,12 @@ where
184184
I: IndexerServiceImpl + Sync + Send + 'static,
185185
{
186186
pub config: IndexerServiceConfig,
187-
pub attestation_signers: Receiver<HashMap<Address, AttestationSigner>>,
187+
pub attestation_signers: Eventual<HashMap<Address, AttestationSigner>>,
188188
pub tap_manager: Manager<IndexerTapContext>,
189189
pub service_impl: Arc<I>,
190190

191191
// tap
192-
pub escrow_accounts: Receiver<EscrowAccounts>,
192+
pub escrow_accounts: Eventual<EscrowAccounts>,
193193
pub domain_separator: Eip712Domain,
194194
}
195195

0 commit comments

Comments
 (0)