Skip to content

Commit b32fcc1

Browse files
committed
chore(agent): add db storage for invalid receipts
1 parent f2144b8 commit b32fcc1

File tree

1 file changed

+142
-14
lines changed

1 file changed

+142
-14
lines changed

crates/tap-agent/src/agent/sender_allocation_task.rs

Lines changed: 142 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tap_core::{
3030

3131
#[cfg(any(test, feature = "test"))]
3232
use tap_core::receipt::checks::CheckList;
33-
use thegraph_core::alloy::primitives::Address;
33+
use thegraph_core::alloy::{hex::ToHexExt, primitives::Address};
3434

3535
/// Trait for creating dummy aggregator clients in tests
3636
#[cfg(any(test, feature = "test"))]
@@ -72,10 +72,8 @@ struct TaskState<T: NetworkVersion> {
7272
/// TAP Agent context for direct access to validation functions
7373
tap_context: TapAgentContext<T>,
7474
/// Database connection pool
75-
#[allow(dead_code)]
7675
pgpool: sqlx::PgPool,
7776
/// Allocation/collection identifier for TAP operations
78-
#[allow(dead_code)]
7977
tap_allocation_id: Address,
8078
/// Sender address
8179
#[allow(dead_code)]
@@ -637,7 +635,7 @@ where
637635
}
638636
}
639637
};
640-
638+
641639
#[cfg(not(test))]
642640
let signature_valid = match state.tap_context.verify_signer(signer_address).await {
643641
Ok(is_valid) => is_valid,
@@ -822,18 +820,148 @@ where
822820
tap_core::receipt::ReceiptWithState<tap_core::receipt::state::Failed, TapReceipt>,
823821
>,
824822
) -> anyhow::Result<()> {
825-
// For now, just log the invalid receipts
826-
// TODO: Store in database table for tracking
827-
for receipt_with_state in invalid_receipts {
828-
// Access the receipt through the correct field name
823+
if invalid_receipts.is_empty() {
824+
return Ok(());
825+
}
826+
827+
// Collect data for batch insert
828+
let mut signer_addresses = Vec::new();
829+
let mut signatures = Vec::new();
830+
let mut allocation_ids = Vec::new();
831+
let mut timestamps = Vec::new();
832+
let mut nonces = Vec::new();
833+
let mut values = Vec::new();
834+
let mut error_logs = Vec::new();
835+
836+
for receipt_with_state in &invalid_receipts {
829837
let receipt = receipt_with_state.signed_receipt();
830-
tracing::warn!(
831-
allocation_id = ?state.allocation_id,
832-
receipt_value = receipt.value(),
833-
receipt_timestamp = receipt.timestamp_ns(),
834-
"Invalid receipt detected"
835-
);
838+
// Get the failed checks error message
839+
let error_message = format!("Failed validation: {:?}", receipt_with_state);
840+
841+
// Extract receipt details based on version
842+
match receipt {
843+
TapReceipt::V1(v1_receipt) => {
844+
// For V1, we'll store the signer from the notification or use a placeholder
845+
// In a real implementation, we'd recover the signer using domain separator
846+
let signer = Address::ZERO; // TODO: Get actual signer from notification
847+
signer_addresses.push(signer.encode_hex());
848+
signatures.push(v1_receipt.signature.as_bytes().to_vec());
849+
allocation_ids.push(v1_receipt.message.allocation_id.encode_hex());
850+
timestamps.push(v1_receipt.message.timestamp_ns as i64);
851+
nonces.push(v1_receipt.message.nonce as i64);
852+
values.push(v1_receipt.message.value as i64);
853+
error_logs.push(error_message.clone());
854+
}
855+
TapReceipt::V2(v2_receipt) => {
856+
// For V2, we'll store the signer from the notification or use a placeholder
857+
// In a real implementation, we'd recover the signer using domain separator
858+
let signer = Address::ZERO; // TODO: Get actual signer from notification
859+
signer_addresses.push(signer.encode_hex());
860+
signatures.push(v2_receipt.signature.as_bytes().to_vec());
861+
// For V2, we need the collection_id from the message
862+
allocation_ids.push(v2_receipt.message.collection_id.to_string());
863+
timestamps.push(v2_receipt.message.timestamp_ns as i64);
864+
nonces.push(v2_receipt.message.nonce as i64);
865+
values.push(v2_receipt.message.value as i64);
866+
error_logs.push(error_message.clone());
867+
}
868+
}
836869
}
870+
871+
// Store in appropriate table based on version
872+
match state.allocation_id {
873+
AllocationId::Legacy(_) => {
874+
// Store in scalar_tap_receipts_invalid table for V1 receipts
875+
if !signer_addresses.is_empty() {
876+
let result = sqlx::query!(
877+
r#"
878+
INSERT INTO scalar_tap_receipts_invalid
879+
(signer_address, signature, allocation_id, timestamp_ns, nonce, value, error_log)
880+
SELECT * FROM UNNEST($1::text[], $2::bytea[], $3::text[], $4::bigint[], $5::bigint[], $6::bigint[], $7::text[])
881+
"#,
882+
&signer_addresses,
883+
&signatures,
884+
&allocation_ids,
885+
&timestamps,
886+
&nonces,
887+
&values,
888+
&error_logs
889+
)
890+
.execute(&state.pgpool)
891+
.await;
892+
893+
match result {
894+
Ok(_) => {
895+
tracing::debug!(
896+
allocation_id = ?state.allocation_id,
897+
count = invalid_receipts.len(),
898+
"Stored invalid V1 receipts in scalar_tap_receipts_invalid"
899+
);
900+
}
901+
Err(e) => {
902+
tracing::error!(
903+
allocation_id = ?state.allocation_id,
904+
error = %e,
905+
"Failed to store invalid V1 receipts"
906+
);
907+
return Err(anyhow::anyhow!(
908+
"Database error storing invalid V1 receipts: {}",
909+
e
910+
));
911+
}
912+
}
913+
}
914+
}
915+
AllocationId::Horizon(_) => {
916+
// Store in tap_horizon_receipts_invalid table for V2 receipts
917+
if !signer_addresses.is_empty() {
918+
let result = sqlx::query!(
919+
r#"
920+
INSERT INTO tap_horizon_receipts_invalid
921+
(signer_address, signature, collection_id, timestamp_ns, nonce, value, error_log)
922+
SELECT * FROM UNNEST($1::text[], $2::bytea[], $3::text[], $4::bigint[], $5::bigint[], $6::bigint[], $7::text[])
923+
"#,
924+
&signer_addresses,
925+
&signatures,
926+
&allocation_ids,
927+
&timestamps,
928+
&nonces,
929+
&values,
930+
&error_logs
931+
)
932+
.execute(&state.pgpool)
933+
.await;
934+
935+
match result {
936+
Ok(_) => {
937+
tracing::debug!(
938+
allocation_id = ?state.allocation_id,
939+
count = invalid_receipts.len(),
940+
"Stored invalid V2 receipts in tap_horizon_receipts_invalid"
941+
);
942+
}
943+
Err(e) => {
944+
tracing::error!(
945+
allocation_id = ?state.allocation_id,
946+
error = %e,
947+
"Failed to store invalid V2 receipts"
948+
);
949+
return Err(anyhow::anyhow!(
950+
"Database error storing invalid V2 receipts: {}",
951+
e
952+
));
953+
}
954+
}
955+
}
956+
}
957+
}
958+
959+
tracing::info!(
960+
allocation_id = ?state.allocation_id,
961+
count = invalid_receipts.len(),
962+
"Stored invalid receipts in database"
963+
);
964+
837965
Ok(())
838966
}
839967

0 commit comments

Comments
 (0)