Skip to content

Commit a1b3f63

Browse files
committed
feat(tap-agent): finish v2 implementation
1 parent 8b4cb66 commit a1b3f63

File tree

4 files changed

+157
-39
lines changed

4 files changed

+157
-39
lines changed

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -631,14 +631,6 @@ impl State {
631631
sender_balance = self.sender_balance.to_u128(),
632632
"Denying sender."
633633
);
634-
// Check if this is horizon like sender and if it is actually enable,
635-
// otherwise just ignore.
636-
// FIXME: This should be removed once full horizon support
637-
// is implemented!
638-
if matches!(self.sender_type, SenderType::Horizon) && !self.config.horizon_enabled {
639-
return;
640-
}
641-
642634
SenderAccount::deny_sender(self.sender_type, &self.pgpool, self.sender).await;
643635
self.denied = true;
644636
SENDER_DENIED
@@ -865,14 +857,85 @@ impl Actor for SenderAccount {
865857
_ => vec![],
866858
}
867859
}
868-
// TODO Implement query for unfinalized v2 transactions
869-
// Depends on Escrow Subgraph Schema
870860
SenderType::Horizon => {
871861
if config.horizon_enabled {
872-
todo!("Implement query for unfinalized v2 transactions, It depends on Escrow Subgraph Schema")
862+
// V2 doesn't have transaction tracking like V1, but we can check if the RAVs
863+
// we're about to redeem are still the latest ones by querying LatestRavs.
864+
// If the subgraph has newer RAVs, it means ours were already redeemed.
865+
use indexer_query::latest_ravs_v2::{self, LatestRavs};
866+
867+
let collection_ids: Vec<String> = last_non_final_ravs
868+
.iter()
869+
.map(|(collection_id, _)| collection_id.clone())
870+
.collect();
871+
872+
if !collection_ids.is_empty() {
873+
// For V2, use the indexer address as the data service since the indexer
874+
// is providing the data service for the queries
875+
let data_service = config.indexer_address;
876+
877+
match escrow_subgraph
878+
.query::<LatestRavs, _>(latest_ravs_v2::Variables {
879+
payer: format!("{:x?}", sender_id),
880+
data_service: format!("{:x?}", data_service),
881+
service_provider: format!("{:x?}", config.indexer_address),
882+
collection_ids: collection_ids.clone(),
883+
})
884+
.await
885+
{
886+
Ok(Ok(response)) => {
887+
// Create a map of our current RAVs for easy lookup
888+
let our_ravs: HashMap<String, u128> = last_non_final_ravs
889+
.iter()
890+
.map(|(collection_id, value)| {
891+
let value_u128 = value
892+
.to_bigint()
893+
.and_then(|v| v.to_u128())
894+
.unwrap_or(0);
895+
(collection_id.clone(), value_u128)
896+
})
897+
.collect();
898+
899+
// Check which RAVs have been updated (indicating redemption)
900+
let mut finalized_allocation_ids = vec![];
901+
for rav in response.latest_ravs {
902+
if let Some(&our_value) = our_ravs.get(&rav.id) {
903+
// If the subgraph RAV has higher value, our RAV was redeemed
904+
if let Ok(subgraph_value) =
905+
rav.value_aggregate.parse::<u128>()
906+
{
907+
if subgraph_value > our_value {
908+
// Return collection ID string for filtering
909+
finalized_allocation_ids.push(rav.id);
910+
}
911+
}
912+
}
913+
}
914+
finalized_allocation_ids
915+
}
916+
Ok(Err(e)) => {
917+
tracing::warn!(
918+
error = %e,
919+
sender = %sender_id,
920+
"Failed to query V2 latest RAVs, assuming none are finalized"
921+
);
922+
vec![]
923+
}
924+
Err(e) => {
925+
tracing::warn!(
926+
error = %e,
927+
sender = %sender_id,
928+
"Failed to execute V2 latest RAVs query, assuming none are finalized"
929+
);
930+
vec![]
931+
}
932+
}
933+
} else {
934+
vec![]
935+
}
936+
} else {
937+
vec![]
873938
}
874-
// if we have any problems, we don't want to filter out
875-
vec![]
876939
}
877940
};
878941

crates/tap-agent/src/tap/context/rav.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ impl RavRead<tap_graph::v2::ReceiptAggregateVoucher> for TapAgentContext<Horizon
153153
type AdapterError = AdapterError;
154154

