Skip to content

Commit 678d06a

Browse files
committed
feat: use network subgraph for v2 escrow accounts
1 parent f49cd91 commit 678d06a

File tree

7 files changed

+177
-251
lines changed

7 files changed

+177
-251
lines changed

crates/monitor/src/escrow_accounts.rs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ use std::{
99

1010
use anyhow::anyhow;
1111
use indexer_query::escrow_account::{self, EscrowAccountQuery};
12-
use indexer_query::escrow_account_v2::{
13-
self as escrow_account_v2, EscrowAccountQuery as EscrowAccountQueryV2,
14-
};
1512
use thegraph_core::alloy::primitives::{Address, U256};
1613
use thiserror::Error;
1714
use tokio::sync::watch::Receiver;
@@ -91,6 +88,12 @@ impl EscrowAccounts {
9188

9289
pub type EscrowAccountsWatcher = Receiver<EscrowAccounts>;
9390

91+
pub fn empty_escrow_accounts_watcher() -> EscrowAccountsWatcher {
92+
let (_, receiver) =
93+
tokio::sync::watch::channel(EscrowAccounts::new(HashMap::new(), HashMap::new()));
94+
receiver
95+
}
96+
9497
pub async fn escrow_accounts_v1(
9598
escrow_subgraph: &'static SubgraphClient,
9699
indexer_address: Address,
@@ -120,22 +123,16 @@ async fn get_escrow_accounts_v2(
120123
indexer_address: Address,
121124
reject_thawing_signers: bool,
122125
) -> anyhow::Result<EscrowAccounts> {
123-
// V2 TAP receipts use different field names (payer/service_provider) but the underlying
124-
// escrow account model is identical to V1. Both V1 and V2 receipts reference the same
125-
// sender addresses and the same escrow relationships.
126-
//
127-
// The separation of V1/V2 escrow account watchers allows for potential future differences
128-
// in escrow models, but currently both query the same subgraph data with identical logic.
129-
//
130-
// V2 receipt flow:
131-
// 1. V2 receipt contains payer address (equivalent to V1 sender)
132-
// 2. Receipt is signed by a signer authorized by the payer
133-
// 3. Escrow accounts map: signer -> payer (sender) -> balance
134-
// 4. Service provider (indexer) receives payments from payer's escrow
126+
// Query V2 escrow accounts from the network subgraph which tracks PaymentsEscrow
127+
// and GraphTallyCollector contract events.
128+
129+
use indexer_query::network_escrow_account_v2::{
130+
self as network_escrow_account_v2, NetworkEscrowAccountQueryV2,
131+
};
135132

136133
let response = escrow_subgraph
137-
.query::<EscrowAccountQueryV2, _>(escrow_account_v2::Variables {
138-
indexer: format!("{:x?}", indexer_address),
134+
.query::<NetworkEscrowAccountQueryV2, _>(network_escrow_account_v2::Variables {
135+
receiver: format!("{:x?}", indexer_address),
139136
thaw_end_timestamp: if reject_thawing_signers {
140137
U256::ZERO.to_string()
141138
} else {
@@ -146,10 +143,23 @@ async fn get_escrow_accounts_v2(
146143

147144
let response = response?;
148145

149-
tracing::trace!("V2 Escrow accounts response: {:?}", response);
146+
tracing::trace!("Network V2 Escrow accounts response: {:?}", response);
147+
148+
// V2 TAP receipts use different field names (payer/service_provider) but the underlying
149+
// escrow account model is identical to V1. Both V1 and V2 receipts reference the same
150+
// sender addresses and the same escrow relationships.
151+
//
152+
// V1 queries the TAP subgraph while V2 queries the network subgraph, but both return
153+
// the same escrow account structure for processing.
154+
//
155+
// V2 receipt flow:
156+
// 1. V2 receipt contains payer address (equivalent to V1 sender)
157+
// 2. Receipt is signed by a signer authorized by the payer
158+
// 3. Escrow accounts map: signer -> payer (sender) -> balance
159+
// 4. Service provider (indexer) receives payments from payer's escrow
150160

151161
let senders_balances: HashMap<Address, U256> = response
152-
.escrow_accounts
162+
.payments_escrow_accounts
153163
.iter()
154164
.map(|account| {
155165
let balance = U256::checked_sub(
@@ -170,7 +180,7 @@ async fn get_escrow_accounts_v2(
170180
.collect::<Result<HashMap<_, _>, anyhow::Error>>()?;
171181

172182
let senders_to_signers = response
173-
.escrow_accounts
183+
.payments_escrow_accounts
174184
.into_iter()
175185
.map(|account| {
176186
let payer = Address::from_str(&account.payer.id)?;

crates/monitor/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ pub use crate::{
1616
deployment_to_allocation::{deployment_to_allocation, DeploymentToAllocationWatcher},
1717
dispute_manager::{dispute_manager, DisputeManagerWatcher},
1818
escrow_accounts::{
19-
escrow_accounts_v1, escrow_accounts_v2, EscrowAccounts, EscrowAccountsError,
20-
EscrowAccountsWatcher,
19+
empty_escrow_accounts_watcher, escrow_accounts_v1, escrow_accounts_v2, EscrowAccounts,
20+
EscrowAccountsError, EscrowAccountsWatcher,
2121
},
2222
horizon_detection::is_horizon_active,
2323
};

crates/service/src/service.rs

Lines changed: 122 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,7 @@ pub async fn run() -> anyhow::Result<()> {
7878
)
7979
.await;
8080

81-
let escrow_subgraph_v2 = if let Some(ref escrow_v2_config) = config.subgraphs.escrow_v2 {
82-
Some(
83-
create_subgraph_client(
84-
http_client.clone(),
85-
&config.graph_node,
86-
&escrow_v2_config.config,
87-
)
88-
.await,
89-
)
90-
} else {
91-
None
92-
};
81+
// V2 escrow accounts are in the network subgraph, not a separate escrow_v2 subgraph
9382

9483
// Establish Database connection necessary for serving indexer management
9584
// requests with defined schema
@@ -113,158 +102,130 @@ pub async fn run() -> anyhow::Result<()> {
113102

114103
// Capture individual fields needed for DIPS before they get moved
115104
let escrow_v1_query_url_for_dips = config.subgraphs.escrow.config.query_url.clone();
116-
let escrow_v2_query_url_for_dips = config
117-
.subgraphs
118-
.escrow_v2
119-
.as_ref()
120-
.map(|c| c.config.query_url.clone());
121-
122-
// Configure router with escrow watchers based on Horizon mode
123-
use indexer_config::HorizonMode;
124-
let router = match config.horizon.mode {
125-
HorizonMode::Legacy => {
126-
tracing::info!("Horizon mode: Legacy - using escrow accounts v1 only");
127-
// Only create v1 watcher for legacy mode
128-
let escrow_subgraph_v1 = create_subgraph_client(
129-
http_client.clone(),
130-
&config.graph_node,
131-
&config.subgraphs.escrow.config,
132-
)
133-
.await;
134-
135-
let v1_watcher = indexer_monitor::escrow_accounts_v1(
136-
escrow_subgraph_v1,
137-
indexer_address,
138-
config.subgraphs.escrow.config.syncing_interval_secs,
139-
true, // Reject thawing signers eagerly
140-
)
141-
.await
142-
.expect("Error creating escrow_accounts_v1 channel");
143-
144-
ServiceRouter::builder()
145-
.database(database.clone())
146-
.domain_separator(domain_separator.clone())
147-
.graph_node(config.graph_node)
148-
.http_client(http_client)
149-
.release(release)
150-
.indexer(config.indexer)
151-
.service(config.service)
152-
.blockchain(config.blockchain)
153-
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
154-
.network_subgraph(network_subgraph, config.subgraphs.network)
155-
.escrow_accounts_v1(v1_watcher)
156-
.build()
105+
// V2 escrow accounts are in the network subgraph
106+
let escrow_v2_query_url_for_dips = Some(config.subgraphs.network.config.query_url.clone());
107+
108+
// Determine if we should check for Horizon contracts and potentially enable hybrid mode:
109+
// - If horizon.enabled = false: Pure legacy mode, no Horizon detection
110+
// - If horizon.enabled = true: Check if Horizon contracts are active in the network
111+
let is_horizon_active = if config.horizon.enabled {
112+
tracing::info!("Horizon migration support enabled - checking if Horizon contracts are active in the network");
113+
match indexer_monitor::is_horizon_active(network_subgraph).await {
114+
Ok(active) => {
115+
if active {
116+
tracing::info!("Horizon contracts detected in network subgraph - enabling hybrid migration mode");
117+
tracing::info!("Mode: Accept new V2 receipts only, continue processing existing V1 receipts for RAVs");
118+
} else {
119+
tracing::info!("Horizon contracts not yet active in network subgraph - remaining in legacy mode");
120+
}
121+
active
122+
}
123+
Err(e) => {
124+
tracing::warn!(
125+
"Failed to detect Horizon contracts: {}. Remaining in legacy mode.",
126+
e
127+
);
128+
false
129+
}
157130
}
158-
HorizonMode::Transition => {
159-
tracing::info!("Horizon mode: Transition - using both escrow accounts v1 and v2");
160-
// Create both watchers for transition mode using separate subgraph clients
161-
let escrow_subgraph_v1 = create_subgraph_client(
162-
http_client.clone(),
163-
&config.graph_node,
164-
&config.subgraphs.escrow.config,
165-
)
166-
.await;
131+
} else {
132+
tracing::info!(
133+
"Horizon migration support disabled in configuration - using pure legacy mode"
134+
);
135+
false
136+
};
167137

168-
let v1_watcher = indexer_monitor::escrow_accounts_v1(
169-
escrow_subgraph_v1,
170-
indexer_address,
171-
config.subgraphs.escrow.config.syncing_interval_secs,
172-
true, // Reject thawing signers eagerly
173-
)
174-
.await
175-
.expect("Error creating escrow_accounts_v1 channel");
176-
177-
if let Some(escrow_v2_subgraph) = escrow_subgraph_v2 {
178-
let v2_watcher = indexer_monitor::escrow_accounts_v2(
179-
escrow_v2_subgraph,
180-
indexer_address,
181-
config
182-
.subgraphs
183-
.escrow_v2
184-
.as_ref()
185-
.unwrap()
186-
.config
187-
.syncing_interval_secs,
188-
true, // Reject thawing signers eagerly
189-
)
190-
.await
191-
.expect("Error creating escrow_accounts_v2 channel");
192-
193-
ServiceRouter::builder()
194-
.database(database.clone())
195-
.domain_separator(domain_separator.clone())
196-
.graph_node(config.graph_node)
197-
.http_client(http_client)
198-
.release(release)
199-
.indexer(config.indexer)
200-
.service(config.service)
201-
.blockchain(config.blockchain)
202-
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
203-
.network_subgraph(network_subgraph, config.subgraphs.network)
204-
.escrow_accounts_v1(v1_watcher)
205-
.escrow_accounts_v2(v2_watcher)
206-
.build()
207-
} else {
208-
tracing::warn!("Horizon mode is Transition but no escrow_v2 configuration provided, falling back to v1 only");
209-
ServiceRouter::builder()
210-
.database(database.clone())
211-
.domain_separator(domain_separator.clone())
212-
.graph_node(config.graph_node)
213-
.http_client(http_client)
214-
.release(release)
215-
.indexer(config.indexer)
216-
.service(config.service)
217-
.blockchain(config.blockchain)
218-
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
219-
.network_subgraph(network_subgraph, config.subgraphs.network)
220-
.escrow_accounts_v1(v1_watcher)
221-
.build()
138+
// Configure router with escrow watchers based on automatic Horizon detection
139+
let router = if is_horizon_active {
140+
tracing::info!("Horizon contracts detected - using Horizon migration mode: V2 receipts only, but processing existing V1 receipts");
141+
142+
// Create V1 escrow watcher for processing existing receipts
143+
let escrow_subgraph_v1 = create_subgraph_client(
144+
http_client.clone(),
145+
&config.graph_node,
146+
&config.subgraphs.escrow.config,
147+
)
148+
.await;
149+
150+
let v1_watcher = indexer_monitor::escrow_accounts_v1(
151+
escrow_subgraph_v1,
152+
indexer_address,
153+
config.subgraphs.escrow.config.syncing_interval_secs,
154+
true, // Reject thawing signers eagerly
155+
)
156+
.await
157+
.expect("Error creating escrow_accounts_v1 channel");
158+
159+
// Create V2 escrow watcher for new receipts (V2 escrow accounts are in the network subgraph)
160+
let v2_watcher = match indexer_monitor::escrow_accounts_v2(
161+
network_subgraph,
162+
indexer_address,
163+
config.subgraphs.network.config.syncing_interval_secs,
164+
true, // Reject thawing signers eagerly
165+
)
166+
.await
167+
{
168+
Ok(watcher) => {
169+
tracing::info!("V2 escrow accounts successfully initialized from network subgraph");
170+
watcher
222171
}
223-
}
224-
HorizonMode::Full => {
225-
tracing::info!("Horizon mode: Full - using escrow accounts v2 only");
226-
// Only create v2 watcher for full Horizon mode
227-
let v2_subgraph = if let Some(escrow_v2_subgraph) = escrow_subgraph_v2 {
228-
escrow_v2_subgraph
229-
} else {
230-
tracing::warn!("Horizon mode is Full but no escrow_v2 configuration provided, falling back to escrow v1 endpoint for v2 queries");
231-
create_subgraph_client(
232-
http_client.clone(),
233-
&config.graph_node,
234-
&config.subgraphs.escrow.config,
235-
)
236-
.await
237-
};
238-
let v2_config = config
239-
.subgraphs
240-
.escrow_v2
241-
.as_ref()
242-
.map(|c| &c.config)
243-
.unwrap_or(&config.subgraphs.escrow.config);
244-
245-
let v2_watcher = indexer_monitor::escrow_accounts_v2(
246-
v2_subgraph,
247-
indexer_address,
248-
v2_config.syncing_interval_secs,
249-
true, // Reject thawing signers eagerly
250-
)
251-
.await
252-
.expect("Error creating escrow_accounts_v2 channel");
253-
254-
ServiceRouter::builder()
255-
.database(database.clone())
256-
.domain_separator(domain_separator.clone())
257-
.graph_node(config.graph_node)
258-
.http_client(http_client)
259-
.release(release)
260-
.indexer(config.indexer)
261-
.service(config.service)
262-
.blockchain(config.blockchain)
263-
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
264-
.network_subgraph(network_subgraph, config.subgraphs.network)
265-
.escrow_accounts_v2(v2_watcher)
266-
.build()
267-
}
172+
Err(e) => {
173+
tracing::error!(
174+
"Failed to initialize V2 escrow accounts: {}. Service cannot continue.",
175+
e
176+
);
177+
std::process::exit(1);
178+
}
179+
};
180+
181+
ServiceRouter::builder()
182+
.database(database.clone())
183+
.domain_separator(domain_separator.clone())
184+
.graph_node(config.graph_node)
185+
.http_client(http_client)
186+
.release(release)
187+
.indexer(config.indexer)
188+
.service(config.service)
189+
.blockchain(config.blockchain)
190+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
191+
.network_subgraph(network_subgraph, config.subgraphs.network)
192+
.escrow_accounts_v1(v1_watcher)
193+
.escrow_accounts_v2(v2_watcher)
194+
.build()
195+
} else {
196+
tracing::info!(
197+
"No Horizon contracts detected - using Legacy (V1) mode with escrow accounts v1 only"
198+
);
199+
// Only create v1 watcher for legacy mode
200+
let escrow_subgraph_v1 = create_subgraph_client(
201+
http_client.clone(),
202+
&config.graph_node,
203+
&config.subgraphs.escrow.config,
204+
)
205+
.await;
206+
207+
let v1_watcher = indexer_monitor::escrow_accounts_v1(
208+
escrow_subgraph_v1,
209+
indexer_address,
210+
config.subgraphs.escrow.config.syncing_interval_secs,
211+
true, // Reject thawing signers eagerly
212+
)
213+
.await
214+
.expect("Error creating escrow_accounts_v1 channel");
215+
216+
ServiceRouter::builder()
217+
.database(database.clone())
218+
.domain_separator(domain_separator.clone())
219+
.graph_node(config.graph_node)
220+
.http_client(http_client)
221+
.release(release)
222+
.indexer(config.indexer)
223+
.service(config.service)
224+
.blockchain(config.blockchain)
225+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
226+
.network_subgraph(network_subgraph, config.subgraphs.network)
227+
.escrow_accounts_v1(v1_watcher)
228+
.build()
268229
};
269230

270231
serve_metrics(config.metrics.get_socket_addr());

0 commit comments

Comments
 (0)