Skip to content

Commit 8e827ee

Browse files
refactor: use tokio::watch for allocations (#393)
1 parent d6ff24c commit 8e827ee

File tree

6 files changed

+90
-82
lines changed

6 files changed

+90
-82
lines changed

common/src/allocations/monitor.rs

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ use std::{
1010
use super::Allocation;
1111
use crate::prelude::SubgraphClient;
1212
use alloy::primitives::{TxHash, B256, U256};
13-
use eventuals::{timer, Eventual, EventualExt};
1413
use graphql_client::GraphQLQuery;
1514
use thegraph_core::{Address, DeploymentId};
16-
use tokio::time::sleep;
15+
use tokio::{
16+
sync::watch::{self, Receiver},
17+
time::{self, sleep},
18+
};
1719
use tracing::warn;
1820

1921
type BigInt = U256;
@@ -61,30 +63,38 @@ pub fn indexer_allocations(
6163
indexer_address: Address,
6264
interval: Duration,
6365
recently_closed_allocation_buffer: Duration,
64-
) -> Eventual<HashMap<Address, Allocation>> {
65-
// Refresh indexer allocations every now and then
66-
timer(interval).map_with_retry(
67-
move |_| async move {
68-
get_allocations(
66+
) -> Receiver<HashMap<Address, Allocation>> {
67+
let (tx, rx) = watch::channel(HashMap::new());
68+
tokio::spawn(async move {
69+
// Refresh indexer allocations every now and then
70+
let mut time_interval = time::interval(interval);
71+
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
72+
loop {
73+
time_interval.tick().await;
74+
let result = get_allocations(
6975
network_subgraph,
7076
indexer_address,
7177
recently_closed_allocation_buffer,
7278
)
73-
.await
74-
.map_err(|e| e.to_string())
75-
},
76-
// Need to use string errors here because eventuals `map_with_retry` retries
77-
// errors that can be cloned
78-
move |err: String| {
79-
warn!(
80-
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
81-
indexer_address, err
82-
);
83-
84-
// Sleep for a bit before we retry
85-
sleep(interval.div_f32(2.0))
86-
},
87-
)
79+
.await;
80+
match result {
81+
Ok(allocations) => {
82+
tx.send(allocations)
83+
.expect("Failed to update indexer_allocations channel");
84+
}
85+
Err(err) => {
86+
warn!(
87+
"Failed to fetch active or recently closed allocations for indexer {:?}: {}",
88+
indexer_address, err
89+
);
90+
91+
// Sleep for a bit before we retry
92+
sleep(interval.div_f32(2.0)).await;
93+
}
94+
}
95+
}
96+
});
97+
rx
8898
}
8999

90100
pub async fn get_allocations(

common/src/attestations/signers.rs

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use eventuals::{Eventual, EventualExt};
54
use std::collections::HashMap;
65
use std::sync::Arc;
76
use thegraph_core::{Address, ChainId};
@@ -18,31 +17,19 @@ use crate::prelude::{Allocation, AttestationSigner};
1817

1918
/// An always up-to-date list of attestation signers, one for each of the indexer's allocations.
2019
pub async fn attestation_signers(
21-
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
20+
mut indexer_allocations_rx: Receiver<HashMap<Address, Allocation>>,
2221
indexer_mnemonic: String,
2322
chain_id: ChainId,
2423
mut dispute_manager_rx: Receiver<Option<Address>>,
2524
) -> Receiver<HashMap<Address, AttestationSigner>> {
2625
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
2726
Box::leak(Box::new(Mutex::new(HashMap::new())));
2827

29-
// Actively listening to indexer_allocations to update allocations channel
30-
// Temporary fix until the indexer_allocations is migrated to tokio watch
31-
let (allocations_tx, mut allocations_rx) =
32-
watch::channel(indexer_allocations.value_immediate().unwrap_or_default());
33-
indexer_allocations
34-
.pipe(move |allocatons| {
35-
allocations_tx
36-
.send(allocatons)
37-
.expect("Failed to update allocations channel");
38-
})
39-
.forever();
40-
4128
let starter_signers_map = modify_sigers(
4229
Arc::new(indexer_mnemonic.clone()),
4330
chain_id,
4431
attestation_signers_map,
45-
allocations_rx.clone(),
32+
indexer_allocations_rx.clone(),
4633
dispute_manager_rx.clone(),
4734
)
4835
.await;
@@ -53,12 +40,12 @@ pub async fn attestation_signers(
5340
tokio::spawn(async move {
5441
loop {
5542
let updated_signers = select! {
56-
Ok(())= allocations_rx.changed() =>{
43+
Ok(())= indexer_allocations_rx.changed() =>{
5744
modify_sigers(
5845
Arc::new(indexer_mnemonic.clone()),
5946
chain_id,
6047
attestation_signers_map,
61-
allocations_rx.clone(),
48+
indexer_allocations_rx.clone(),
6249
dispute_manager_rx.clone(),
6350
).await
6451
},
@@ -67,7 +54,7 @@ pub async fn attestation_signers(
6754
Arc::new(indexer_mnemonic.clone()),
6855
chain_id,
6956
attestation_signers_map,
70-
allocations_rx.clone(),
57+
indexer_allocations_rx.clone(),
7158
dispute_manager_rx.clone()
7259
).await
7360
},
@@ -129,27 +116,27 @@ mod tests {
129116

130117
#[tokio::test]
131118
async fn test_attestation_signers_update_with_allocations() {
132-
let (mut allocations_writer, allocations) = Eventual::<HashMap<Address, Allocation>>::new();
119+
let (allocations_tx, allocations_rx) = watch::channel(HashMap::new());
133120
let (dispute_manager_tx, dispute_manager_rx) = watch::channel(None);
134121
dispute_manager_tx
135122
.send(Some(*DISPUTE_MANAGER_ADDRESS))
136123
.unwrap();
137124
let mut signers = attestation_signers(
138-
allocations,
125+
allocations_rx,
139126
(*INDEXER_OPERATOR_MNEMONIC).to_string(),
140127
1,
141128
dispute_manager_rx,
142129
)
143130
.await;
144131

145132
// Test that an empty set of allocations leads to an empty set of signers
146-
allocations_writer.write(HashMap::new());
133+
allocations_tx.send(HashMap::new()).unwrap();
147134
signers.changed().await.unwrap();
148135
let latest_signers = signers.borrow().clone();
149136
assert_eq!(latest_signers, HashMap::new());
150137

151138
// Test that writing our set of test allocations results in corresponding signers for all of them
152-
allocations_writer.write((*INDEXER_ALLOCATIONS).clone());
139+
allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap();
153140
signers.changed().await.unwrap();
154141
let latest_signers = signers.borrow().clone();
155142
assert_eq!(latest_signers.len(), INDEXER_ALLOCATIONS.len());

common/src/tap.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::time::Duration;
1717
use std::{collections::HashMap, sync::Arc};
1818
use tap_core::receipt::checks::ReceiptCheck;
1919
use tokio::sync::mpsc::{self, Sender};
20+
use tokio::sync::watch::Receiver;
2021
use tokio_util::sync::CancellationToken;
2122
use tracing::error;
2223

@@ -38,7 +39,7 @@ pub enum AdapterError {
3839
impl IndexerTapContext {
3940
pub async fn get_checks(
4041
pgpool: PgPool,
41-
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
42+
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
4243
escrow_accounts: Eventual<EscrowAccounts>,
4344
domain_separator: Eip712Domain,
4445
timestamp_error_tolerance: Duration,

common/src/tap/checks/allocation_eligible.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,21 @@ use std::collections::HashMap;
55

66
use alloy::primitives::Address;
77
use anyhow::anyhow;
8-
use eventuals::Eventual;
98

109
use tap_core::receipt::{
1110
checks::{Check, CheckError, CheckResult},
1211
state::Checking,
1312
ReceiptWithState,
1413
};
14+
use tokio::sync::watch::Receiver;
1515

1616
use crate::prelude::Allocation;
1717
pub struct AllocationEligible {
18-
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
18+
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
1919
}
2020

2121
impl AllocationEligible {
22-
pub fn new(indexer_allocations: Eventual<HashMap<Address, Allocation>>) -> Self {
22+
pub fn new(indexer_allocations: Receiver<HashMap<Address, Allocation>>) -> Self {
2323
Self {
2424
indexer_allocations,
2525
}
@@ -31,10 +31,8 @@ impl Check for AllocationEligible {
3131
let allocation_id = receipt.signed_receipt().message.allocation_id;
3232
if !self
3333
.indexer_allocations
34-
.value()
35-
.await
36-
.map(|allocations| allocations.contains_key(&allocation_id))
37-
.unwrap_or(false)
34+
.borrow()
35+
.contains_key(&allocation_id)
3836
{
3937
return Err(CheckError::Failed(anyhow!(
4038
"Receipt allocation ID `{}` is not eligible for this indexer",

tap-agent/src/agent/sender_account.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeV
1313
use std::collections::{HashMap, HashSet};
1414
use std::str::FromStr;
1515
use std::time::Duration;
16+
use tokio::sync::watch::Receiver;
1617
use tokio::task::JoinHandle;
1718

1819
use alloy::dyn_abi::Eip712Domain;
@@ -119,7 +120,7 @@ pub struct SenderAccountArgs {
119120
pub pgpool: PgPool,
120121
pub sender_id: Address,
121122
pub escrow_accounts: Eventual<EscrowAccounts>,
122-
pub indexer_allocations: Eventual<HashSet<Address>>,
123+
pub indexer_allocations: Receiver<HashSet<Address>>,
123124
pub escrow_subgraph: &'static SubgraphClient,
124125
pub domain_separator: Eip712Domain,
125126
pub sender_aggregator_endpoint: String,
@@ -134,7 +135,7 @@ pub struct State {
134135
rav_tracker: SenderFeeTracker,
135136
invalid_receipts_tracker: SenderFeeTracker,
136137
allocation_ids: HashSet<Address>,
137-
_indexer_allocations_handle: PipeHandle,
138+
_indexer_allocations_handle: JoinHandle<()>,
138139
_escrow_account_monitor: PipeHandle,
139140
scheduled_rav_request: Option<JoinHandle<Result<(), MessagingErr<SenderAccountMessage>>>>,
140141

@@ -333,20 +334,21 @@ impl Actor for SenderAccount {
333334
}: Self::Arguments,
334335
) -> std::result::Result<Self::State, ActorProcessingErr> {
335336
let myself_clone = myself.clone();
336-
let _indexer_allocations_handle =
337-
indexer_allocations
338-
.clone()
339-
.pipe_async(move |allocation_ids| {
340-
let myself = myself_clone.clone();
341-
async move {
342-
// Update the allocation_ids
343-
myself
344-
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
345-
.unwrap_or_else(|e| {
346-
error!("Error while updating allocation_ids: {:?}", e);
347-
});
348-
}
349-
});
337+
let _indexer_allocations_handle = tokio::spawn(async move {
338+
let mut indexer_allocations = indexer_allocations.clone();
339+
loop {
340+
let allocation_ids = indexer_allocations.borrow().clone();
341+
// Update the allocation_ids
342+
myself_clone
343+
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
344+
.unwrap_or_else(|e| {
345+
error!("Error while updating allocation_ids: {:?}", e);
346+
});
347+
if indexer_allocations.changed().await.is_err() {
348+
break;
349+
}
350+
}
351+
});
350352

351353
let myself_clone = myself.clone();
352354
let pgpool_clone = pgpool.clone();
@@ -938,6 +940,7 @@ pub mod tests {
938940
use std::sync::atomic::AtomicU32;
939941
use std::sync::{Arc, Mutex};
940942
use std::time::Duration;
943+
use tokio::sync::watch;
941944
use wiremock::matchers::{body_string_contains, method};
942945
use wiremock::{Mock, MockServer, ResponseTemplate};
943946

@@ -1039,7 +1042,7 @@ pub mod tests {
10391042
pgpool,
10401043
sender_id: SENDER.1,
10411044
escrow_accounts: escrow_accounts_eventual,
1042-
indexer_allocations: Eventual::from_value(initial_allocation),
1045+
indexer_allocations: watch::channel(initial_allocation).1,
10431046
escrow_subgraph,
10441047
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
10451048
sender_aggregator_endpoint: DUMMY_URL.to_string(),

0 commit comments

Comments
 (0)