155155
async fn last_rav(&self) -> Result<Option<tap_graph::v2::SignedRav>, Self::AdapterError> {
156-
// TODO add data service filter
157156
let row = sqlx::query!(
158157
r#"
159158
SELECT
@@ -169,10 +168,12 @@ impl RavRead<tap_graph::v2::ReceiptAggregateVoucher> for TapAgentContext<Horizon
169168
WHERE
170169
collection_id = $1
171170
AND payer = $2
172-
AND service_provider = $3
171+
AND data_service = $3
172+
AND service_provider = $4
173173
"#,
174174
CollectionId::from(self.allocation_id).encode_hex(),
175175
self.sender.encode_hex(),
176+
self.indexer_address.encode_hex(),
176177
self.indexer_address.encode_hex()
177178
)
178179
.fetch_optional(&self.pgpool)

crates/tap-agent/src/tap/context/receipt.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,6 @@ impl ReceiptRead<TapReceipt> for TapAgentContext<Horizon> {
224224
error: format!("{e:?}."),
225225
})?;
226226

227-
// TODO filter by data_service when we have multiple data services
228-
229227
let records = sqlx::query!(
230228
r#"
231229
SELECT
@@ -242,15 +240,17 @@ impl ReceiptRead<TapReceipt> for TapAgentContext<Horizon> {
242240
WHERE
243241
collection_id = $1
244242
AND payer = $2
245-
AND service_provider = $3
246-
AND signer_address IN (SELECT unnest($4::text[]))
247-
AND $5::numrange @> timestamp_ns
243+
AND data_service = $3
244+
AND service_provider = $4
245+
AND signer_address IN (SELECT unnest($5::text[]))
246+
AND $6::numrange @> timestamp_ns
248247
ORDER BY timestamp_ns ASC
249-
LIMIT $6
248+
LIMIT $7
250249
"#,
251250
CollectionId::from(self.allocation_id).encode_hex(),
252251
self.sender.encode_hex(),
253252
self.indexer_address.encode_hex(),
253+
self.indexer_address.encode_hex(),
254254
&signers,
255255
rangebounds_to_pgrange(timestamp_range_ns),
256256
(receipts_limit + 1) as i64,
@@ -364,13 +364,15 @@ impl ReceiptDelete for TapAgentContext<Horizon> {
364364
AND signer_address IN (SELECT unnest($2::text[]))
365365
AND $3::numrange @> timestamp_ns
366366
AND payer = $4
367-
AND service_provider = $5
367+
AND data_service = $5
368+
AND service_provider = $6
368369
"#,
369370
CollectionId::from(self.allocation_id).encode_hex(),
370371
&signers,
371372
rangebounds_to_pgrange(timestamp_ns),
372373
self.sender.encode_hex(),
373374
self.indexer_address.encode_hex(),
375+
self.indexer_address.encode_hex(),
374376
)
375377
.execute(&self.pgpool)
376378
.await?;

crates/tap-agent/src/test.rs

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ pub fn create_rav_v2(
329329
timestampNs: timestamp_ns,
330330
valueAggregate: value_aggregate,
331331
payer: SENDER.1,
332-
dataService: Address::ZERO,
332+
dataService: INDEXER.1, // Use the same indexer address as the context
333333
serviceProvider: INDEXER.1,
334334
metadata: Bytes::new(),
335335
},
@@ -369,7 +369,7 @@ impl CreateReceipt for Horizon {
369369
collection_id,
370370
payer: SENDER.1,
371371
service_provider: INDEXER.1,
372-
data_service: Address::ZERO,
372+
data_service: INDEXER.1, // Use the same indexer address as the context
373373
nonce,
374374
timestamp_ns,
375375
value,
@@ -525,21 +525,38 @@ pub async fn store_batch_receipts(
525525
let mut values = Vec::with_capacity(receipts_len);
526526

527527
for receipt in receipts {
528-
let receipt = match receipt.signed_receipt() {
529-
TapReceipt::V1(receipt) => receipt,
530-
TapReceipt::V2(_) => unimplemented!("V2 receipts not supported"),
528+
match receipt.signed_receipt() {
529+
TapReceipt::V1(receipt) => {
530+
signers.push(
531+
receipt
532+
.recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR)
533+
.unwrap()
534+
.encode_hex(),
535+
);
536+
signatures.push(receipt.signature.as_bytes().to_vec());
537+
allocation_ids.push(receipt.message.allocation_id.encode_hex().to_string());
538+
timestamps.push(BigDecimal::from(receipt.message.timestamp_ns));
539+
nonces.push(BigDecimal::from(receipt.message.nonce));
540+
values.push(BigDecimal::from(receipt.message.value));
541+
}
542+
TapReceipt::V2(receipt) => {
543+
use thegraph_core::CollectionId;
544+
// For V2, store collection_id in the allocation_id field (as per the database reuse strategy)
545+
let collection_id_as_allocation =
546+
CollectionId::from(receipt.message.collection_id).as_address();
547+
signers.push(
548+
receipt
549+
.recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR)
550+
.unwrap()
551+
.encode_hex(),
552+
);
553+
signatures.push(receipt.signature.as_bytes().to_vec());
554+
allocation_ids.push(collection_id_as_allocation.encode_hex().to_string());
555+
timestamps.push(BigDecimal::from(receipt.message.timestamp_ns));
556+
nonces.push(BigDecimal::from(receipt.message.nonce));
557+
values.push(BigDecimal::from(receipt.message.value));
558+
}
531559
};
532-
signers.push(
533-
receipt
534-
.recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR)
535-
.unwrap()
536-
.encode_hex(),
537-
);
538-
signatures.push(receipt.signature.as_bytes().to_vec());
539-
allocation_ids.push(receipt.message.allocation_id.encode_hex().to_string());
540-
timestamps.push(BigDecimal::from(receipt.message.timestamp_ns));
541-
nonces.push(BigDecimal::from(receipt.message.nonce));
542-
values.push(BigDecimal::from(receipt.message.value));
543560
}
544561
let _ = sqlx::query!(
545562
r#"INSERT INTO scalar_tap_receipts (
@@ -579,7 +596,7 @@ pub async fn store_invalid_receipt(
579596
) -> anyhow::Result<u64> {
580597
match signed_receipt {
581598
TapReceipt::V1(signed_receipt) => store_invalid_receipt_v1(pgpool, signed_receipt).await,
582-
TapReceipt::V2(_) => unimplemented!("V2 not supported"),
599+
TapReceipt::V2(signed_receipt) => store_invalid_receipt_v2(pgpool, signed_receipt).await,
583600
}
584601
}
585602

@@ -613,6 +630,41 @@ pub async fn store_invalid_receipt_v1(
613630
Ok(id)
614631
}
615632

633+
pub async fn store_invalid_receipt_v2(
634+
pgpool: &PgPool,
635+
signed_receipt: &tap_graph::v2::SignedReceipt,
636+
) -> anyhow::Result<u64> {
637+
use thegraph_core::CollectionId;
638+
let encoded_signature = signed_receipt.signature.as_bytes().to_vec();
639+
640+
// Store collection_id in allocation_id field (database reuse strategy)
641+
let collection_id_as_allocation =
642+
CollectionId::from(signed_receipt.message.collection_id).as_address();
643+
644+
let record = sqlx::query!(
645+
r#"
646+
INSERT INTO scalar_tap_receipts_invalid (signer_address, signature, allocation_id, timestamp_ns, nonce, value)
647+
VALUES ($1, $2, $3, $4, $5, $6)
648+
RETURNING id
649+
"#,
650+
signed_receipt
651+
.recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR)
652+
.unwrap()
653+
.encode_hex(),
654+
encoded_signature,
655+
collection_id_as_allocation.encode_hex(),
656+
BigDecimal::from(signed_receipt.message.timestamp_ns),
657+
BigDecimal::from(signed_receipt.message.nonce),
658+
BigDecimal::from(BigInt::from(signed_receipt.message.value)),
659+
)
660+
.fetch_one(pgpool)
661+
.await?;
662+
663+
// id is BIGSERIAL, so it should be safe to cast to u64.
664+
let id: u64 = record.id.try_into()?;
665+
Ok(id)
666+
}
667+
616668
/// Fixture to generate a wallet and address
617669
pub fn wallet(index: u32) -> (PrivateKeySigner, Address) {
618670
let wallet: PrivateKeySigner= MnemonicBuilder::<English>::default()

0 commit comments

Comments
 (0)