Skip to content

Commit 62170ea

Browse files
committed
fix: v1 processing fix allocation type
1 parent dfc9c34 commit 62170ea

File tree

3 files changed

+47
-57
lines changed

3 files changed

+47
-57
lines changed

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::{
1111
use anyhow::Context;
1212
use bigdecimal::{num_bigint::ToBigInt, ToPrimitive};
1313
use futures::{stream, StreamExt};
14+
use indexer_allocation::Allocation;
1415
use indexer_monitor::{EscrowAccounts, SubgraphClient};
1516
use indexer_query::{
1617
closed_allocations::{self, ClosedAllocations},
@@ -274,8 +275,9 @@ pub struct SenderAccountArgs {
274275
pub sender_id: Address,
275276
/// Watcher that returns a list of escrow accounts for current indexer
276277
pub escrow_accounts: Receiver<EscrowAccounts>,
277-
/// Watcher that returns a set of open and recently closed allocation ids
278-
pub indexer_allocations: Receiver<HashSet<AllocationId>>,
278+
/// Raw watcher of open and recently closed allocations from Network Subgraph
279+
/// We normalize per-sender to the correct variant (Legacy/Horizon)
280+
pub indexer_allocations: Receiver<HashMap<Address, Allocation>>,
279281
/// SubgraphClient of the escrow subgraph
280282
pub escrow_subgraph: &'static SubgraphClient,
281283
/// SubgraphClient of the network subgraph
@@ -828,16 +830,38 @@ impl Actor for SenderAccount {
828830
sender_type,
829831
}: Self::Arguments,
830832
) -> Result<Self::State, ActorProcessingErr> {
833+
// Normalize raw allocation addresses to the correct variant for this sender_type
831834
let myself_clone = myself.clone();
832-
watch_pipe(indexer_allocations, move |allocation_ids| {
833-
let allocation_ids = allocation_ids.clone();
834-
// Update the allocation_ids
835-
myself_clone
836-
.cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids))
837-
.unwrap_or_else(|e| {
838-
tracing::error!(error=?e, "Error while updating allocation_ids");
839-
});
840-
async {}
835+
let sender_type_for_log = sender_type; // copy for move into closure
836+
watch_pipe(indexer_allocations, move |alloc_map| {
837+
let raw_count = alloc_map.len();
838+
// Extract addresses and normalize based on sender_type
839+
let normalized: HashSet<AllocationId> = alloc_map
840+
.keys()
841+
.cloned()
842+
.map(|addr| match sender_type_for_log {
843+
SenderType::Legacy => AllocationId::Legacy(AllocationIdCore::from(addr)),
844+
SenderType::Horizon => AllocationId::Horizon(CollectionId::from(addr)),
845+
})
846+
.collect();
847+
848+
tracing::info!(
849+
sender = %sender_id,
850+
?sender_type_for_log,
851+
raw_count,
852+
normalized_count = normalized.len(),
853+
"indexer_allocations update: normalizing allocations for sender_type",
854+
);
855+
856+
// Forward normalized set to the actor to update allocations
857+
let myself = myself_clone.clone();
858+
async move {
859+
myself
860+
.cast(SenderAccountMessage::UpdateAllocationIds(normalized))
861+
.unwrap_or_else(|e| {
862+
tracing::error!(error=?e, "Error while updating allocation_ids");
863+
});
864+
}
841865
});
842866

843867
let myself_clone = myself.clone();

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 9 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ pub struct State {
304304
domain_separator: Eip712Domain,
305305
domain_separator_v2: Eip712Domain,
306306
pgpool: PgPool,
307-
indexer_allocations: Receiver<HashSet<AllocationId>>,
307+
// Raw allocation watcher (address -> Allocation). Normalized per-sender later.
308+
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
308309
/// Watcher containing the escrow accounts for v1
309310
escrow_accounts_v1: Receiver<EscrowAccounts>,
310311
/// Watcher containing the escrow accounts for v2
@@ -341,48 +342,12 @@ impl Actor for SenderAccountsManager {
341342
prefix,
342343
}: Self::Arguments,
343344
) -> Result<Self::State, ActorProcessingErr> {
344-
// we can use config.tap_mode.is_horizon() directly
345-
// because agent_start method does the horizon smart contract/subgraph validation
346-
// and updates the passed in config accordingly
347-
let is_horizon_active = config.tap_mode.is_horizon();
348-
if is_horizon_active {
349-
tracing::info!(is_horizon = true, "Allocations will use Horizon type");
350-
} else {
351-
tracing::info!(is_horizon = false, "Allocations will use Legacy type");
352-
}
353-
354-
let indexer_allocations = map_watcher(indexer_allocations, move |allocation_id| {
355-
let allocations: HashSet<_> = allocation_id
356-
.keys()
357-
.cloned()
358-
.map(|addr| {
359-
if is_horizon_active {
360-
AllocationId::Horizon(CollectionId::from(addr))
361-
} else {
362-
AllocationId::Legacy(AllocationIdCore::from(addr))
363-
}
364-
})
365-
.collect();
366-
367-
tracing::info!(
368-
"Allocation watcher update: {} allocations mapped to {} format",
369-
allocations.len(),
370-
if is_horizon_active {
371-
"Horizon"
372-
} else {
373-
"Legacy"
374-
}
375-
);
376-
for alloc in &allocations {
377-
tracing::debug!(
378-
"Mapped allocation: {} (address: {})",
379-
alloc,
380-
alloc.address()
381-
);
382-
}
383-
384-
allocations
385-
});
345+
// Do not pre-map allocations globally. We keep the raw watcher and
346+
// normalize per SenderAccount based on its sender_type (Legacy/Horizon).
347+
tracing::info!(
348+
horizon_active = %config.tap_mode.is_horizon(),
349+
"Using raw indexer_allocations watcher; normalization happens per sender"
350+
);
386351
// we need two connections because each one will listen to different notify events
387352
let pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
388353

@@ -1420,7 +1385,7 @@ mod tests {
14201385
new_receipts_watcher_handle_v1: None,
14211386
new_receipts_watcher_handle_v2: None,
14221387
pgpool,
1423-
indexer_allocations: watch::channel(HashSet::new()).1,
1388+
indexer_allocations: watch::channel(HashMap::new()).1,
14241389
escrow_accounts_v1: watch::channel(escrow_accounts.clone()).1,
14251390
escrow_accounts_v2: watch::channel(escrow_accounts).1,
14261391
escrow_subgraph: get_subgraph_client().await,

crates/tap-agent/src/test.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use actors::TestableActor;
1313
use anyhow::anyhow;
1414
use bigdecimal::num_bigint::BigInt;
1515
use indexer_config;
16+
use indexer_allocation::Allocation;
1617
use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient};
1718
use indexer_receipt::TapReceipt;
1819
use ractor::{concurrency::JoinHandle, Actor, ActorRef};
@@ -104,7 +105,7 @@ pub fn get_sender_account_config() -> &'static SenderAccountConfig {
104105
#[bon::builder]
105106
pub async fn create_sender_account(
106107
pgpool: PgPool,
107-
#[builder(default = HashSet::new())] initial_allocation: HashSet<AllocationId>,
108+
#[builder(default = HashMap::new())] initial_allocations_raw: HashMap<Address, Allocation>,
108109
#[builder(default = TRIGGER_VALUE)] rav_request_trigger_value: u128,
109110
#[builder(default = TRIGGER_VALUE)] max_amount_willing_to_lose_grt: u128,
110111
escrow_subgraph_endpoint: Option<&str>,
@@ -174,7 +175,7 @@ pub async fn create_sender_account(
174175
pgpool,
175176
sender_id: SENDER.1,
176177
escrow_accounts: escrow_accounts_rx,
177-
indexer_allocations: watch::channel(initial_allocation).1,
178+
indexer_allocations: watch::channel(initial_allocations_raw).1,
178179
escrow_subgraph,
179180
network_subgraph,
180181
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),

0 commit comments

Comments
 (0)