@@ -619,7 +619,104 @@ impl State {
619619 ///
620620 /// This loads horizon allocations
621621 async fn get_pending_sender_allocation_id_v2 ( & self ) -> HashMap < Address , HashSet < AllocationId > > {
622- unimplemented ! ( )
622+ // First we accumulate all allocations for each sender. This is because we may have more
623+ // than one signer per sender in DB.
624+ let mut unfinalized_sender_allocations_map: HashMap < Address , HashSet < AllocationId > > =
625+ HashMap :: new ( ) ;
626+
627+ let receipts_signer_allocations_in_db = sqlx:: query!(
628+ r#"
629+ WITH grouped AS (
630+ SELECT signer_address, allocation_id
631+ FROM tap_horizon_receipts
632+ GROUP BY signer_address, allocation_id
633+ )
634+ SELECT DISTINCT
635+ signer_address,
636+ (
637+ SELECT ARRAY
638+ (
639+ SELECT DISTINCT allocation_id
640+ FROM grouped
641+ WHERE signer_address = top.signer_address
642+ )
643+ ) AS allocation_ids
644+ FROM grouped AS top
645+ "#
646+ )
647+ . fetch_all ( & self . pgpool )
648+ . await
649+ . expect ( "should be able to fetch pending receipts from the database" ) ;
650+
651+ for row in receipts_signer_allocations_in_db {
652+ let allocation_ids = row
653+ . allocation_ids
654+ . expect ( "all receipts should have an allocation_id" )
655+ . iter ( )
656+ . map ( |allocation_id| {
657+ AllocationId :: Legacy (
658+ Address :: from_str ( allocation_id)
659+ . expect ( "allocation_id should be a valid address" ) ,
660+ )
661+ } )
662+ . collect :: < HashSet < _ > > ( ) ;
663+ let signer_id = Address :: from_str ( & row. signer_address )
664+ . expect ( "signer_address should be a valid address" ) ;
665+ let sender_id = self
666+ . escrow_accounts_v1
667+ . borrow ( )
668+ . get_sender_for_signer ( & signer_id)
669+ . expect ( "should be able to get sender from signer" ) ;
670+
671+ // Accumulate allocations for the sender
672+ unfinalized_sender_allocations_map
673+ . entry ( sender_id)
674+ . or_default ( )
675+ . extend ( allocation_ids) ;
676+ }
677+
678+ let nonfinal_ravs_sender_allocations_in_db = sqlx:: query!(
679+ r#"
680+ SELECT DISTINCT
681+ sender_address,
682+ (
683+ SELECT ARRAY
684+ (
685+ SELECT DISTINCT allocation_id
686+ FROM tap_horizon_ravs
687+ WHERE sender_address = top.sender_address
688+ AND NOT last
689+ )
690+ ) AS allocation_id
691+ FROM scalar_tap_ravs AS top
692+ "#
693+ )
694+ . fetch_all ( & self . pgpool )
695+ . await
696+ . expect ( "should be able to fetch unfinalized RAVs from the database" ) ;
697+
698+ for row in nonfinal_ravs_sender_allocations_in_db {
699+ let allocation_ids = row
700+ . allocation_id
701+ . expect ( "all RAVs should have an allocation_id" )
702+ . iter ( )
703+ . map ( |allocation_id| {
704+ AllocationId :: Legacy (
705+ Address :: from_str ( allocation_id)
706+ . expect ( "allocation_id should be a valid address" ) ,
707+ )
708+ } )
709+ . collect :: < HashSet < _ > > ( ) ;
710+ let sender_id = Address :: from_str ( & row. sender_address )
711+ . expect ( "sender_address should be a valid address" ) ;
712+
713+ // Accumulate allocations for the sender
714+ unfinalized_sender_allocations_map
715+ . entry ( sender_id)
716+ . or_default ( )
717+ . extend ( allocation_ids) ;
718+ }
719+ unfinalized_sender_allocations_map
623720 }
624721
625722 /// Helper function to create [SenderAccountArgs]
0 commit comments