Skip to content

Commit 26a8be3

Browse files
committed
feat(tap-agent): finish v2 implementation
1 parent 8b4cb66 commit 26a8be3

File tree

7 files changed

+371
-123
lines changed

7 files changed

+371
-123
lines changed

crates/tap-agent/src/agent.rs

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -165,26 +165,58 @@ pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandl
165165
.await
166166
.expect("Error creating escrow_accounts channel");
167167

168+
// V2 escrow accounts are in the network subgraph, not a separate TAP v2 subgraph
168169
let escrow_accounts_v2 = escrow_accounts_v2(
169-
escrow_subgraph,
170+
network_subgraph,
170171
*indexer_address,
171-
*escrow_sync_interval,
172+
*network_sync_interval,
172173
false,
173174
)
174175
.await
175-
.expect("Error creating escrow_accounts channel");
176+
.expect("Error creating escrow_accounts_v2 channel");
177+
178+
// Determine if we should check for Horizon contracts and potentially enable hybrid mode:
179+
// - If horizon.enabled = false: Pure legacy mode, no Horizon detection
180+
// - If horizon.enabled = true: Check if Horizon contracts are active in the network
181+
let is_horizon_enabled = if CONFIG.horizon.enabled {
182+
tracing::info!("Horizon migration support enabled - checking if Horizon contracts are active in the network");
183+
match indexer_monitor::is_horizon_active(network_subgraph).await {
184+
Ok(active) => {
185+
if active {
186+
tracing::info!("Horizon contracts detected in network subgraph - enabling hybrid migration mode");
187+
tracing::info!("TAP Agent Mode: Process existing V1 receipts for RAVs, accept new V2 receipts");
188+
} else {
189+
tracing::info!("Horizon contracts not yet active in network subgraph - remaining in legacy mode");
190+
}
191+
active
192+
}
193+
Err(e) => {
194+
tracing::warn!(
195+
"Failed to detect Horizon contracts: {}. Remaining in legacy mode.",
196+
e
197+
);
198+
false
199+
}
200+
}
201+
} else {
202+
tracing::info!(
203+
"Horizon migration support disabled in configuration - using pure legacy mode"
204+
);
205+
false
206+
};
207+
208+
// In both modes we need both watchers for the hybrid processing
209+
let (escrow_accounts_v1_final, escrow_accounts_v2_final) = if is_horizon_enabled {
210+
tracing::info!("TAP Agent: Horizon migration mode - processing existing V1 receipts and new V2 receipts");
211+
(escrow_accounts_v1, escrow_accounts_v2)
212+
} else {
213+
tracing::info!("TAP Agent: Legacy mode - V1 receipts only");
214+
(escrow_accounts_v1, escrow_accounts_v2) // Still keep V2 watcher for consistency
215+
};
176216

177217
let config = Box::leak(Box::new({
178218
let mut config = SenderAccountConfig::from_config(&CONFIG);
179-
// Enable Horizon support
180-
config.horizon_enabled = true;
181-
// Add a warning log so operators know their setting was ignore
182-
if CONFIG.horizon.enabled {
183-
tracing::warn!(
184-
"Horizon support is configured as enabled but has been forcibly disabled as it's not fully supported yet. \
185-
This is a temporary measure until Horizon support is stable."
186-
);
187-
}
219+
config.horizon_enabled = is_horizon_enabled;
188220
config
189221
}));
190222

