Skip to content

Commit 7d06dd7

Browse files
committed
feat(tap-agent): add rav integration in sender allocation task
1 parent 5c73998 commit 7d06dd7

File tree

1 file changed

+108
-53
lines changed

1 file changed

+108
-53
lines changed

crates/tap-agent/src/agent/sender_allocation_task.rs

Lines changed: 108 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ use crate::{
2323
tap::context::{NetworkVersion, TapAgentContext},
2424
};
2525
use indexer_receipt::TapReceipt;
26-
use tap_core::{manager::Manager as TapManager, receipt::checks::CheckList};
26+
use tap_core::{
27+
manager::Manager as TapManager,
28+
receipt::{checks::CheckList, Context, WithValueAndTimestamp},
29+
};
2730
use thegraph_core::alloy::primitives::Address;
2831

2932
/// Message types for SenderAllocationTask - matches original SenderAllocationMessage
@@ -172,7 +175,15 @@ impl ErrorTrackingState {
172175
}
173176
}
174177

175-
impl<T: NetworkVersion> SenderAllocationTask<T> {
178+
impl<T> SenderAllocationTask<T>
179+
where
180+
T: NetworkVersion,
181+
TapAgentContext<T>: tap_core::manager::adapters::ReceiptRead<TapReceipt>
182+
+ tap_core::manager::adapters::ReceiptDelete
183+
+ tap_core::manager::adapters::RavRead<T::Rav>
184+
+ tap_core::manager::adapters::RavStore<T::Rav>
185+
+ tap_core::manager::adapters::SignatureChecker,
186+
{
176187
/// Spawn a new SenderAllocationTask with minimal arguments (for testing)
177188
#[cfg(any(test, feature = "test"))]
178189
pub async fn spawn_simple(
@@ -595,77 +606,121 @@ impl<T: NetworkVersion> SenderAllocationTask<T> {
595606
true // Most receipts have valid signatures
596607
}
597608

598-
/// Simulate TAP manager RAV creation workflow
599-
/// TODO: Replace with real TAP manager.create_rav_request() call
600-
async fn create_rav_with_tap_manager_simulation(
601-
state: &TaskState<T>,
609+
/// Create RAV using the real TAP manager
610+
async fn create_rav_with_tap_manager(
611+
state: &mut TaskState<T>,
602612
) -> anyhow::Result<RavInformation> {
603613
tracing::debug!(
604614
allocation_id = ?state.allocation_id,
605615
receipt_count = state.unaggregated_fees.counter,
606616
total_value = state.unaggregated_fees.value,
607-
"TAP manager simulation: Starting RAV creation"
617+
"Creating RAV request via TAP manager"
608618
);
609619

610-
// Simulate TAP manager workflow:
611-
// 1. Retrieve receipts from database (simulated)
612-
// 2. Validate receipts (simulated)
613-
// 3. Create aggregation request to sender's aggregator (simulated)
614-
// 4. Handle aggregator response (simulated)
620+
// Call TAP manager to create RAV request
621+
// The TAP manager will:
622+
// 1. Retrieve receipts from database based on timestamp range
623+
// 2. Validate receipts (signatures, values, etc.)
624+
// 3. Separate valid and invalid receipts
625+
// 4. Create an expected RAV from valid receipts
626+
let rav_request_result = state
627+
.tap_manager
628+
.create_rav_request(
629+
&Context::new(),
630+
0, // timestamp_buffer_ns - use 0 for no buffer
631+
None, // rav_request_receipt_limit - process all receipts
632+
)
633+
.await;
615634

616-
// Step 1: Simulate database receipt retrieval
617-
tracing::debug!(
618-
allocation_id = ?state.allocation_id,
619-
"TAP manager simulation: Retrieving receipts from database"
620-
);
621-
// In real implementation: state.tap_manager.retrieve_receipts_in_timestamp_range()
622-
// Database connection available: state.pgpool
635+
let rav_request = match rav_request_result {
636+
Ok(request) => request,
637+
Err(e) => {
638+
tracing::error!(
639+
allocation_id = ?state.allocation_id,
640+
error = %e,
641+
"TAP manager failed to create RAV request"
642+
);
643+
return Err(anyhow::anyhow!("TAP manager error: {}", e));
644+
}
645+
};
623646

624-
// Step 2: Simulate receipt validation
625-
tracing::debug!(
626-
allocation_id = ?state.allocation_id,
627-
"TAP manager simulation: Validating receipts"
628-
);
629-
// In real implementation: TAP manager handles signature verification automatically
630-
// Uses sender address (state.sender) and indexer address (state.indexer_address) for validation
647+
// Check if we have an expected RAV
648+
let expected_rav = match rav_request.expected_rav {
649+
Ok(rav) => rav,
650+
Err(e) => {
651+
tracing::debug!(
652+
allocation_id = ?state.allocation_id,
653+
valid_receipts = rav_request.valid_receipts.len(),
654+
invalid_receipts = rav_request.invalid_receipts.len(),
655+
error = %e,
656+
"RAV aggregation failed"
657+
);
631658

632-
// Step 3: Simulate aggregator communication
633-
tracing::debug!(
634-
allocation_id = ?state.allocation_id,
635-
"TAP manager simulation: Sending aggregation request to aggregator"
636-
);
659+
// Store invalid receipts if any
660+
if !rav_request.invalid_receipts.is_empty() {
661+
Self::store_invalid_receipts(state, rav_request.invalid_receipts).await?;
662+
}
637663

638-
// Simulate potential aggregator communication failure (5% chance)
639-
if state.unaggregated_fees.counter % 20 == 0 {
640-
return Err(anyhow::anyhow!(
641-
"Simulated aggregator communication timeout"
642-
));
664+
return Err(anyhow::anyhow!("RAV aggregation failed: {}", e));
665+
}
666+
};
667+
668+
// Store invalid receipts if any
669+
let invalid_receipts_count = rav_request.invalid_receipts.len();
670+
if !rav_request.invalid_receipts.is_empty() {
671+
Self::store_invalid_receipts(state, rav_request.invalid_receipts).await?;
643672
}
644673

645-
// Step 4: Simulate successful RAV creation
674+
tracing::info!(
675+
allocation_id = ?state.allocation_id,
676+
valid_receipts = rav_request.valid_receipts.len(),
677+
invalid_receipts = invalid_receipts_count,
678+
rav_value = expected_rav.value(),
679+
"TAP manager created RAV request"
680+
);
681+
682+
// TODO: Send to aggregator for signing
683+
// For now, we'll return the unsigned RAV info
646684
let rav_info = RavInformation {
647685
allocation_id: state.tap_allocation_id,
648-
value_aggregate: state.unaggregated_fees.value,
686+
value_aggregate: expected_rav.value(),
649687
};
650688

651-
tracing::debug!(
652-
allocation_id = ?state.allocation_id,
653-
rav_value = rav_info.value_aggregate,
654-
"TAP manager simulation: RAV creation successful"
655-
);
689+
// TODO: Once we have the signed RAV from aggregator:
690+
// state.tap_manager.verify_and_store_rav(expected_rav, signed_rav).await?;
656691

657-
// Simulate database operations for RAV storage
658-
tracing::debug!(
659-
allocation_id = ?state.allocation_id,
660-
"TAP manager simulation: Storing RAV and cleaning up processed receipts"
661-
);
662-
// In real implementation:
663-
// - state.tap_manager.update_last_rav() - stores RAV in database via state.pgpool
664-
// - state.tap_manager.remove_obsolete_receipts() - cleans up processed receipts
692+
// Clean up processed receipts
693+
state
694+
.tap_manager
695+
.remove_obsolete_receipts()
696+
.await
697+
.map_err(|e| anyhow::anyhow!("Failed to remove obsolete receipts: {}", e))?;
665698

666699
Ok(rav_info)
667700
}
668701

702+
/// Store invalid receipts for tracking
703+
async fn store_invalid_receipts(
704+
state: &TaskState<T>,
705+
invalid_receipts: Vec<
706+
tap_core::receipt::ReceiptWithState<tap_core::receipt::state::Failed, TapReceipt>,
707+
>,
708+
) -> anyhow::Result<()> {
709+
// For now, just log the invalid receipts
710+
// TODO: Store in database table for tracking
711+
for receipt_with_state in invalid_receipts {
712+
// Access the receipt through the correct field name
713+
let receipt = receipt_with_state.signed_receipt();
714+
tracing::warn!(
715+
allocation_id = ?state.allocation_id,
716+
receipt_value = receipt.value(),
717+
receipt_timestamp = receipt.timestamp_ns(),
718+
"Invalid receipt detected"
719+
);
720+
}
721+
Ok(())
722+
}
723+
669724
/// Handle RAV request - with real TAP manager integration
670725
async fn handle_rav_request(state: &mut TaskState<T>) -> anyhow::Result<()> {
671726
let start_time = Instant::now();
@@ -686,8 +741,8 @@ impl<T: NetworkVersion> SenderAllocationTask<T> {
686741
"Creating RAV for aggregated receipts"
687742
);
688743

689-
// Enhanced TAP manager simulation - mimics real TAP workflow
690-
let rav_result = Self::create_rav_with_tap_manager_simulation(state).await;
744+
// Use real TAP manager to create RAV
745+
let rav_result = Self::create_rav_with_tap_manager(state).await;
691746

692747
let rav_info = match rav_result {
693748
Ok(rav) => rav,

0 commit comments

Comments
 (0)