@@ -18,7 +18,7 @@ use indexer_dips::{
1818 server:: { DipsServer , DipsServerContext } ,
1919 signers:: EscrowSignerValidator ,
2020} ;
21- use indexer_monitor:: { escrow_accounts_v1, DeploymentDetails , SubgraphClient } ;
21+ use indexer_monitor:: { escrow_accounts_v1, escrow_accounts_v2 , DeploymentDetails , SubgraphClient } ;
2222use release:: IndexerServiceRelease ;
2323use reqwest:: Url ;
2424use tap_core:: tap_eip712_domain;
@@ -78,12 +78,18 @@ pub async fn run() -> anyhow::Result<()> {
7878 )
7979 . await ;
8080
81- let escrow_subgraph = create_subgraph_client (
82- http_client. clone ( ) ,
83- & config. graph_node ,
84- & config. subgraphs . escrow . config ,
85- )
86- . await ;
81+ let escrow_subgraph_v2 = if let Some ( ref escrow_v2_config) = config. subgraphs . escrow_v2 {
82+ Some (
83+ create_subgraph_client (
84+ http_client. clone ( ) ,
85+ & config. graph_node ,
86+ & escrow_v2_config. config ,
87+ )
88+ . await ,
89+ )
90+ } else {
91+ None
92+ } ;
8793
8894 // Establish Database connection necessary for serving indexer management
8995 // requests with defined schema
@@ -105,19 +111,161 @@ pub async fn run() -> anyhow::Result<()> {
105111 let indexer_address = config. indexer . indexer_address ;
106112 let ipfs_url = config. service . ipfs_url . clone ( ) ;
107113
108- let router = ServiceRouter :: builder ( )
109- . database ( database. clone ( ) )
110- . domain_separator ( domain_separator. clone ( ) )
111- . graph_node ( config. graph_node )
112- . http_client ( http_client)
113- . release ( release)
114- . indexer ( config. indexer )
115- . service ( config. service )
116- . blockchain ( config. blockchain )
117- . timestamp_buffer_secs ( config. tap . rav_request . timestamp_buffer_secs )
118- . network_subgraph ( network_subgraph, config. subgraphs . network )
119- . escrow_subgraph ( escrow_subgraph, config. subgraphs . escrow )
120- . build ( ) ;
114+ // Capture individual fields needed for DIPS before they get moved
115+ let escrow_v1_query_url_for_dips = config. subgraphs . escrow . config . query_url . clone ( ) ;
116+ let escrow_v2_query_url_for_dips = config
117+ . subgraphs
118+ . escrow_v2
119+ . as_ref ( )
120+ . map ( |c| c. config . query_url . clone ( ) ) ;
121+
122+ // Configure router with escrow watchers based on Horizon mode
123+ use indexer_config:: HorizonMode ;
124+ let router = match config. horizon . mode {
125+ HorizonMode :: Legacy => {
126+ tracing:: info!( "Horizon mode: Legacy - using escrow accounts v1 only" ) ;
127+ // Only create v1 watcher for legacy mode
128+ let escrow_subgraph_v1 = create_subgraph_client (
129+ http_client. clone ( ) ,
130+ & config. graph_node ,
131+ & config. subgraphs . escrow . config ,
132+ )
133+ . await ;
134+
135+ let v1_watcher = indexer_monitor:: escrow_accounts_v1 (
136+ escrow_subgraph_v1,
137+ indexer_address,
138+ config. subgraphs . escrow . config . syncing_interval_secs ,
139+ true , // Reject thawing signers eagerly
140+ )
141+ . await
142+ . expect ( "Error creating escrow_accounts_v1 channel" ) ;
143+
144+ ServiceRouter :: builder ( )
145+ . database ( database. clone ( ) )
146+ . domain_separator ( domain_separator. clone ( ) )
147+ . graph_node ( config. graph_node )
148+ . http_client ( http_client)
149+ . release ( release)
150+ . indexer ( config. indexer )
151+ . service ( config. service )
152+ . blockchain ( config. blockchain )
153+ . timestamp_buffer_secs ( config. tap . rav_request . timestamp_buffer_secs )
154+ . network_subgraph ( network_subgraph, config. subgraphs . network )
155+ . escrow_accounts_v1 ( v1_watcher)
156+ . build ( )
157+ }
158+ HorizonMode :: Transition => {
159+ tracing:: info!( "Horizon mode: Transition - using both escrow accounts v1 and v2" ) ;
160+ // Create both watchers for transition mode using separate subgraph clients
161+ let escrow_subgraph_v1 = create_subgraph_client (
162+ http_client. clone ( ) ,
163+ & config. graph_node ,
164+ & config. subgraphs . escrow . config ,
165+ )
166+ . await ;
167+
168+ let v1_watcher = indexer_monitor:: escrow_accounts_v1 (
169+ escrow_subgraph_v1,
170+ indexer_address,
171+ config. subgraphs . escrow . config . syncing_interval_secs ,
172+ true , // Reject thawing signers eagerly
173+ )
174+ . await
175+ . expect ( "Error creating escrow_accounts_v1 channel" ) ;
176+
177+ if let Some ( escrow_v2_subgraph) = escrow_subgraph_v2 {
178+ let v2_watcher = indexer_monitor:: escrow_accounts_v2 (
179+ escrow_v2_subgraph,
180+ indexer_address,
181+ config
182+ . subgraphs
183+ . escrow_v2
184+ . as_ref ( )
185+ . unwrap ( )
186+ . config
187+ . syncing_interval_secs ,
188+ true , // Reject thawing signers eagerly
189+ )
190+ . await
191+ . expect ( "Error creating escrow_accounts_v2 channel" ) ;
192+
193+ ServiceRouter :: builder ( )
194+ . database ( database. clone ( ) )
195+ . domain_separator ( domain_separator. clone ( ) )
196+ . graph_node ( config. graph_node )
197+ . http_client ( http_client)
198+ . release ( release)
199+ . indexer ( config. indexer )
200+ . service ( config. service )
201+ . blockchain ( config. blockchain )
202+ . timestamp_buffer_secs ( config. tap . rav_request . timestamp_buffer_secs )
203+ . network_subgraph ( network_subgraph, config. subgraphs . network )
204+ . escrow_accounts_v1 ( v1_watcher)
205+ . escrow_accounts_v2 ( v2_watcher)
206+ . build ( )
207+ } else {
208+ tracing:: warn!( "Horizon mode is Transition but no escrow_v2 configuration provided, falling back to v1 only" ) ;
209+ ServiceRouter :: builder ( )
210+ . database ( database. clone ( ) )
211+ . domain_separator ( domain_separator. clone ( ) )
212+ . graph_node ( config. graph_node )
213+ . http_client ( http_client)
214+ . release ( release)
215+ . indexer ( config. indexer )
216+ . service ( config. service )
217+ . blockchain ( config. blockchain )
218+ . timestamp_buffer_secs ( config. tap . rav_request . timestamp_buffer_secs )
219+ . network_subgraph ( network_subgraph, config. subgraphs . network )
220+ . escrow_accounts_v1 ( v1_watcher)
221+ . build ( )
222+ }
223+ }
224+ HorizonMode :: Full => {
225+ tracing:: info!( "Horizon mode: Full - using escrow accounts v2 only" ) ;
226+ // Only create v2 watcher for full Horizon mode
227+ let v2_subgraph = if let Some ( escrow_v2_subgraph) = escrow_subgraph_v2 {
228+ escrow_v2_subgraph
229+ } else {
230+ tracing:: warn!( "Horizon mode is Full but no escrow_v2 configuration provided, falling back to escrow v1 endpoint for v2 queries" ) ;
231+ create_subgraph_client (
232+ http_client. clone ( ) ,
233+ & config. graph_node ,
234+ & config. subgraphs . escrow . config ,
235+ )
236+ . await
237+ } ;
238+ let v2_config = config
239+ . subgraphs
240+ . escrow_v2
241+ . as_ref ( )
242+ . map ( |c| & c. config )
243+ . unwrap_or ( & config. subgraphs . escrow . config ) ;
244+
245+ let v2_watcher = indexer_monitor:: escrow_accounts_v2 (
246+ v2_subgraph,
247+ indexer_address,
248+ v2_config. syncing_interval_secs ,
249+ true , // Reject thawing signers eagerly
250+ )
251+ . await
252+ . expect ( "Error creating escrow_accounts_v2 channel" ) ;
253+
254+ ServiceRouter :: builder ( )
255+ . database ( database. clone ( ) )
256+ . domain_separator ( domain_separator. clone ( ) )
257+ . graph_node ( config. graph_node )
258+ . http_client ( http_client)
259+ . release ( release)
260+ . indexer ( config. indexer )
261+ . service ( config. service )
262+ . blockchain ( config. blockchain )
263+ . timestamp_buffer_secs ( config. tap . rav_request . timestamp_buffer_secs )
264+ . network_subgraph ( network_subgraph, config. subgraphs . network )
265+ . escrow_accounts_v2 ( v2_watcher)
266+ . build ( )
267+ }
268+ } ;
121269
122270 serve_metrics ( config. metrics . get_socket_addr ( ) ) ;
123271
@@ -143,14 +291,64 @@ pub async fn run() -> anyhow::Result<()> {
143291 Arc :: new ( IpfsClient :: new ( ipfs_url. as_str ( ) ) . unwrap ( ) ) ;
144292
145293 // TODO: Try to re-use the same watcher for both DIPS and TAP
146- let watcher = escrow_accounts_v1 (
147- escrow_subgraph,
148- indexer_address,
149- Duration :: from_secs ( 500 ) ,
150- true ,
151- )
152- . await
153- . expect ( "Failed to create escrow accounts watcher" ) ;
294+ // DIPS is part of Horizon/v2, so use v2 escrow watcher when available
295+ let dips_http_client = reqwest:: Client :: builder ( )
296+ . timeout ( Duration :: from_secs ( 60 ) )
297+ . build ( )
298+ . expect ( "Failed to init HTTP client" ) ;
299+
300+ let escrow_subgraph_for_dips = if let Some ( ref escrow_v2_url) = escrow_v2_query_url_for_dips
301+ {
302+ tracing:: info!( "DIPS using v2 escrow subgraph" ) ;
303+ // Create subgraph client for v2
304+ Box :: leak ( Box :: new (
305+ SubgraphClient :: new (
306+ dips_http_client,
307+ None , // No local deployment
308+ DeploymentDetails :: for_query_url_with_token (
309+ escrow_v2_url. clone ( ) ,
310+ None , // No auth token
311+ ) ,
312+ )
313+ . await ,
314+ ) )
315+ } else {
316+ tracing:: info!( "DIPS falling back to v1 escrow subgraph" ) ;
317+ // Create subgraph client for v1
318+ Box :: leak ( Box :: new (
319+ SubgraphClient :: new (
320+ dips_http_client,
321+ None , // No local deployment
322+ DeploymentDetails :: for_query_url_with_token (
323+ escrow_v1_query_url_for_dips,
324+ None , // No auth token
325+ ) ,
326+ )
327+ . await ,
328+ ) )
329+ } ;
330+
331+ let watcher = if escrow_v2_query_url_for_dips. is_some ( ) {
332+ // Use v2 watcher for DIPS when v2 is available
333+ escrow_accounts_v2 (
334+ escrow_subgraph_for_dips,
335+ indexer_address,
336+ Duration :: from_secs ( 500 ) ,
337+ true ,
338+ )
339+ . await
340+ . expect ( "Failed to create escrow accounts v2 watcher for DIPS" )
341+ } else {
342+ // Fall back to v1 watcher
343+ escrow_accounts_v1 (
344+ escrow_subgraph_for_dips,
345+ indexer_address,
346+ Duration :: from_secs ( 500 ) ,
347+ true ,
348+ )
349+ . await
350+ . expect ( "Failed to create escrow accounts v1 watcher for DIPS" )
351+ } ;
154352
155353 let registry = NetworksRegistry :: from_latest_version ( ) . await . unwrap ( ) ;
156354
0 commit comments