@@ -193,8 +225,8 @@ pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandl
193225
domain_separator: EIP_712_DOMAIN.clone(),
194226
pgpool,
195227
indexer_allocations,
196-
escrow_accounts_v1,
197-
escrow_accounts_v2,
228+
escrow_accounts_v1: escrow_accounts_v1_final,
229+
escrow_accounts_v2: escrow_accounts_v2_final,
198230
escrow_subgraph,
199231
network_subgraph,
200232
sender_aggregator_endpoints: sender_aggregator_endpoints.clone(),

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -631,14 +631,6 @@ impl State {
631631
sender_balance = self.sender_balance.to_u128(),
632632
"Denying sender."
633633
);
634-
// Check if this is horizon like sender and if it is actually enable,
635-
// otherwise just ignore.
636-
// FIXME: This should be removed once full horizon support
637-
// is implemented!
638-
if matches!(self.sender_type, SenderType::Horizon) && !self.config.horizon_enabled {
639-
return;
640-
}
641-
642634
SenderAccount::deny_sender(self.sender_type, &self.pgpool, self.sender).await;
643635
self.denied = true;
644636
SENDER_DENIED
@@ -865,14 +857,85 @@ impl Actor for SenderAccount {
865857
_ => vec![],
866858
}
867859
}
868-
// TODO Implement query for unfinalized v2 transactions
869-
// Depends on Escrow Subgraph Schema
870860
SenderType::Horizon => {
871861
if config.horizon_enabled {
872-
todo!("Implement query for unfinalized v2 transactions, It depends on Escrow Subgraph Schema")
862+
// V2 doesn't have transaction tracking like V1, but we can check if the RAVs
863+
// we're about to redeem are still the latest ones by querying LatestRavs.
864+
// If the subgraph has newer RAVs, it means ours were already redeemed.
865+
use indexer_query::latest_ravs_v2::{self, LatestRavs};
866+
867+
let collection_ids: Vec<String> = last_non_final_ravs
868+
.iter()
869+
.map(|(collection_id, _)| collection_id.clone())
870+
.collect();
871+
872+
if !collection_ids.is_empty() {
873+
// For V2, use the indexer address as the data service since the indexer
874+
// is providing the data service for the queries
875+
let data_service = config.indexer_address;
876+
877+
match escrow_subgraph
878+
.query::<LatestRavs, _>(latest_ravs_v2::Variables {
879+
payer: format!("{:x?}", sender_id),
880+
data_service: format!("{:x?}", data_service),
881+
service_provider: format!("{:x?}", config.indexer_address),
882+
collection_ids: collection_ids.clone(),
883+
})
884+
.await
885+
{
886+
Ok(Ok(response)) => {
887+
// Create a map of our current RAVs for easy lookup
888+
let our_ravs: HashMap<String, u128> = last_non_final_ravs
889+
.iter()
890+
.map(|(collection_id, value)| {
891+
let value_u128 = value
892+
.to_bigint()
893+
.and_then(|v| v.to_u128())
894+
.unwrap_or(0);
895+
(collection_id.clone(), value_u128)
896+
})
897+
.collect();
898+
899+
// Check which RAVs have been updated (indicating redemption)
900+
let mut finalized_allocation_ids = vec![];
901+
for rav in response.latest_ravs {
902+
if let Some(&our_value) = our_ravs.get(&rav.id) {
903+
// If the subgraph RAV has higher value, our RAV was redeemed
904+
if let Ok(subgraph_value) =
905+
rav.value_aggregate.parse::<u128>()
906+
{
907+
if subgraph_value > our_value {
908+
// Return collection ID string for filtering
909+
finalized_allocation_ids.push(rav.id);
910+
}
911+
}
912+
}
913+
}
914+
finalized_allocation_ids
915+
}
916+
Ok(Err(e)) => {
917+
tracing::warn!(
918+
error = %e,
919+
sender = %sender_id,
920+
"Failed to query V2 latest RAVs, assuming none are finalized"
921+
);
922+
vec![]
923+
}
924+
Err(e) => {
925+
tracing::warn!(
926+
error = %e,
927+
sender = %sender_id,
928+
"Failed to execute V2 latest RAVs query, assuming none are finalized"
929+
);
930+
vec![]
931+
}
932+
}
933+
} else {
934+
vec![]
935+
}
936+
} else {
937+
vec![]
873938
}
874-
// if we have any problems, we don't want to filter out
875-
vec![]
876939
}
877940
};
878941

0 commit comments

Comments
 (0)