Skip to content

Commit 695ad52

Browse files
committed
fix(tap-agent): stop global allocation typing; normalize per-sender with isLegacy + sender_type
- Remove global allocation typing in Manager; keep a single raw watcher - Add per-sender normalization using Network Subgraph Allocation.isLegacy + escrow-derived sender_type - Pass typed Receiver<HashSet<AllocationId>> to SenderAccount; remove in-actor retyping
1 parent eac2b7b commit 695ad52

File tree

3 files changed

+58
-29
lines changed

3 files changed

+58
-29
lines changed

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::{
1111
use anyhow::Context;
1212
use bigdecimal::{num_bigint::ToBigInt, ToPrimitive};
1313
use futures::{stream, StreamExt};
14-
use indexer_allocation::Allocation;
1514
use indexer_monitor::{EscrowAccounts, SubgraphClient};
1615
use indexer_query::{
1716
closed_allocations::{self, ClosedAllocations},
@@ -275,9 +274,8 @@ pub struct SenderAccountArgs {
275274
pub sender_id: Address,
276275
/// Watcher that returns a list of escrow accounts for current indexer
277276
pub escrow_accounts: Receiver<EscrowAccounts>,
278-
/// Raw watcher of open and recently closed allocations from Network Subgraph
279-
/// We normalize per-sender to the correct variant (Legacy/Horizon)
280-
pub indexer_allocations: Receiver<HashMap<Address, Allocation>>,
277+
/// Watcher of normalized allocation IDs (Legacy/Horizon) for this sender type
278+
pub indexer_allocations: Receiver<HashSet<AllocationId>>,
281279
/// SubgraphClient of the escrow subgraph
282280
pub escrow_subgraph: &'static SubgraphClient,
283281
/// SubgraphClient of the network subgraph
@@ -830,34 +828,21 @@ impl Actor for SenderAccount {
830828
sender_type,
831829
}: Self::Arguments,
832830
) -> Result<Self::State, ActorProcessingErr> {
833-
// Normalize raw allocation addresses to the correct variant for this sender_type
831+
// Pass-through normalized allocation IDs for this sender type
834832
let myself_clone = myself.clone();
835-
let sender_type_for_log = sender_type; // copy for move into closure
836-
watch_pipe(indexer_allocations, move |alloc_map| {
837-
let raw_count = alloc_map.len();
838-
// Extract addresses and normalize based on sender_type
839-
let normalized: HashSet<AllocationId> = alloc_map
840-
.keys()
841-
.cloned()
842-
.map(|addr| match sender_type_for_log {
843-
SenderType::Legacy => AllocationId::Legacy(AllocationIdCore::from(addr)),
844-
SenderType::Horizon => AllocationId::Horizon(CollectionId::from(addr)),
845-
})
846-
.collect();
847-
833+
watch_pipe(indexer_allocations, move |allocation_ids| {
834+
let count = allocation_ids.len();
848835
tracing::info!(
849836
sender = %sender_id,
850-
?sender_type_for_log,
851-
raw_count,
852-
normalized_count = normalized.len(),
853-
"indexer_allocations update: normalizing allocations for sender_type",
837+
sender_type = ?sender_type,
838+
count,
839+
"indexer_allocations update: received normalized allocations"
854840
);
855-
856-
// Forward normalized set to the actor to update allocations
857841
let myself = myself_clone.clone();
842+
let allocation_ids = allocation_ids.clone();
858843
async move {
859844
myself
860-
.cast(SenderAccountMessage::UpdateAllocationIds(normalized))
845+
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
861846
.unwrap_or_else(|e| {
862847
tracing::error!(error=?e, "Error while updating allocation_ids");
863848
});

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -991,12 +991,57 @@ impl State {
991991
SenderType::Horizon => self.escrow_accounts_v2.clone(),
992992
};
993993

994+
// Build a normalized allocation watcher for this sender type using isLegacy flag
995+
// from the Network Subgraph. Fallback: if the flag is missing, normalize by sender_type.
996+
let indexer_allocations = {
997+
let sender_type_for_log = sender_type;
998+
map_watcher(self.indexer_allocations.clone(), move |alloc_map| {
999+
let total = alloc_map.len();
1000+
let mut legacy_count = 0usize;
1001+
let mut horizon_count = 0usize;
1002+
let mut mismatched = 0usize;
1003+
let set: HashSet<AllocationId> = alloc_map
1004+
.iter()
1005+
.filter_map(|(addr, alloc)| {
1006+
if alloc.is_legacy {
1007+
legacy_count += 1;
1008+
if matches!(sender_type_for_log, SenderType::Legacy) {
1009+
Some(AllocationId::Legacy(AllocationIdCore::from(*addr)))
1010+
} else {
1011+
mismatched += 1;
1012+
None
1013+
}
1014+
} else {
1015+
horizon_count += 1;
1016+
if matches!(sender_type_for_log, SenderType::Horizon) {
1017+
Some(AllocationId::Horizon(CollectionId::from(*addr)))
1018+
} else {
1019+
mismatched += 1;
1020+
None
1021+
}
1022+
}
1023+
})
1024+
.collect();
1025+
1026+
tracing::info!(
1027+
?sender_type_for_log,
1028+
total,
1029+
legacy = legacy_count,
1030+
horizon = horizon_count,
1031+
mismatched,
1032+
normalized = set.len(),
1033+
"Normalized indexer allocations using isLegacy"
1034+
);
1035+
set
1036+
})
1037+
};
1038+
9941039
Ok(SenderAccountArgs {
9951040
config: self.config,
9961041
pgpool: self.pgpool.clone(),
9971042
sender_id: *sender_id,
9981043
escrow_accounts,
999-
indexer_allocations: self.indexer_allocations.clone(),
1044+
indexer_allocations,
10001045
escrow_subgraph: self.escrow_subgraph,
10011046
network_subgraph: self.network_subgraph,
10021047
domain_separator: self.domain_separator.clone(),

crates/tap-agent/src/test.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use actors::TestableActor;
1313
use anyhow::anyhow;
1414
use bigdecimal::num_bigint::BigInt;
1515
use indexer_config;
16-
use indexer_allocation::Allocation;
1716
use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient};
1817
use indexer_receipt::TapReceipt;
1918
use ractor::{concurrency::JoinHandle, Actor, ActorRef};
@@ -105,7 +104,7 @@ pub fn get_sender_account_config() -> &'static SenderAccountConfig {
105104
#[bon::builder]
106105
pub async fn create_sender_account(
107106
pgpool: PgPool,
108-
#[builder(default = HashMap::new())] initial_allocations_raw: HashMap<Address, Allocation>,
107+
#[builder(default = HashSet::new())] initial_allocation: HashSet<AllocationId>,
109108
#[builder(default = TRIGGER_VALUE)] rav_request_trigger_value: u128,
110109
#[builder(default = TRIGGER_VALUE)] max_amount_willing_to_lose_grt: u128,
111110
escrow_subgraph_endpoint: Option<&str>,
@@ -175,7 +174,7 @@ pub async fn create_sender_account(
175174
pgpool,
176175
sender_id: SENDER.1,
177176
escrow_accounts: escrow_accounts_rx,
178-
indexer_allocations: watch::channel(initial_allocations_raw).1,
177+
indexer_allocations: watch::channel(initial_allocation).1,
179178
escrow_subgraph,
180179
network_subgraph,
181180
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),

0 commit comments

Comments
 (0)