Skip to content

Commit 9db0476

Browse files
committed
chore(agent): add db storage for invalid receipts
1 parent b1da199 commit 9db0476

File tree

2 files changed

+176
-26
lines changed

2 files changed

+176
-26
lines changed

crates/tap-agent/src/agent/sender_account_task.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -345,23 +345,44 @@ impl SenderAccountTask {
345345

346346
#[cfg(not(any(test, feature = "test")))]
347347
{
348-
// TODO: Implement production child task spawning
349-
// This requires proper integration with the actual SenderAllocationTask spawn method
350-
// that includes TAP manager, aggregator client, etc.
351-
//
352-
// For now, we'll skip this to maintain build compatibility
353-
// The proper implementation would look like:
354-
//
355-
// 1. Create aggregator client for this sender
356-
// 2. Set up TAP manager with proper configuration
357-
// 3. Spawn SenderAllocationTask with full parameters
358-
// 4. Set up proper parent-child communication channel
348+
// Create a self-reference handle for the child to communicate back
349+
let (self_tx, mut self_rx) = mpsc::channel::<SenderAccountMessage>(10);
359350

351+
// In production, we need to create a proper handle without test methods
352+
// For now, we'll create a simple wrapper that can send messages
353+
// This is a placeholder until full TAP manager integration
354+
355+
// Convert allocation_id to Address for TAP context
356+
let tap_allocation_id = match allocation_id {
357+
AllocationId::Legacy(id) => id.into_inner(),
358+
AllocationId::Horizon(id) => thegraph_core::AllocationId::from(id).into_inner(),
359+
};
360+
361+
// TODO: In production, we need proper TAP manager and aggregator client creation
362+
// This would require:
363+
// 1. Create TapAgentContext with proper configuration
364+
// 2. Create TapManager with domain separator and required checks
365+
// 3. Create aggregator client using sender_aggregator_endpoint
366+
// 4. Handle both Legacy and Horizon network versions properly
367+
//
368+
// For now, we'll log that production spawning needs implementation
360369
tracing::warn!(
361370
sender = %state.sender,
362371
allocation_id = ?allocation_id,
363-
"Production sender allocation spawning not yet implemented - requires TAP manager integration"
372+
tap_allocation_id = %tap_allocation_id,
373+
"Production sender allocation spawning requires TAP manager integration - not yet implemented"
364374
);
375+
376+
// Set up the message forwarder for when production implementation is complete
377+
tokio::spawn(async move {
378+
while let Some(msg) = self_rx.recv().await {
379+
tracing::debug!(
380+
message = ?msg,
381+
"Production child allocation task would send message to parent"
382+
);
383+
// In production, this would route messages back to the parent task
384+
}
385+
});
365386
}
366387

367388
Ok(())

crates/tap-agent/src/agent/sender_allocation_task.rs

Lines changed: 143 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tap_core::{
3030

3131
#[cfg(any(test, feature = "test"))]
3232
use tap_core::receipt::checks::CheckList;
33-
use thegraph_core::alloy::primitives::Address;
33+
use thegraph_core::alloy::{hex::ToHexExt, primitives::Address};
3434

3535
/// Trait for creating dummy aggregator clients in tests
3636
#[cfg(any(test, feature = "test"))]
@@ -72,10 +72,9 @@ struct TaskState<T: NetworkVersion> {
7272
/// TAP Agent context for direct access to validation functions
7373
tap_context: TapAgentContext<T>,
7474
/// Database connection pool
75-
#[allow(dead_code)]
7675
pgpool: sqlx::PgPool,
7776
/// Allocation/collection identifier for TAP operations
78-
#[allow(dead_code)]
77+
#[allow(dead_code)] // Used in production code paths only
7978
tap_allocation_id: Address,
8079
/// Sender address
8180
#[allow(dead_code)]
@@ -637,7 +636,7 @@ where
637636
}
638637
}
639638
};
640-
639+
641640
#[cfg(not(test))]
642641
let signature_valid = match state.tap_context.verify_signer(signer_address).await {
643642
Ok(is_valid) => is_valid,
@@ -822,18 +821,148 @@ where
822821
tap_core::receipt::ReceiptWithState<tap_core::receipt::state::Failed, TapReceipt>,
823822
>,
824823
) -> anyhow::Result<()> {
825-
// For now, just log the invalid receipts
826-
// TODO: Store in database table for tracking
827-
for receipt_with_state in invalid_receipts {
828-
// Access the receipt through the correct field name
824+
if invalid_receipts.is_empty() {
825+
return Ok(());
826+
}
827+
828+
// Collect data for batch insert
829+
let mut signer_addresses = Vec::new();
830+
let mut signatures = Vec::new();
831+
let mut allocation_ids = Vec::new();
832+
let mut timestamps = Vec::new();
833+
let mut nonces = Vec::new();
834+
let mut values = Vec::new();
835+
let mut error_logs = Vec::new();
836+
837+
for receipt_with_state in &invalid_receipts {
829838
let receipt = receipt_with_state.signed_receipt();
830-
tracing::warn!(
831-
allocation_id = ?state.allocation_id,
832-
receipt_value = receipt.value(),
833-
receipt_timestamp = receipt.timestamp_ns(),
834-
"Invalid receipt detected"
835-
);
839+
// Get the failed checks error message
840+
let error_message = format!("Failed validation: {receipt_with_state:?}");
841+
842+
// Extract receipt details based on version
843+
match receipt {
844+
TapReceipt::V1(v1_receipt) => {
845+
// For V1, we'll store the signer from the notification or use a placeholder
846+
// In a real implementation, we'd recover the signer using domain separator
847+
let signer = Address::ZERO; // TODO: Get actual signer from notification
848+
signer_addresses.push(signer.encode_hex());
849+
signatures.push(v1_receipt.signature.as_bytes().to_vec());
850+
allocation_ids.push(v1_receipt.message.allocation_id.encode_hex());
851+
timestamps.push(v1_receipt.message.timestamp_ns as i64);
852+
nonces.push(v1_receipt.message.nonce as i64);
853+
values.push(v1_receipt.message.value as i64);
854+
error_logs.push(error_message.clone());
855+
}
856+
TapReceipt::V2(v2_receipt) => {
857+
// For V2, we'll store the signer from the notification or use a placeholder
858+
// In a real implementation, we'd recover the signer using domain separator
859+
let signer = Address::ZERO; // TODO: Get actual signer from notification
860+
signer_addresses.push(signer.encode_hex());
861+
signatures.push(v2_receipt.signature.as_bytes().to_vec());
862+
// For V2, we need the collection_id from the message
863+
allocation_ids.push(v2_receipt.message.collection_id.to_string());
864+
timestamps.push(v2_receipt.message.timestamp_ns as i64);
865+
nonces.push(v2_receipt.message.nonce as i64);
866+
values.push(v2_receipt.message.value as i64);
867+
error_logs.push(error_message.clone());
868+
}
869+
}
836870
}
871+
872+
// Store in appropriate table based on version
873+
match state.allocation_id {
874+
AllocationId::Legacy(_) => {
875+
// Store in scalar_tap_receipts_invalid table for V1 receipts
876+
if !signer_addresses.is_empty() {
877+
let result = sqlx::query!(
878+
r#"
879+
INSERT INTO scalar_tap_receipts_invalid
880+
(signer_address, signature, allocation_id, timestamp_ns, nonce, value, error_log)
881+
SELECT * FROM UNNEST($1::text[], $2::bytea[], $3::text[], $4::bigint[], $5::bigint[], $6::bigint[], $7::text[])
882+
"#,
883+
&signer_addresses,
884+
&signatures,
885+
&allocation_ids,
886+
&timestamps,
887+
&nonces,
888+
&values,
889+
&error_logs
890+
)
891+
.execute(&state.pgpool)
892+
.await;
893+
894+
match result {
895+
Ok(_) => {
896+
tracing::debug!(
897+
allocation_id = ?state.allocation_id,
898+
count = invalid_receipts.len(),
899+
"Stored invalid V1 receipts in scalar_tap_receipts_invalid"
900+
);
901+
}
902+
Err(e) => {
903+
tracing::error!(
904+
allocation_id = ?state.allocation_id,
905+
error = %e,
906+
"Failed to store invalid V1 receipts"
907+
);
908+
return Err(anyhow::anyhow!(
909+
"Database error storing invalid V1 receipts: {}",
910+
e
911+
));
912+
}
913+
}
914+
}
915+
}
916+
AllocationId::Horizon(_) => {
917+
// Store in tap_horizon_receipts_invalid table for V2 receipts
918+
if !signer_addresses.is_empty() {
919+
let result = sqlx::query!(
920+
r#"
921+
INSERT INTO tap_horizon_receipts_invalid
922+
(signer_address, signature, collection_id, timestamp_ns, nonce, value, error_log)
923+
SELECT * FROM UNNEST($1::text[], $2::bytea[], $3::text[], $4::bigint[], $5::bigint[], $6::bigint[], $7::text[])
924+
"#,
925+
&signer_addresses,
926+
&signatures,
927+
&allocation_ids,
928+
&timestamps,
929+
&nonces,
930+
&values,
931+
&error_logs
932+
)
933+
.execute(&state.pgpool)
934+
.await;
935+
936+
match result {
937+
Ok(_) => {
938+
tracing::debug!(
939+
allocation_id = ?state.allocation_id,
940+
count = invalid_receipts.len(),
941+
"Stored invalid V2 receipts in tap_horizon_receipts_invalid"
942+
);
943+
}
944+
Err(e) => {
945+
tracing::error!(
946+
allocation_id = ?state.allocation_id,
947+
error = %e,
948+
"Failed to store invalid V2 receipts"
949+
);
950+
return Err(anyhow::anyhow!(
951+
"Database error storing invalid V2 receipts: {}",
952+
e
953+
));
954+
}
955+
}
956+
}
957+
}
958+
}
959+
960+
tracing::info!(
961+
allocation_id = ?state.allocation_id,
962+
count = invalid_receipts.len(),
963+
"Stored invalid receipts in database"
964+
);
965+
837966
Ok(())
838967
}
839968

0 commit comments

Comments
 (0)