Skip to content

Commit 6345f6c

Browse files
committed
tap-agent
1 parent 6c0497a commit 6345f6c

File tree

4 files changed

+193
-90
lines changed

4 files changed

+193
-90
lines changed

crates/tap-agent/src/agent.rs

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -175,37 +175,48 @@ pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandl
175175
.await
176176
.expect("Error creating escrow_accounts_v2 channel");
177177

178-
// Configure watchers based on Horizon mode
179-
use indexer_config::HorizonMode;
180-
let (escrow_accounts_v1_final, escrow_accounts_v2_final) = match CONFIG.horizon.mode {
181-
HorizonMode::Legacy => {
182-
tracing::info!("Horizon mode: Legacy - v1 receipts only");
183-
(escrow_accounts_v1, escrow_accounts_v2) // Keep both for compatibility
184-
}
185-
HorizonMode::Transition => {
186-
tracing::info!("Horizon mode: Transition - both v1 and v2 receipts supported");
187-
(escrow_accounts_v1, escrow_accounts_v2)
188-
}
189-
HorizonMode::Full => {
190-
tracing::info!("Horizon mode: Full - v2 receipts only");
191-
(escrow_accounts_v1, escrow_accounts_v2) // Keep both for compatibility
178+
// Determine if we should check for Horizon contracts and potentially enable hybrid mode:
179+
// - If horizon.enabled = false: Pure legacy mode, no Horizon detection
180+
// - If horizon.enabled = true: Check if Horizon contracts are active in the network
181+
let is_horizon_enabled = if CONFIG.horizon.enabled {
182+
tracing::info!("Horizon migration support enabled - checking if Horizon contracts are active in the network");
183+
match indexer_monitor::is_horizon_active(network_subgraph).await {
184+
Ok(active) => {
185+
if active {
186+
tracing::info!("Horizon contracts detected in network subgraph - enabling hybrid migration mode");
187+
tracing::info!("TAP Agent Mode: Process existing V1 receipts for RAVs, accept new V2 receipts");
188+
} else {
189+
tracing::info!("Horizon contracts not yet active in network subgraph - remaining in legacy mode");
190+
}
191+
active
192+
}
193+
Err(e) => {
194+
tracing::warn!(
195+
"Failed to detect Horizon contracts: {}. Remaining in legacy mode.",
196+
e
197+
);
198+
false
199+
}
192200
}
201+
} else {
202+
tracing::info!(
203+
"Horizon migration support disabled in configuration - using pure legacy mode"
204+
);
205+
false
206+
};
207+
208+
// In both modes we need both watchers for the hybrid processing
209+
let (escrow_accounts_v1_final, escrow_accounts_v2_final) = if is_horizon_enabled {
210+
tracing::info!("TAP Agent: Horizon migration mode - processing existing V1 receipts and new V2 receipts");
211+
(escrow_accounts_v1, escrow_accounts_v2)
212+
} else {
213+
tracing::info!("TAP Agent: Legacy mode - V1 receipts only");
214+
(escrow_accounts_v1, escrow_accounts_v2) // Still keep V2 watcher for consistency
193215
};
194216

195217
let config = Box::leak(Box::new({
196218
let mut config = SenderAccountConfig::from_config(&CONFIG);
197-
// Use the configuration setting for Horizon support
198-
match CONFIG.horizon.mode {
199-
HorizonMode::Legacy => {
200-
config.horizon_enabled = false;
201-
}
202-
HorizonMode::Transition => {
203-
config.horizon_enabled = true;
204-
}
205-
HorizonMode::Full => {
206-
config.horizon_enabled = true;
207-
}
208-
}
219+
config.horizon_enabled = is_horizon_enabled;
209220
config
210221
}));
211222

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -422,11 +422,7 @@ impl SenderAccountConfig {
422422
rav_request_timeout: config.tap.rav_request.request_timeout_secs,
423423
tap_sender_timeout: config.tap.sender_timeout_secs,
424424
trusted_senders: config.tap.trusted_senders.clone(),
425-
horizon_enabled: match config.horizon.mode {
426-
indexer_config::HorizonMode::Legacy => false,
427-
indexer_config::HorizonMode::Transition => true,
428-
indexer_config::HorizonMode::Full => true,
429-
},
425+
horizon_enabled: config.horizon.enabled,
430426
}
431427
}
432428
}

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 131 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use reqwest::Url;
2020
use serde::Deserialize;
2121
use sqlx::{postgres::PgListener, PgPool};
2222
use thegraph_core::{
23-
alloy::{hex::ToHexExt, primitives::Address, sol_types::Eip712Domain},
23+
alloy::{primitives::Address, sol_types::Eip712Domain},
2424
AllocationId as AllocationIdCore, CollectionId,
2525
};
2626
use tokio::{select, sync::watch::Receiver};
@@ -39,14 +39,14 @@ static RECEIPTS_CREATED: LazyLock<CounterVec> = LazyLock::new(|| {
3939
.unwrap()
4040
});
4141

