Skip to content

Commit 8c06699

Browse files
committed
fix: configure escrow watchers based on horizon settings
1 parent 83493c5 commit 8c06699

File tree

6 files changed

+135
-64
lines changed

6 files changed

+135
-64
lines changed

crates/service/src/middleware/sender.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use axum::{
66
middleware::Next,
77
response::Response,
88
};
9-
use indexer_monitor::EscrowAccounts;
9+
use indexer_monitor::{EscrowAccounts, EscrowAccountsError};
1010
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
1111
use tokio::sync::watch;
1212

@@ -18,9 +18,9 @@ pub struct SenderState {
1818
/// Used to recover the signer address
1919
pub domain_separator: Eip712Domain,
2020
/// Used to get the sender address given the signer address if v1 receipt
21-
pub escrow_accounts_v1: watch::Receiver<EscrowAccounts>,
21+
pub escrow_accounts_v1: Option<watch::Receiver<EscrowAccounts>>,
2222
/// Used to get the sender address given the signer address if v2 receipt
23-
pub escrow_accounts_v2: watch::Receiver<EscrowAccounts>,
23+
pub escrow_accounts_v2: Option<watch::Receiver<EscrowAccounts>>,
2424
}
2525

2626
/// The current query Sender address
@@ -48,14 +48,24 @@ pub async fn sender_middleware(
4848
if let Some(receipt) = request.extensions().get::<TapReceipt>() {
4949
let signer = receipt.recover_signer(&state.domain_separator)?;
5050
let sender = match receipt {
51-
TapReceipt::V1(_) => state
52-
.escrow_accounts_v1
53-
.borrow()
54-
.get_sender_for_signer(&signer)?,
55-
TapReceipt::V2(_) => state
56-
.escrow_accounts_v2
57-
.borrow()
58-
.get_sender_for_signer(&signer)?,
51+
TapReceipt::V1(_) => {
52+
if let Some(ref escrow_accounts_v1) = state.escrow_accounts_v1 {
53+
escrow_accounts_v1.borrow().get_sender_for_signer(&signer)?
54+
} else {
55+
return Err(IndexerServiceError::EscrowAccount(
56+
EscrowAccountsError::NoSenderFound { signer },
57+
));
58+
}
59+
}
60+
TapReceipt::V2(_) => {
61+
if let Some(ref escrow_accounts_v2) = state.escrow_accounts_v2 {
62+
escrow_accounts_v2.borrow().get_sender_for_signer(&signer)?
63+
} else {
64+
return Err(IndexerServiceError::EscrowAccount(
65+
EscrowAccountsError::NoSenderFound { signer },
66+
));
67+
}
68+
}
5969
};
6070
request.extensions_mut().insert(Sender(sender));
6171
}
@@ -100,8 +110,8 @@ mod tests {
100110

101111
let state = SenderState {
102112
domain_separator: test_assets::TAP_EIP712_DOMAIN.clone(),
103-
escrow_accounts_v1,
104-
escrow_accounts_v2,
113+
escrow_accounts_v1: Some(escrow_accounts_v1),
114+
escrow_accounts_v2: Some(escrow_accounts_v2),
105115
};
106116

107117
let middleware = from_fn_with_state(state, sender_middleware);

crates/service/src/service.rs

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,58 @@ pub async fn run() -> anyhow::Result<()> {
105105
let indexer_address = config.indexer.indexer_address;
106106
let ipfs_url = config.service.ipfs_url.clone();
107107

108-
let router = ServiceRouter::builder()
109-
.database(database.clone())
110-
.domain_separator(domain_separator.clone())
111-
.graph_node(config.graph_node)
112-
.http_client(http_client)
113-
.release(release)
114-
.indexer(config.indexer)
115-
.service(config.service)
116-
.blockchain(config.blockchain)
117-
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
118-
.network_subgraph(network_subgraph, config.subgraphs.network)
119-
.escrow_subgraph(escrow_subgraph, config.subgraphs.escrow)
120-
.build();
108+
// Configure router with escrow watchers based on Horizon setting
109+
let router = if config.horizon.enabled {
110+
tracing::info!("Horizon support enabled - using escrow accounts v2");
111+
// Only create v2 watcher when Horizon is enabled
112+
let v2_watcher = indexer_monitor::escrow_accounts_v2(
113+
escrow_subgraph,
114+
indexer_address,
115+
config.subgraphs.escrow.config.syncing_interval_secs,
116+
true, // Reject thawing signers eagerly
117+
)
118+
.await
119+
.expect("Error creating escrow_accounts_v2 channel");
120+
121+
ServiceRouter::builder()
122+
.database(database.clone())
123+
.domain_separator(domain_separator.clone())
124+
.graph_node(config.graph_node)
125+
.http_client(http_client)
126+
.release(release)
127+
.indexer(config.indexer)
128+
.service(config.service)
129+
.blockchain(config.blockchain)
130+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
131+
.network_subgraph(network_subgraph, config.subgraphs.network)
132+
.escrow_accounts_v2(v2_watcher)
133+
.build()
134+
} else {
135+
tracing::info!("Horizon support disabled - using escrow accounts v1");
136+
// Only create v1 watcher when Horizon is disabled
137+
let v1_watcher = indexer_monitor::escrow_accounts_v1(
138+
escrow_subgraph,
139+
indexer_address,
140+
config.subgraphs.escrow.config.syncing_interval_secs,
141+
true, // Reject thawing signers eagerly
142+
)
143+
.await
144+
.expect("Error creating escrow_accounts_v1 channel");
145+
146+
ServiceRouter::builder()
147+
.database(database.clone())
148+
.domain_separator(domain_separator.clone())
149+
.graph_node(config.graph_node)
150+
.http_client(http_client)
151+
.release(release)
152+
.indexer(config.indexer)
153+
.service(config.service)
154+
.blockchain(config.blockchain)
155+
.timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs)
156+
.network_subgraph(network_subgraph, config.subgraphs.network)
157+
.escrow_accounts_v1(v1_watcher)
158+
.build()
159+
};
121160

122161
serve_metrics(config.metrics.get_socket_addr());
123162

crates/service/src/service/router.rs

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -140,33 +140,42 @@ impl ServiceRouter {
140140
// Monitor escrow accounts v1
141141
// if not provided, create monitor from subgraph
142142
let escrow_accounts_v1 = match (self.escrow_accounts_v1, self.escrow_subgraph.as_ref()) {
143-
(Some(escrow_account), _) => escrow_account,
144-
(_, Some((escrow_subgraph, escrow))) => escrow_accounts_v1(
145-
escrow_subgraph,
146-
indexer_address,
147-
escrow.config.syncing_interval_secs,
148-
true, // Reject thawing signers eagerly
149-
)
150-
.await
151-
.expect("Error creating escrow_accounts channel"),
152-
(None, None) => panic!("No escrow accounts or escrow subgraph was provided"),
143+
(Some(escrow_account), _) => Some(escrow_account),
144+
(_, Some((escrow_subgraph, escrow))) => Some(
145+
escrow_accounts_v1(
146+
escrow_subgraph,
147+
indexer_address,
148+
escrow.config.syncing_interval_secs,
149+
true, // Reject thawing signers eagerly
150+
)
151+
.await
152+
.expect("Error creating escrow_accounts_v1 channel"),
153+
),
154+
(None, None) => None,
153155
};
154156

155157
// Monitor escrow accounts v2
156158
// if not provided, create monitor from subgraph
157159
let escrow_accounts_v2 = match (self.escrow_accounts_v2, self.escrow_subgraph.as_ref()) {
158-
(Some(escrow_account), _) => escrow_account,
159-
(_, Some((escrow_subgraph, escrow))) => escrow_accounts_v2(
160-
escrow_subgraph,
161-
indexer_address,
162-
escrow.config.syncing_interval_secs,
163-
true, // Reject thawing signers eagerly
164-
)
165-
.await
166-
.expect("Error creating escrow_accounts channel"),
167-
(None, None) => panic!("No escrow accounts or escrow subgraph was provided"),
160+
(Some(escrow_account), _) => Some(escrow_account),
161+
(_, Some((escrow_subgraph, escrow))) => Some(
162+
escrow_accounts_v2(
163+
escrow_subgraph,
164+
indexer_address,
165+
escrow.config.syncing_interval_secs,
166+
true, // Reject thawing signers eagerly
167+
)
168+
.await
169+
.expect("Error creating escrow_accounts_v2 channel"),
170+
),
171+
(None, None) => None,
168172
};
169173

174+
// Ensure at least one escrow accounts watcher is available
175+
if escrow_accounts_v1.is_none() && escrow_accounts_v2.is_none() {
176+
panic!("At least one escrow accounts watcher (v1 or v2) must be provided");
177+
}
178+
170179
// Monitor dispute manager address
171180
// if not provided, create monitor from subgraph
172181
let dispute_manager = match (self.dispute_manager, self.network_subgraph.as_ref()) {

crates/service/src/tap.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ impl IndexerTapContext {
5151
pub async fn get_checks(
5252
pgpool: PgPool,
5353
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
54-
escrow_accounts_v1: Receiver<EscrowAccounts>,
55-
escrow_accounts_v2: Receiver<EscrowAccounts>,
54+
escrow_accounts_v1: Option<Receiver<EscrowAccounts>>,
55+
escrow_accounts_v2: Option<Receiver<EscrowAccounts>>,
5656
timestamp_error_tolerance: Duration,
5757
receipt_max_value: u128,
5858
) -> Vec<ReceiptCheck<TapReceipt>> {

crates/service/src/tap/checks/sender_balance_check.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ use crate::{
1313
};
1414

1515
pub struct SenderBalanceCheck {
16-
escrow_accounts_v1: Receiver<EscrowAccounts>,
17-
escrow_accounts_v2: Receiver<EscrowAccounts>,
16+
escrow_accounts_v1: Option<Receiver<EscrowAccounts>>,
17+
escrow_accounts_v2: Option<Receiver<EscrowAccounts>>,
1818
}
1919

2020
impl SenderBalanceCheck {
2121
pub fn new(
22-
escrow_accounts_v1: Receiver<EscrowAccounts>,
23-
escrow_accounts_v2: Receiver<EscrowAccounts>,
22+
escrow_accounts_v1: Option<Receiver<EscrowAccounts>>,
23+
escrow_accounts_v2: Option<Receiver<EscrowAccounts>>,
2424
) -> Self {
2525
Self {
2626
escrow_accounts_v1,
@@ -36,17 +36,32 @@ impl Check<TapReceipt> for SenderBalanceCheck {
3636
ctx: &tap_core::receipt::Context,
3737
receipt: &CheckingReceipt,
3838
) -> CheckResult {
39-
let escrow_accounts_snapshot_v1 = self.escrow_accounts_v1.borrow();
40-
let escrow_accounts_snapshot_v2 = self.escrow_accounts_v2.borrow();
41-
4239
let Sender(receipt_sender) = ctx
4340
.get::<Sender>()
4441
.ok_or(CheckError::Failed(anyhow::anyhow!("Could not find sender")))?;
4542

4643
// get balance for escrow account given receipt type
4744
let balance_result = match receipt.signed_receipt() {
48-
TapReceipt::V1(_) => escrow_accounts_snapshot_v1.get_balance_for_sender(receipt_sender),
49-
TapReceipt::V2(_) => escrow_accounts_snapshot_v2.get_balance_for_sender(receipt_sender),
45+
TapReceipt::V1(_) => {
46+
if let Some(ref escrow_accounts_v1) = self.escrow_accounts_v1 {
47+
let escrow_accounts_snapshot_v1 = escrow_accounts_v1.borrow();
48+
escrow_accounts_snapshot_v1.get_balance_for_sender(receipt_sender)
49+
} else {
50+
return Err(CheckError::Failed(anyhow!(
51+
"Receipt v1 received but no escrow accounts v1 watcher is available"
52+
)));
53+
}
54+
}
55+
TapReceipt::V2(_) => {
56+
if let Some(ref escrow_accounts_v2) = self.escrow_accounts_v2 {
57+
let escrow_accounts_snapshot_v2 = escrow_accounts_v2.borrow();
58+
escrow_accounts_snapshot_v2.get_balance_for_sender(receipt_sender)
59+
} else {
60+
return Err(CheckError::Failed(anyhow!(
61+
"Receipt v2 received but no escrow accounts v2 watcher is available"
62+
)));
63+
}
64+
}
5065
};
5166

5267
// Check that the sender has a non-zero balance -- more advanced accounting is done in

crates/tap-agent/src/agent.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,14 +176,12 @@ pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandl
176176

177177
let config = Box::leak(Box::new({
178178
let mut config = SenderAccountConfig::from_config(&CONFIG);
179-
// Enable Horizon support
180-
config.horizon_enabled = true;
181-
// Add a warning log so operators know their setting was ignore
179+
// Use the configuration setting for Horizon support
180+
config.horizon_enabled = CONFIG.horizon.enabled;
182181
if CONFIG.horizon.enabled {
183-
tracing::warn!(
184-
"Horizon support is configured as enabled but has been forcibly disabled as it's not fully supported yet. \
185-
This is a temporary measure until Horizon support is stable."
186-
);
182+
tracing::info!("Horizon support is enabled");
183+
} else {
184+
tracing::info!("Horizon support is disabled");
187185
}
188186
config
189187
}));

0 commit comments

Comments
 (0)