Skip to content

Commit 88d69f9

Browse files
committed
feat(service): use network subgraph for v2 escrow accounts
1 parent a943460 commit 88d69f9

File tree

5 files changed

+265
-72
lines changed

5 files changed

+265
-72
lines changed

crates/service/src/middleware/sender.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use axum::{
66
middleware::Next,
77
response::Response,
88
};
9-
use indexer_monitor::EscrowAccounts;
9+
use indexer_monitor::{EscrowAccounts, EscrowAccountsError};
1010
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
1111
use tokio::sync::watch;
1212

@@ -18,9 +18,9 @@ pub struct SenderState {
1818
/// Used to recover the signer address
1919
pub domain_separator: Eip712Domain,
2020
/// Used to get the sender address given the signer address if v1 receipt
21-
pub escrow_accounts_v1: watch::Receiver<EscrowAccounts>,
21+
pub escrow_accounts_v1: Option<watch::Receiver<EscrowAccounts>>,
2222
/// Used to get the sender address given the signer address if v2 receipt
23-
pub escrow_accounts_v2: watch::Receiver<EscrowAccounts>,
23+
pub escrow_accounts_v2: Option<watch::Receiver<EscrowAccounts>>,
2424
}
2525

2626
/// The current query Sender address
@@ -48,14 +48,24 @@ pub async fn sender_middleware(
4848
if let Some(receipt) = request.extensions().get::<TapReceipt>() {
4949
let signer = receipt.recover_signer(&state.domain_separator)?;
5050
let sender = match receipt {
51-
TapReceipt::V1(_) => state
52-
.escrow_accounts_v1
53-
.borrow()
54-
.get_sender_for_signer(&signer)?,
55-
TapReceipt::V2(_) => state
56-
.escrow_accounts_v2
57-
.borrow()
58-
.get_sender_for_signer(&signer)?,
51+
TapReceipt::V1(_) => {
52+
if let Some(ref escrow_accounts_v1) = state.escrow_accounts_v1 {
53+
escrow_accounts_v1.borrow().get_sender_for_signer(&signer)?
54+
} else {
55+
return Err(IndexerServiceError::EscrowAccount(
56+
EscrowAccountsError::NoSenderFound { signer },
57+
));
58+
}
59+
}
60+
TapReceipt::V2(_) => {
61+
if let Some(ref escrow_accounts_v2) = state.escrow_accounts_v2 {
62+
escrow_accounts_v2.borrow().get_sender_for_signer(&signer)?
63+
} else {
64+
return Err(IndexerServiceError::EscrowAccount(
65+
EscrowAccountsError::NoSenderFound { signer },
66+
));
67+
}
68+
}
5969
};
6070
request.extensions_mut().insert(Sender(sender));
6171
}
@@ -100,8 +110,8 @@ mod tests {
100110

101111
let state = SenderState {
102112
domain_separator: test_assets::TAP_EIP712_DOMAIN.clone(),
103-
escrow_accounts_v1,
104-
escrow_accounts_v2,
113+
escrow_accounts_v1: Some(escrow_accounts_v1),
114+
escrow_accounts_v2: Some(escrow_accounts_v2),
105115
};
106116

107117
let middleware = from_fn_with_state(state, sender_middleware);

crates/service/src/service.rs

Lines changed: 187 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use indexer_dips::{
1818
server::{DipsServer, DipsServerContext},
1919
signers::EscrowSignerValidator,
2020
};
21-
use indexer_monitor::{escrow_accounts_v1, DeploymentDetails, SubgraphClient};
21+
use indexer_monitor::{escrow_accounts_v1, escrow_accounts_v2, DeploymentDetails, SubgraphClient};
2222
use release::IndexerServiceRelease;
2323
use reqwest::Url;
2424
use tap_core::tap_eip712_domain;
@@ -78,12 +78,7 @@ pub async fn run() -> anyhow::Result<()> {
7878
)
7979
.await;
8080

81-
let escrow_subgraph = create_subgraph_client(
82-
http_client.clone(),
83-
&config.graph_node,
84-
&config.subgraphs.escrow.config,
85-
)
86-
.await;
81+
// V2 escrow accounts are in the network subgraph, not a separate escrow_v2 subgraph
8782

8883
// Establish Database connection necessary for serving indexer management
8984
// requests with defined schema
@@ -105,19 +100,133 @@ pub async fn run() -> anyhow::Result<()> {
105100
let indexer_address = config.indexer.indexer_address;
106101
let ipfs_url = config.service.ipfs_url.clone();
107102

108-
let router = ServiceRouter::builder()
109-
.database(database.clone())
110-
.domain_separator(domain_separator.clone())
111-
.graph_node(config.graph_node)
112-
.http_client(http_client)
113-
.release(release)
114-
.indexer(config.indexer)
115-
.service(config.service)
116-
.blockchain(config.blockchain)
117-
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
118-
.network_subgraph(network_subgraph, config.subgraphs.network)
119-
.escrow_subgraph(escrow_subgraph, config.subgraphs.escrow)
120-
.build();
103+
// Capture individual fields needed for DIPS before they get moved
104+
let escrow_v1_query_url_for_dips = config.subgraphs.escrow.config.query_url.clone();
105+
// V2 escrow accounts are in the network subgraph
106+
let escrow_v2_query_url_for_dips = Some(config.subgraphs.network.config.query_url.clone());
107+
108+
// Determine if we should check for Horizon contracts and potentially enable hybrid mode:
109+
// - If horizon.enabled = false: Pure legacy mode, no Horizon detection
110+
// - If horizon.enabled = true: Check if Horizon contracts are active in the network
111+
let is_horizon_active = if config.horizon.enabled {
112+
tracing::info!("Horizon migration support enabled - checking if Horizon contracts are active in the network");
113+
match indexer_monitor::is_horizon_active(network_subgraph).await {
114+
Ok(active) => {
115+
if active {
116+
tracing::info!("Horizon contracts detected in network subgraph - enabling hybrid migration mode");
117+
tracing::info!("Mode: Accept new V2 receipts only, continue processing existing V1 receipts for RAVs");
118+
} else {
119+
tracing::info!("Horizon contracts not yet active in network subgraph - remaining in legacy mode");
120+
}
121+
active
122+
}
123+
Err(e) => {
124+
tracing::warn!(
125+
"Failed to detect Horizon contracts: {}. Remaining in legacy mode.",
126+
e
127+
);
128+
false
129+
}
130+
}
131+
} else {
132+
tracing::info!(
133+
"Horizon migration support disabled in configuration - using pure legacy mode"
134+
);
135+
false
136+
};
137+
138+
// Configure router with escrow watchers based on automatic Horizon detection
139+
let router = if is_horizon_active {
140+
tracing::info!("Horizon contracts detected - using Horizon migration mode: V2 receipts only, but processing existing V1 receipts");
141+
142+
// Create V1 escrow watcher for processing existing receipts
143+
let escrow_subgraph_v1 = create_subgraph_client(
144+
http_client.clone(),
145+
&config.graph_node,
146+
&config.subgraphs.escrow.config,
147+
)
148+
.await;
149+
150+
let v1_watcher = indexer_monitor::escrow_accounts_v1(
151+
escrow_subgraph_v1,
152+
indexer_address,
153+
config.subgraphs.escrow.config.syncing_interval_secs,
154+
true, // Reject thawing signers eagerly
155+
)
156+
.await
157+
.expect("Error creating escrow_accounts_v1 channel");
158+
159+
// Create V2 escrow watcher for new receipts (V2 escrow accounts are in the network subgraph)
160+
let v2_watcher = match indexer_monitor::escrow_accounts_v2(
161+
network_subgraph,
162+
indexer_address,
163+
config.subgraphs.network.config.syncing_interval_secs,
164+
true, // Reject thawing signers eagerly
165+
)
166+
.await
167+
{
168+
Ok(watcher) => {
169+
tracing::info!("V2 escrow accounts successfully initialized from network subgraph");
170+
watcher
171+
}
172+
Err(e) => {
173+
tracing::error!(
174+
"Failed to initialize V2 escrow accounts: {}. Service cannot continue.",
175+
e
176+
);
177+
std::process::exit(1);
178+
}
179+
};
180+
181+
ServiceRouter::builder()
182+
.database(database.clone())
183+
.domain_separator(domain_separator.clone())
184+
.graph_node(config.graph_node)
185+
.http_client(http_client)
186+
.release(release)
187+
.indexer(config.indexer)
188+
.service(config.service)
189+
.blockchain(config.blockchain)
190+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
191+
.network_subgraph(network_subgraph, config.subgraphs.network)
192+
.escrow_accounts_v1(v1_watcher)
193+
.escrow_accounts_v2(v2_watcher)
194+
.build()
195+
} else {
196+
tracing::info!(
197+
"No Horizon contracts detected - using Legacy (V1) mode with escrow accounts v1 only"
198+
);
199+
// Only create v1 watcher for legacy mode
200+
let escrow_subgraph_v1 = create_subgraph_client(
201+
http_client.clone(),
202+
&config.graph_node,
203+
&config.subgraphs.escrow.config,
204+
)
205+
.await;
206+
207+
let v1_watcher = indexer_monitor::escrow_accounts_v1(
208+
escrow_subgraph_v1,
209+
indexer_address,
210+
config.subgraphs.escrow.config.syncing_interval_secs,
211+
true, // Reject thawing signers eagerly
212+
)
213+
.await
214+
.expect("Error creating escrow_accounts_v1 channel");
215+
216+
ServiceRouter::builder()
217+
.database(database.clone())
218+
.domain_separator(domain_separator.clone())
219+
.graph_node(config.graph_node)
220+
.http_client(http_client)
221+
.release(release)
222+
.indexer(config.indexer)
223+
.service(config.service)
224+
.blockchain(config.blockchain)
225+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
226+
.network_subgraph(network_subgraph, config.subgraphs.network)
227+
.escrow_accounts_v1(v1_watcher)
228+
.build()
229+
};
121230

122231
serve_metrics(config.metrics.get_socket_addr());
123232

@@ -143,14 +252,64 @@ pub async fn run() -> anyhow::Result<()> {
143252
Arc::new(IpfsClient::new(ipfs_url.as_str()).unwrap());
144253

145254
// TODO: Try to re-use the same watcher for both DIPS and TAP
146-
let watcher = escrow_accounts_v1(
147-
escrow_subgraph,
148-
indexer_address,
149-
Duration::from_secs(500),
150-
true,
151-
)
152-
.await
153-
.expect("Failed to create escrow accounts watcher");
255+
// DIPS is part of Horizon/v2, so use v2 escrow watcher when available
256+
let dips_http_client = reqwest::Client::builder()
257+
.timeout(Duration::from_secs(60))
258+
.build()
259+
.expect("Failed to init HTTP client");
260+
261+
let escrow_subgraph_for_dips = if let Some(ref escrow_v2_url) = escrow_v2_query_url_for_dips
262+
{
263+
tracing::info!("DIPS using v2 escrow subgraph");
264+
// Create subgraph client for v2
265+
Box::leak(Box::new(
266+
SubgraphClient::new(
267+
dips_http_client,
268+
None, // No local deployment
269+
DeploymentDetails::for_query_url_with_token(
270+
escrow_v2_url.clone(),
271+
None, // No auth token
272+
),
273+
)
274+
.await,
275+
))
276+
} else {
277+
tracing::info!("DIPS falling back to v1 escrow subgraph");
278+
// Create subgraph client for v1
279+
Box::leak(Box::new(
280+
SubgraphClient::new(
281+
dips_http_client,
282+
None, // No local deployment
283+
DeploymentDetails::for_query_url_with_token(
284+
escrow_v1_query_url_for_dips,
285+
None, // No auth token
286+
),
287+
)
288+
.await,
289+
))
290+
};
291+
292+
let watcher = if escrow_v2_query_url_for_dips.is_some() {
293+
// Use v2 watcher for DIPS when v2 is available
294+
escrow_accounts_v2(
295+
escrow_subgraph_for_dips,
296+
indexer_address,
297+
Duration::from_secs(500),
298+
true,
299+
)
300+
.await
301+
.expect("Failed to create escrow accounts v2 watcher for DIPS")
302+
} else {
303+
// Fall back to v1 watcher
304+
escrow_accounts_v1(
305+
escrow_subgraph_for_dips,
306+
indexer_address,
307+
Duration::from_secs(500),
308+
true,
309+
)
310+
.await
311+
.expect("Failed to create escrow accounts v1 watcher for DIPS")
312+
};
154313

155314
let registry = NetworksRegistry::from_latest_version().await.unwrap();
156315

crates/service/src/service/router.rs

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -140,33 +140,42 @@ impl ServiceRouter {
140140
// Monitor escrow accounts v1
141141
// if not provided, create monitor from subgraph
142142
let escrow_accounts_v1 = match (self.escrow_accounts_v1, self.escrow_subgraph.as_ref()) {
143-
(Some(escrow_account), _) => escrow_account,
144-
(_, Some((escrow_subgraph, escrow))) => escrow_accounts_v1(
145-
escrow_subgraph,
146-
indexer_address,
147-
escrow.config.syncing_interval_secs,
148-
true, // Reject thawing signers eagerly
149-
)
150-
.await
151-
.expect("Error creating escrow_accounts channel"),
152-
(None, None) => panic!("No escrow accounts or escrow subgraph was provided"),
143+
(Some(escrow_account), _) => Some(escrow_account),
144+
(_, Some((escrow_subgraph, escrow))) => Some(
145+
escrow_accounts_v1(
146+
escrow_subgraph,
147+
indexer_address,
148+
escrow.config.syncing_interval_secs,
149+
true, // Reject thawing signers eagerly
150+
)
151+
.await
152+
.expect("Error creating escrow_accounts_v1 channel"),
153+
),
154+
(None, None) => None,
153155
};
154156

155157
// Monitor escrow accounts v2
156158
// if not provided, create monitor from subgraph
157159
let escrow_accounts_v2 = match (self.escrow_accounts_v2, self.escrow_subgraph.as_ref()) {
158-
(Some(escrow_account), _) => escrow_account,
159-
(_, Some((escrow_subgraph, escrow))) => escrow_accounts_v2(
160-
escrow_subgraph,
161-
indexer_address,
162-
escrow.config.syncing_interval_secs,
163-
true, // Reject thawing signers eagerly
164-
)
165-
.await
166-
.expect("Error creating escrow_accounts channel"),
167-
(None, None) => panic!("No escrow accounts or escrow subgraph was provided"),
160+
(Some(escrow_account), _) => Some(escrow_account),
161+
(_, Some((escrow_subgraph, escrow))) => Some(
162+
escrow_accounts_v2(
163+
escrow_subgraph,
164+
indexer_address,
165+
escrow.config.syncing_interval_secs,
166+
true, // Reject thawing signers eagerly
167+
)
168+
.await
169+
.expect("Error creating escrow_accounts_v2 channel"),
170+
),
171+
(None, None) => None,
168172
};
169173

174+
// Ensure at least one escrow accounts watcher is available
175+
if escrow_accounts_v1.is_none() && escrow_accounts_v2.is_none() {
176+
panic!("At least one escrow accounts watcher (v1 or v2) must be provided");
177+
}
178+
170179
// Monitor dispute manager address
171180
// if not provided, create monitor from subgraph
172181
let dispute_manager = match (self.dispute_manager, self.network_subgraph.as_ref()) {

crates/service/src/tap.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ impl IndexerTapContext {
5151
pub async fn get_checks(
5252
pgpool: PgPool,
5353
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
54-
escrow_accounts_v1: Receiver<EscrowAccounts>,
55-
escrow_accounts_v2: Receiver<EscrowAccounts>,
54+
escrow_accounts_v1: Option<Receiver<EscrowAccounts>>,
55+
escrow_accounts_v2: Option<Receiver<EscrowAccounts>>,
5656
timestamp_error_tolerance: Duration,
5757
receipt_max_value: u128,
5858
) -> Vec<ReceiptCheck<TapReceipt>> {

0 commit comments

Comments
 (0)