Skip to content

Commit c41b5d9

Browse files
committed
fix(service): configure escrow watchers based on horizon settings
1 parent 131d955 commit c41b5d9

File tree

5 files changed

+304
-72
lines changed

5 files changed

+304
-72
lines changed

crates/service/src/middleware/sender.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use axum::{
66
middleware::Next,
77
response::Response,
88
};
9-
use indexer_monitor::EscrowAccounts;
9+
use indexer_monitor::{EscrowAccounts, EscrowAccountsError};
1010
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
1111
use tokio::sync::watch;
1212

@@ -18,9 +18,9 @@ pub struct SenderState {
1818
/// Used to recover the signer address
1919
pub domain_separator: Eip712Domain,
2020
/// Used to get the sender address given the signer address if v1 receipt
21-
pub escrow_accounts_v1: watch::Receiver<EscrowAccounts>,
21+
pub escrow_accounts_v1: Option<watch::Receiver<EscrowAccounts>>,
2222
/// Used to get the sender address given the signer address if v2 receipt
23-
pub escrow_accounts_v2: watch::Receiver<EscrowAccounts>,
23+
pub escrow_accounts_v2: Option<watch::Receiver<EscrowAccounts>>,
2424
}
2525

2626
/// The current query Sender address
@@ -48,14 +48,24 @@ pub async fn sender_middleware(
4848
if let Some(receipt) = request.extensions().get::<TapReceipt>() {
4949
let signer = receipt.recover_signer(&state.domain_separator)?;
5050
let sender = match receipt {
51-
TapReceipt::V1(_) => state
52-
.escrow_accounts_v1
53-
.borrow()
54-
.get_sender_for_signer(&signer)?,
55-
TapReceipt::V2(_) => state
56-
.escrow_accounts_v2
57-
.borrow()
58-
.get_sender_for_signer(&signer)?,
51+
TapReceipt::V1(_) => {
52+
if let Some(ref escrow_accounts_v1) = state.escrow_accounts_v1 {
53+
escrow_accounts_v1.borrow().get_sender_for_signer(&signer)?
54+
} else {
55+
return Err(IndexerServiceError::EscrowAccount(
56+
EscrowAccountsError::NoSenderFound { signer },
57+
));
58+
}
59+
}
60+
TapReceipt::V2(_) => {
61+
if let Some(ref escrow_accounts_v2) = state.escrow_accounts_v2 {
62+
escrow_accounts_v2.borrow().get_sender_for_signer(&signer)?
63+
} else {
64+
return Err(IndexerServiceError::EscrowAccount(
65+
EscrowAccountsError::NoSenderFound { signer },
66+
));
67+
}
68+
}
5969
};
6070
request.extensions_mut().insert(Sender(sender));
6171
}
@@ -100,8 +110,8 @@ mod tests {
100110

101111
let state = SenderState {
102112
domain_separator: test_assets::TAP_EIP712_DOMAIN.clone(),
103-
escrow_accounts_v1,
104-
escrow_accounts_v2,
113+
escrow_accounts_v1: Some(escrow_accounts_v1),
114+
escrow_accounts_v2: Some(escrow_accounts_v2),
105115
};
106116

107117
let middleware = from_fn_with_state(state, sender_middleware);

crates/service/src/service.rs

Lines changed: 226 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use indexer_dips::{
1818
server::{DipsServer, DipsServerContext},
1919
signers::EscrowSignerValidator,
2020
};
21-
use indexer_monitor::{escrow_accounts_v1, DeploymentDetails, SubgraphClient};
21+
use indexer_monitor::{escrow_accounts_v1, escrow_accounts_v2, DeploymentDetails, SubgraphClient};
2222
use release::IndexerServiceRelease;
2323
use reqwest::Url;
2424
use tap_core::tap_eip712_domain;
@@ -78,12 +78,18 @@ pub async fn run() -> anyhow::Result<()> {
7878
)
7979
.await;
8080

81-
let escrow_subgraph = create_subgraph_client(
82-
http_client.clone(),
83-
&config.graph_node,
84-
&config.subgraphs.escrow.config,
85-
)
86-
.await;
81+
let escrow_subgraph_v2 = if let Some(ref escrow_v2_config) = config.subgraphs.escrow_v2 {
82+
Some(
83+
create_subgraph_client(
84+
http_client.clone(),
85+
&config.graph_node,
86+
&escrow_v2_config.config,
87+
)
88+
.await,
89+
)
90+
} else {
91+
None
92+
};
8793

8894
// Establish Database connection necessary for serving indexer management
8995
// requests with defined schema
@@ -105,19 +111,161 @@ pub async fn run() -> anyhow::Result<()> {
105111
let indexer_address = config.indexer.indexer_address;
106112
let ipfs_url = config.service.ipfs_url.clone();
107113

108-
let router = ServiceRouter::builder()
109-
.database(database.clone())
110-
.domain_separator(domain_separator.clone())
111-
.graph_node(config.graph_node)
112-
.http_client(http_client)
113-
.release(release)
114-
.indexer(config.indexer)
115-
.service(config.service)
116-
.blockchain(config.blockchain)
117-
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
118-
.network_subgraph(network_subgraph, config.subgraphs.network)
119-
.escrow_subgraph(escrow_subgraph, config.subgraphs.escrow)
120-
.build();
114+
// Capture individual fields needed for DIPS before they get moved
115+
let escrow_v1_query_url_for_dips = config.subgraphs.escrow.config.query_url.clone();
116+
let escrow_v2_query_url_for_dips = config
117+
.subgraphs
118+
.escrow_v2
119+
.as_ref()
120+
.map(|c| c.config.query_url.clone());
121+
122+
// Configure router with escrow watchers based on Horizon mode
123+
use indexer_config::HorizonMode;
124+
let router = match config.horizon.mode {
125+
HorizonMode::Legacy => {
126+
tracing::info!("Horizon mode: Legacy - using escrow accounts v1 only");
127+
// Only create v1 watcher for legacy mode
128+
let escrow_subgraph_v1 = create_subgraph_client(
129+
http_client.clone(),
130+
&config.graph_node,
131+
&config.subgraphs.escrow.config,
132+
)
133+
.await;
134+
135+
let v1_watcher = indexer_monitor::escrow_accounts_v1(
136+
escrow_subgraph_v1,
137+
indexer_address,
138+
config.subgraphs.escrow.config.syncing_interval_secs,
139+
true, // Reject thawing signers eagerly
140+
)
141+
.await
142+
.expect("Error creating escrow_accounts_v1 channel");
143+
144+
ServiceRouter::builder()
145+
.database(database.clone())
146+
.domain_separator(domain_separator.clone())
147+
.graph_node(config.graph_node)
148+
.http_client(http_client)
149+
.release(release)
150+
.indexer(config.indexer)
151+
.service(config.service)
152+
.blockchain(config.blockchain)
153+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
154+
.network_subgraph(network_subgraph, config.subgraphs.network)
155+
.escrow_accounts_v1(v1_watcher)
156+
.build()
157+
}
158+
HorizonMode::Transition => {
159+
tracing::info!("Horizon mode: Transition - using both escrow accounts v1 and v2");
160+
// Create both watchers for transition mode using separate subgraph clients
161+
let escrow_subgraph_v1 = create_subgraph_client(
162+
http_client.clone(),
163+
&config.graph_node,
164+
&config.subgraphs.escrow.config,
165+
)
166+
.await;
167+
168+
let v1_watcher = indexer_monitor::escrow_accounts_v1(
169+
escrow_subgraph_v1,
170+
indexer_address,
171+
config.subgraphs.escrow.config.syncing_interval_secs,
172+
true, // Reject thawing signers eagerly
173+
)
174+
.await
175+
.expect("Error creating escrow_accounts_v1 channel");
176+
177+
if let Some(escrow_v2_subgraph) = escrow_subgraph_v2 {
178+
let v2_watcher = indexer_monitor::escrow_accounts_v2(
179+
escrow_v2_subgraph,
180+
indexer_address,
181+
config
182+
.subgraphs
183+
.escrow_v2
184+
.as_ref()
185+
.unwrap()
186+
.config
187+
.syncing_interval_secs,
188+
true, // Reject thawing signers eagerly
189+
)
190+
.await
191+
.expect("Error creating escrow_accounts_v2 channel");
192+
193+
ServiceRouter::builder()
194+
.database(database.clone())
195+
.domain_separator(domain_separator.clone())
196+
.graph_node(config.graph_node)
197+
.http_client(http_client)
198+
.release(release)
199+
.indexer(config.indexer)
200+
.service(config.service)
201+
.blockchain(config.blockchain)
202+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
203+
.network_subgraph(network_subgraph, config.subgraphs.network)
204+
.escrow_accounts_v1(v1_watcher)
205+
.escrow_accounts_v2(v2_watcher)
206+
.build()
207+
} else {
208+
tracing::warn!("Horizon mode is Transition but no escrow_v2 configuration provided, falling back to v1 only");
209+
ServiceRouter::builder()
210+
.database(database.clone())
211+
.domain_separator(domain_separator.clone())
212+
.graph_node(config.graph_node)
213+
.http_client(http_client)
214+
.release(release)
215+
.indexer(config.indexer)
216+
.service(config.service)
217+
.blockchain(config.blockchain)
218+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
219+
.network_subgraph(network_subgraph, config.subgraphs.network)
220+
.escrow_accounts_v1(v1_watcher)
221+
.build()
222+
}
223+
}
224+
HorizonMode::Full => {
225+
tracing::info!("Horizon mode: Full - using escrow accounts v2 only");
226+
// Only create v2 watcher for full Horizon mode
227+
let v2_subgraph = if let Some(escrow_v2_subgraph) = escrow_subgraph_v2 {
228+
escrow_v2_subgraph
229+
} else {
230+
tracing::warn!("Horizon mode is Full but no escrow_v2 configuration provided, falling back to escrow v1 endpoint for v2 queries");
231+
create_subgraph_client(
232+
http_client.clone(),
233+
&config.graph_node,
234+
&config.subgraphs.escrow.config,
235+
)
236+
.await
237+
};
238+
let v2_config = config
239+
.subgraphs
240+
.escrow_v2
241+
.as_ref()
242+
.map(|c| &c.config)
243+
.unwrap_or(&config.subgraphs.escrow.config);
244+
245+
let v2_watcher = indexer_monitor::escrow_accounts_v2(
246+
v2_subgraph,
247+
indexer_address,
248+
v2_config.syncing_interval_secs,
249+
true, // Reject thawing signers eagerly
250+
)
251+
.await
252+
.expect("Error creating escrow_accounts_v2 channel");
253+
254+
ServiceRouter::builder()
255+
.database(database.clone())
256+
.domain_separator(domain_separator.clone())
257+
.graph_node(config.graph_node)
258+
.http_client(http_client)
259+
.release(release)
260+
.indexer(config.indexer)
261+
.service(config.service)
262+
.blockchain(config.blockchain)
263+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
264+
.network_subgraph(network_subgraph, config.subgraphs.network)
265+
.escrow_accounts_v2(v2_watcher)
266+
.build()
267+
}
268+
};
121269

122270
serve_metrics(config.metrics.get_socket_addr());
123271

@@ -143,14 +291,64 @@ pub async fn run() -> anyhow::Result<()> {
143291
Arc::new(IpfsClient::new(ipfs_url.as_str()).unwrap());
144292

145293
// TODO: Try to re-use the same watcher for both DIPS and TAP
146-
let watcher = escrow_accounts_v1(
147-
escrow_subgraph,
148-
indexer_address,
149-
Duration::from_secs(500),
150-
true,
151-
)
152-
.await
153-
.expect("Failed to create escrow accounts watcher");
294+
// DIPS is part of Horizon/v2, so use v2 escrow watcher when available
295+
let dips_http_client = reqwest::Client::builder()
296+
.timeout(Duration::from_secs(60))
297+
.build()
298+
.expect("Failed to init HTTP client");
299+
300+
let escrow_subgraph_for_dips = if let Some(ref escrow_v2_url) = escrow_v2_query_url_for_dips
301+
{
302+
tracing::info!("DIPS using v2 escrow subgraph");
303+
// Create subgraph client for v2
304+
Box::leak(Box::new(
305+
SubgraphClient::new(
306+
dips_http_client,
307+
None, // No local deployment
308+
DeploymentDetails::for_query_url_with_token(
309+
escrow_v2_url.clone(),
310+
None, // No auth token
311+
),
312+
)
313+
.await,
314+
))
315+
} else {
316+
tracing::info!("DIPS falling back to v1 escrow subgraph");
317+
// Create subgraph client for v1
318+
Box::leak(Box::new(
319+
SubgraphClient::new(
320+
dips_http_client,
321+
None, // No local deployment
322+
DeploymentDetails::for_query_url_with_token(
323+
escrow_v1_query_url_for_dips,
324+
None, // No auth token
325+
),
326+
)
327+
.await,
328+
))
329+
};
330+
331+
let watcher = if escrow_v2_query_url_for_dips.is_some() {
332+
// Use v2 watcher for DIPS when v2 is available
333+
escrow_accounts_v2(
334+
escrow_subgraph_for_dips,
335+
indexer_address,
336+
Duration::from_secs(500),
337+
true,
338+
)
339+
.await
340+
.expect("Failed to create escrow accounts v2 watcher for DIPS")
341+
} else {
342+
// Fall back to v1 watcher
343+
escrow_accounts_v1(
344+
escrow_subgraph_for_dips,
345+
indexer_address,
346+
Duration::from_secs(500),
347+
true,
348+
)
349+
.await
350+
.expect("Failed to create escrow accounts v1 watcher for DIPS")
351+
};
154352

155353
let registry = NetworksRegistry::from_latest_version().await.unwrap();
156354

0 commit comments

Comments
 (0)