@@ -23,7 +23,10 @@ use crate::{
23
23
tap:: context:: { NetworkVersion , TapAgentContext } ,
24
24
} ;
25
25
use indexer_receipt:: TapReceipt ;
26
- use tap_core:: { manager:: Manager as TapManager , receipt:: checks:: CheckList } ;
26
+ use tap_core:: {
27
+ manager:: Manager as TapManager ,
28
+ receipt:: { checks:: CheckList , Context , WithValueAndTimestamp } ,
29
+ } ;
27
30
use thegraph_core:: alloy:: primitives:: Address ;
28
31
29
32
/// Message types for SenderAllocationTask - matches original SenderAllocationMessage
@@ -172,7 +175,15 @@ impl ErrorTrackingState {
172
175
}
173
176
}
174
177
175
- impl < T : NetworkVersion > SenderAllocationTask < T > {
178
+ impl < T > SenderAllocationTask < T >
179
+ where
180
+ T : NetworkVersion ,
181
+ TapAgentContext < T > : tap_core:: manager:: adapters:: ReceiptRead < TapReceipt >
182
+ + tap_core:: manager:: adapters:: ReceiptDelete
183
+ + tap_core:: manager:: adapters:: RavRead < T :: Rav >
184
+ + tap_core:: manager:: adapters:: RavStore < T :: Rav >
185
+ + tap_core:: manager:: adapters:: SignatureChecker ,
186
+ {
176
187
/// Spawn a new SenderAllocationTask with minimal arguments (for testing)
177
188
#[ cfg( any( test, feature = "test" ) ) ]
178
189
pub async fn spawn_simple (
@@ -595,77 +606,121 @@ impl<T: NetworkVersion> SenderAllocationTask<T> {
595
606
true // Most receipts have valid signatures
596
607
}
597
608
598
- /// Simulate TAP manager RAV creation workflow
599
- /// TODO: Replace with real TAP manager.create_rav_request() call
600
- async fn create_rav_with_tap_manager_simulation (
601
- state : & TaskState < T > ,
609
+ /// Create RAV using the real TAP manager
610
+ async fn create_rav_with_tap_manager (
611
+ state : & mut TaskState < T > ,
602
612
) -> anyhow:: Result < RavInformation > {
603
613
tracing:: debug!(
604
614
allocation_id = ?state. allocation_id,
605
615
receipt_count = state. unaggregated_fees. counter,
606
616
total_value = state. unaggregated_fees. value,
607
- "TAP manager simulation: Starting RAV creation "
617
+ "Creating RAV request via TAP manager "
608
618
) ;
609
619
610
- // Simulate TAP manager workflow:
611
- // 1. Retrieve receipts from database (simulated)
612
- // 2. Validate receipts (simulated)
613
- // 3. Create aggregation request to sender's aggregator (simulated)
614
- // 4. Handle aggregator response (simulated)
620
+ // Call TAP manager to create RAV request
621
+ // The TAP manager will:
622
+ // 1. Retrieve receipts from database based on timestamp range
623
+ // 2. Validate receipts (signatures, values, etc.)
624
+ // 3. Separate valid and invalid receipts
625
+ // 4. Create an expected RAV from valid receipts
626
+ let rav_request_result = state
627
+ . tap_manager
628
+ . create_rav_request (
629
+ & Context :: new ( ) ,
630
+ 0 , // timestamp_buffer_ns - use 0 for no buffer
631
+ None , // rav_request_receipt_limit - process all receipts
632
+ )
633
+ . await ;
615
634
616
- // Step 1: Simulate database receipt retrieval
617
- tracing:: debug!(
618
- allocation_id = ?state. allocation_id,
619
- "TAP manager simulation: Retrieving receipts from database"
620
- ) ;
621
- // In real implementation: state.tap_manager.retrieve_receipts_in_timestamp_range()
622
- // Database connection available: state.pgpool
635
+ let rav_request = match rav_request_result {
636
+ Ok ( request) => request,
637
+ Err ( e) => {
638
+ tracing:: error!(
639
+ allocation_id = ?state. allocation_id,
640
+ error = %e,
641
+ "TAP manager failed to create RAV request"
642
+ ) ;
643
+ return Err ( anyhow:: anyhow!( "TAP manager error: {}" , e) ) ;
644
+ }
645
+ } ;
623
646
624
- // Step 2: Simulate receipt validation
625
- tracing:: debug!(
626
- allocation_id = ?state. allocation_id,
627
- "TAP manager simulation: Validating receipts"
628
- ) ;
629
- // In real implementation: TAP manager handles signature verification automatically
630
- // Uses sender address (state.sender) and indexer address (state.indexer_address) for validation
647
+ // Check if we have an expected RAV
648
+ let expected_rav = match rav_request. expected_rav {
649
+ Ok ( rav) => rav,
650
+ Err ( e) => {
651
+ tracing:: debug!(
652
+ allocation_id = ?state. allocation_id,
653
+ valid_receipts = rav_request. valid_receipts. len( ) ,
654
+ invalid_receipts = rav_request. invalid_receipts. len( ) ,
655
+ error = %e,
656
+ "RAV aggregation failed"
657
+ ) ;
631
658
632
- // Step 3: Simulate aggregator communication
633
- tracing:: debug!(
634
- allocation_id = ?state. allocation_id,
635
- "TAP manager simulation: Sending aggregation request to aggregator"
636
- ) ;
659
+ // Store invalid receipts if any
660
+ if !rav_request. invalid_receipts . is_empty ( ) {
661
+ Self :: store_invalid_receipts ( state, rav_request. invalid_receipts ) . await ?;
662
+ }
637
663
638
- // Simulate potential aggregator communication failure (5% chance)
639
- if state. unaggregated_fees . counter % 20 == 0 {
640
- return Err ( anyhow:: anyhow!(
641
- "Simulated aggregator communication timeout"
642
- ) ) ;
664
+ return Err ( anyhow:: anyhow!( "RAV aggregation failed: {}" , e) ) ;
665
+ }
666
+ } ;
667
+
668
+ // Store invalid receipts if any
669
+ let invalid_receipts_count = rav_request. invalid_receipts . len ( ) ;
670
+ if !rav_request. invalid_receipts . is_empty ( ) {
671
+ Self :: store_invalid_receipts ( state, rav_request. invalid_receipts ) . await ?;
643
672
}
644
673
645
- // Step 4: Simulate successful RAV creation
674
+ tracing:: info!(
675
+ allocation_id = ?state. allocation_id,
676
+ valid_receipts = rav_request. valid_receipts. len( ) ,
677
+ invalid_receipts = invalid_receipts_count,
678
+ rav_value = expected_rav. value( ) ,
679
+ "TAP manager created RAV request"
680
+ ) ;
681
+
682
+ // TODO: Send to aggregator for signing
683
+ // For now, we'll return the unsigned RAV info
646
684
let rav_info = RavInformation {
647
685
allocation_id : state. tap_allocation_id ,
648
- value_aggregate : state . unaggregated_fees . value ,
686
+ value_aggregate : expected_rav . value ( ) ,
649
687
} ;
650
688
651
- tracing:: debug!(
652
- allocation_id = ?state. allocation_id,
653
- rav_value = rav_info. value_aggregate,
654
- "TAP manager simulation: RAV creation successful"
655
- ) ;
689
+ // TODO: Once we have the signed RAV from aggregator:
690
+ // state.tap_manager.verify_and_store_rav(expected_rav, signed_rav).await?;
656
691
657
- // Simulate database operations for RAV storage
658
- tracing:: debug!(
659
- allocation_id = ?state. allocation_id,
660
- "TAP manager simulation: Storing RAV and cleaning up processed receipts"
661
- ) ;
662
- // In real implementation:
663
- // - state.tap_manager.update_last_rav() - stores RAV in database via state.pgpool
664
- // - state.tap_manager.remove_obsolete_receipts() - cleans up processed receipts
692
+ // Clean up processed receipts
693
+ state
694
+ . tap_manager
695
+ . remove_obsolete_receipts ( )
696
+ . await
697
+ . map_err ( |e| anyhow:: anyhow!( "Failed to remove obsolete receipts: {}" , e) ) ?;
665
698
666
699
Ok ( rav_info)
667
700
}
668
701
702
+ /// Store invalid receipts for tracking
703
+ async fn store_invalid_receipts (
704
+ state : & TaskState < T > ,
705
+ invalid_receipts : Vec <
706
+ tap_core:: receipt:: ReceiptWithState < tap_core:: receipt:: state:: Failed , TapReceipt > ,
707
+ > ,
708
+ ) -> anyhow:: Result < ( ) > {
709
+ // For now, just log the invalid receipts
710
+ // TODO: Store in database table for tracking
711
+ for receipt_with_state in invalid_receipts {
712
+ // Access the receipt through the correct field name
713
+ let receipt = receipt_with_state. signed_receipt ( ) ;
714
+ tracing:: warn!(
715
+ allocation_id = ?state. allocation_id,
716
+ receipt_value = receipt. value( ) ,
717
+ receipt_timestamp = receipt. timestamp_ns( ) ,
718
+ "Invalid receipt detected"
719
+ ) ;
720
+ }
721
+ Ok ( ( ) )
722
+ }
723
+
669
724
/// Handle RAV request - with real TAP manager integration
670
725
async fn handle_rav_request ( state : & mut TaskState < T > ) -> anyhow:: Result < ( ) > {
671
726
let start_time = Instant :: now ( ) ;
@@ -686,8 +741,8 @@ impl<T: NetworkVersion> SenderAllocationTask<T> {
686
741
"Creating RAV for aggregated receipts"
687
742
) ;
688
743
689
- // Enhanced TAP manager simulation - mimics real TAP workflow
690
- let rav_result = Self :: create_rav_with_tap_manager_simulation ( state) . await ;
744
+ // Use real TAP manager to create RAV
745
+ let rav_result = Self :: create_rav_with_tap_manager ( state) . await ;
691
746
692
747
let rav_info = match rav_result {
693
748
Ok ( rav) => rav,
0 commit comments