42-
/// Notification received by pgnotify
42+
/// Notification received by pgnotify for V1 (legacy) receipts
4343
///
44-
/// This contains a list of properties that are sent by postgres when a receipt is inserted
44+
/// This contains a list of properties that are sent by postgres when a V1 receipt is inserted
4545
#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
46-
pub struct NewReceiptNotification {
46+
pub struct NewReceiptNotificationV1 {
4747
/// id inside the table
4848
pub id: u64,
49-
/// address of the allocation
49+
/// address of the allocation (V1 uses 20-byte allocation_id)
5050
pub allocation_id: Address,
5151
/// address of wallet that signed this receipt
5252
pub signer_address: Address,
@@ -56,6 +56,81 @@ pub struct NewReceiptNotification {
5656
pub value: u128,
5757
}
5858

59+
/// Notification received by pgnotify for V2 (Horizon) receipts
60+
///
61+
/// This contains a list of properties that are sent by postgres when a V2 receipt is inserted
62+
#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
63+
pub struct NewReceiptNotificationV2 {
64+
/// id inside the table
65+
pub id: u64,
66+
/// collection id (V2 uses 32-byte collection_id)
67+
pub collection_id: String, // 64-character hex string from database
68+
/// address of wallet that signed this receipt
69+
pub signer_address: Address,
70+
/// timestamp of the receipt
71+
pub timestamp_ns: u64,
72+
/// value of the receipt
73+
pub value: u128,
74+
}
75+
76+
/// Unified notification that can represent both V1 and V2 receipts
77+
#[derive(Debug, PartialEq, Eq, Clone)]
78+
pub enum NewReceiptNotification {
79+
/// V1 (Legacy) receipt notification with allocation_id
80+
V1(NewReceiptNotificationV1),
81+
/// V2 (Horizon) receipt notification with collection_id
82+
V2(NewReceiptNotificationV2),
83+
}
84+
85+
impl NewReceiptNotification {
86+
/// Get the ID regardless of version
87+
pub fn id(&self) -> u64 {
88+
match self {
89+
NewReceiptNotification::V1(n) => n.id,
90+
NewReceiptNotification::V2(n) => n.id,
91+
}
92+
}
93+
94+
/// Get the signer address regardless of version
95+
pub fn signer_address(&self) -> Address {
96+
match self {
97+
NewReceiptNotification::V1(n) => n.signer_address,
98+
NewReceiptNotification::V2(n) => n.signer_address,
99+
}
100+
}
101+
102+
/// Get the timestamp regardless of version
103+
pub fn timestamp_ns(&self) -> u64 {
104+
match self {
105+
NewReceiptNotification::V1(n) => n.timestamp_ns,
106+
NewReceiptNotification::V2(n) => n.timestamp_ns,
107+
}
108+
}
109+
110+
/// Get the value regardless of version
111+
pub fn value(&self) -> u128 {
112+
match self {
113+
NewReceiptNotification::V1(n) => n.value,
114+
NewReceiptNotification::V2(n) => n.value,
115+
}
116+
}
117+
118+
/// Get the allocation ID as a unified type
119+
pub fn allocation_id(&self) -> AllocationId {
120+
match self {
121+
NewReceiptNotification::V1(n) => {
122+
AllocationId::Legacy(AllocationIdCore::from(n.allocation_id))
123+
}
124+
NewReceiptNotification::V2(n) => {
125+
// Convert the hex string to CollectionId
126+
let collection_id = CollectionId::from_str(&n.collection_id)
127+
.expect("Valid collection_id in database");
128+
AllocationId::Horizon(collection_id)
129+
}
130+
}
131+
}
132+
}
133+
59134
/// Manager Actor
60135
#[derive(Debug, Clone)]
61136
pub struct SenderAccountsManager;
@@ -888,14 +963,43 @@ async fn new_receipts_watcher(
888963
);
889964
break;
890965
};
891-
let Ok(new_receipt_notification) =
892-
serde_json::from_str::<NewReceiptNotification>(pg_notification.payload())
893-
else {
894-
tracing::error!(
895-
"should be able to deserialize the Postgres Notify event payload as a \
896-
NewReceiptNotification",
897-
);
898-
break;
966+
// Determine notification format based on the channel name
967+
let new_receipt_notification = match pg_notification.channel() {
968+
"scalar_tap_receipt_notification" => {
969+
// V1 notification format
970+
match serde_json::from_str::<NewReceiptNotificationV1>(pg_notification.payload()) {
971+
Ok(v1_notif) => NewReceiptNotification::V1(v1_notif),
972+
Err(e) => {
973+
tracing::error!(
974+
"Failed to deserialize V1 notification payload: {}, payload: {}",
975+
e,
976+
pg_notification.payload()
977+
);
978+
break;
979+
}
980+
}
981+
}
982+
"tap_horizon_receipt_notification" => {
983+
// V2 notification format
984+
match serde_json::from_str::<NewReceiptNotificationV2>(pg_notification.payload()) {
985+
Ok(v2_notif) => NewReceiptNotification::V2(v2_notif),
986+
Err(e) => {
987+
tracing::error!(
988+
"Failed to deserialize V2 notification payload: {}, payload: {}",
989+
e,
990+
pg_notification.payload()
991+
);
992+
break;
993+
}
994+
}
995+
}
996+
unknown_channel => {
997+
tracing::error!(
998+
"Received notification from unknown channel: {}",
999+
unknown_channel
1000+
);
1001+
break;
1002+
}
8991003
};
9001004
if let Err(e) = handle_notification(
9011005
new_receipt_notification,
@@ -939,18 +1043,18 @@ async fn handle_notification(
9391043

9401044
let Ok(sender_address) = escrow_accounts_rx
9411045
.borrow()
942-
.get_sender_for_signer(&new_receipt_notification.signer_address)
1046+
.get_sender_for_signer(&new_receipt_notification.signer_address())
9431047
else {
9441048
// TODO: save the receipt in the failed receipts table?
9451049
bail!(
9461050
"No sender address found for receipt signer address {}. \
9471051
This should not happen.",
948-
new_receipt_notification.signer_address
1052+
new_receipt_notification.signer_address()
9491053
);
9501054
};
9511055

952-
let allocation_id = &new_receipt_notification.allocation_id;
953-
let allocation_str = &allocation_id.to_string();
1056+
let allocation_id = new_receipt_notification.allocation_id();
1057+
let allocation_str = allocation_id.to_hex();
9541058

9551059
let actor_name = format!(
9561060
"{}{sender_address}:{allocation_id}",
@@ -985,21 +1089,7 @@ async fn handle_notification(
9851089
);
9861090
};
9871091
sender_account
988-
.cast(SenderAccountMessage::NewAllocationId(match sender_type {
989-
SenderType::Legacy => AllocationId::Legacy(AllocationIdCore::from(*allocation_id)),
990-
SenderType::Horizon => {
991-
// For now, convert Address to CollectionId for Horizon
992-
// This is a temporary fix - in production the notification should contain CollectionId
993-
let collection_id_str = format!(
994-
"000000000000000000000000{}",
995-
allocation_id.encode_hex_with_prefix()
996-
);
997-
AllocationId::Horizon(
998-
CollectionId::from_str(&collection_id_str[2..])
999-
.expect("Failed to convert address to collection ID"),
1000-
)
1001-
}
1002-
}))
1092+
.cast(SenderAccountMessage::NewAllocationId(allocation_id))
10031093
.map_err(|e| {
10041094
anyhow!(
10051095
"Error while sendeing new allocation id message to sender_account: {:?}",
@@ -1021,7 +1111,7 @@ async fn handle_notification(
10211111
})?;
10221112

10231113
RECEIPTS_CREATED
1024-
.with_label_values(&[&sender_address.to_string(), allocation_str])
1114+
.with_label_values(&[&sender_address.to_string(), &allocation_str])
10251115
.inc();
10261116
Ok(())
10271117
}
@@ -1045,11 +1135,14 @@ mod tests {
10451135
watch,
10461136
};
10471137

1048-
use super::{new_receipts_watcher, SenderAccountsManagerMessage, State};
1138+
use super::{
1139+
new_receipts_watcher, NewReceiptNotification, NewReceiptNotificationV1,
1140+
SenderAccountsManagerMessage, State,
1141+
};
10491142
use crate::{
10501143
agent::{
10511144
sender_account::SenderAccountMessage,
1052-
sender_accounts_manager::{handle_notification, NewReceiptNotification, SenderType},
1145+
sender_accounts_manager::{handle_notification, SenderType},
10531146
},
10541147
test::{
10551148
actors::{DummyActor, MockSenderAccount, MockSenderAllocation, TestableActor},
@@ -1347,7 +1440,7 @@ mod tests {
13471440
for i in 1..=receipts_count {
13481441
let receipt = receipts.recv().await.unwrap();
13491442

1350-
assert_eq!(i, receipt.id);
1443+
assert_eq!(i, receipt.id());
13511444
}
13521445
assert_eq!(receipts.try_recv().unwrap_err(), TryRecvError::Empty);
13531446

@@ -1406,13 +1499,13 @@ mod tests {
14061499
.await
14071500
.unwrap();
14081501

1409-
let new_receipt_notification = NewReceiptNotification {
1502+
let new_receipt_notification = NewReceiptNotification::V1(NewReceiptNotificationV1 {
14101503
id: 1,
14111504
allocation_id: ALLOCATION_ID_0,
14121505
signer_address: SIGNER.1,
14131506
timestamp_ns: 1,
14141507
value: 1,
1415-
};
1508+
});
14161509

14171510
handle_notification(
14181511
new_receipt_notification,

0 commit comments

Comments
 (0)