Skip to content

Commit 1ce3ed0

Browse files
committed
fix(tap-agent): enforce Horizon bucket isolation for RAV/receipt queries
- Use full Horizon identity (payer, data_service, service_provider, collection_id) in reads to prevent cross-bucket reads. - sender_account.rs: last non-final RAVs lookup now filters by service_provider and data_service. - sender_accounts_manager.rs: non-final RAV aggregation constrained to the configured bucket via WHERE data_service = $1 AND service_provider = $2. - sender_allocation.rs (Horizon): - calculate_invalid_receipts_fee() now filters by collection_id, payer, service_provider, and data_service. - calculate_fee_until_last_id() now filters by collection_id, payer, service_provider, and data_service (and updated SQLx arg indices). - Rationale: Aligns with tap_horizon_ravs PK, matches Horizon’s identity model, avoids mixing state across different data services/providers, and complements the service-side DataServiceCheck which only guards ingestion.
1 parent 8f9e6ae commit 1ce3ed0

File tree

3 files changed

+46
-9
lines changed

3 files changed

+46
-9
lines changed

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ pub struct SenderAccountArgs {
275275
/// Watcher that returns a list of escrow accounts for current indexer
276276
pub escrow_accounts: Receiver<EscrowAccounts>,
277277
/// Watcher of normalized allocation IDs (Legacy/Horizon) for this sender type
278-
pub indexer_allocations: Receiver<HashSet<AllocationId>>,
278+
pub indexer_allocations: Receiver<HashSet<AllocationId>>,
279279
/// SubgraphClient of the escrow subgraph
280280
pub escrow_subgraph: &'static SubgraphClient,
281281
/// SubgraphClient of the network subgraph
@@ -888,9 +888,18 @@ impl Actor for SenderAccount {
888888
r#"
889889
SELECT collection_id, value_aggregate
890890
FROM tap_horizon_ravs
891-
WHERE payer = $1 AND last AND NOT final;
891+
WHERE payer = $1
892+
AND service_provider = $2
893+
AND data_service = $3
894+
AND last AND NOT final;
892895
"#,
893896
sender_id.encode_hex(),
897+
// service_provider is the indexer address; data_service comes from TapMode config
898+
config.indexer_address.encode_hex(),
899+
config
900+
.tap_mode
901+
.require_subgraph_service_address()
902+
.encode_hex(),
894903
)
895904
.fetch_all(&pgpool)
896905
.await

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -935,8 +935,16 @@ impl State {
935935
payer,
936936
ARRAY_AGG(DISTINCT collection_id) FILTER (WHERE NOT last) AS allocation_ids
937937
FROM tap_horizon_ravs
938+
WHERE data_service = $1 AND service_provider = $2
938939
GROUP BY payer
939-
"#
940+
"#,
941+
// Constrain to our Horizon bucket to avoid conflating RAVs across services/providers
942+
self
943+
.config
944+
.tap_mode
945+
.require_subgraph_service_address()
946+
.encode_hex(),
947+
self.config.indexer_address.encode_hex()
940948
)
941949
.fetch_all(&self.pgpool)
942950
.await

crates/tap-agent/src/agent/sender_allocation.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,13 +1277,19 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
12771277
WHERE timestamp_ns BETWEEN $1 AND $2
12781278
AND collection_id = $3
12791279
AND service_provider = $4
1280-
AND signer_address IN (SELECT unnest($5::text[]));
1280+
AND payer = $5
1281+
AND data_service = $6
1282+
AND signer_address IN (SELECT unnest($7::text[]));
12811283
"#,
12821284
BigDecimal::from(min_timestamp),
12831285
BigDecimal::from(max_timestamp),
12841286
// self.allocation_id is already a CollectionId in Horizon state
12851287
self.allocation_id.encode_hex(),
12861288
self.indexer_address.encode_hex(),
1289+
self.sender.encode_hex(),
1290+
self.data_service
1291+
.expect("data_service should be available in Horizon mode")
1292+
.encode_hex(),
12871293
&signers,
12881294
)
12891295
.execute(&self.pgpool)
@@ -1305,10 +1311,18 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
13051311
tap_horizon_receipts_invalid
13061312
WHERE
13071313
collection_id = $1
1308-
AND signer_address IN (SELECT unnest($2::text[]))
1314+
AND service_provider = $2
1315+
AND payer = $3
1316+
AND data_service = $4
1317+
AND signer_address IN (SELECT unnest($5::text[]))
13091318
"#,
13101319
// self.allocation_id is already a CollectionId in Horizon state
13111320
self.allocation_id.encode_hex(),
1321+
self.indexer_address.encode_hex(),
1322+
self.sender.encode_hex(),
1323+
self.data_service
1324+
.expect("data_service should be available in Horizon mode")
1325+
.encode_hex(),
13121326
&signers
13131327
)
13141328
.fetch_one(&self.pgpool)
@@ -1353,13 +1367,19 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
13531367
WHERE
13541368
collection_id = $1
13551369
AND service_provider = $2
1356-
AND id <= $3
1357-
AND signer_address IN (SELECT unnest($4::text[]))
1358-
AND timestamp_ns > $5
1370+
AND payer = $3
1371+
AND data_service = $4
1372+
AND id <= $5
1373+
AND signer_address IN (SELECT unnest($6::text[]))
1374+
AND timestamp_ns > $7
13591375
"#,
13601376
// self.allocation_id is already a CollectionId in Horizon state
13611377
self.allocation_id.encode_hex(),
13621378
self.indexer_address.encode_hex(),
1379+
self.sender.encode_hex(),
1380+
self.data_service
1381+
.expect("data_service should be available in Horizon mode")
1382+
.encode_hex(),
13631383
last_id,
13641384
&signers,
13651385
BigDecimal::from(
@@ -1399,7 +1419,7 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
13991419
allocation_id = %self.allocation_id,
14001420
"Marking rav as last!",
14011421
);
1402-
// TODO add service_provider filter
1422+
14031423
let updated_rows = sqlx::query!(
14041424
r#"
14051425
UPDATE tap_horizon_ravs

0 commit comments

Comments
 (0)