Skip to content

Commit 033cdaa

Browse files
committed
refactor: implement get_pending_sender_allocation_id_v2
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 75d1b7d commit 033cdaa

File tree

5 files changed

+162
-7
lines changed

5 files changed

+162
-7
lines changed

.sqlx/query-7124365390787371768fdb52259f8a9fd7f1a549ddd6410fa4359cacbea2ce6e.json

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-7d2c582d3f92ccf179847bd01de43a328bea75b556032bfea57eb61b30e51f95.json

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,104 @@ impl State {
606606
///
607607
/// This loads horizon allocations
608608
async fn get_pending_sender_allocation_id_v2(&self) -> HashMap<Address, HashSet<AllocationId>> {
609-
unimplemented!()
609+
// First we accumulate all allocations for each sender. This is because we may have more
610+
// than one signer per sender in DB.
611+
let mut unfinalized_sender_allocations_map: HashMap<Address, HashSet<AllocationId>> =
612+
HashMap::new();
613+
614+
let receipts_signer_allocations_in_db = sqlx::query!(
615+
r#"
616+
WITH grouped AS (
617+
SELECT signer_address, allocation_id
618+
FROM tap_horizon_receipts
619+
GROUP BY signer_address, allocation_id
620+
)
621+
SELECT DISTINCT
622+
signer_address,
623+
(
624+
SELECT ARRAY
625+
(
626+
SELECT DISTINCT allocation_id
627+
FROM grouped
628+
WHERE signer_address = top.signer_address
629+
)
630+
) AS allocation_ids
631+
FROM grouped AS top
632+
"#
633+
)
634+
.fetch_all(&self.pgpool)
635+
.await
636+
.expect("should be able to fetch pending receipts from the database");
637+
638+
for row in receipts_signer_allocations_in_db {
639+
let allocation_ids = row
640+
.allocation_ids
641+
.expect("all receipts should have an allocation_id")
642+
.iter()
643+
.map(|allocation_id| {
644+
AllocationId::Legacy(
645+
Address::from_str(allocation_id)
646+
.expect("allocation_id should be a valid address"),
647+
)
648+
})
649+
.collect::<HashSet<_>>();
650+
let signer_id = Address::from_str(&row.signer_address)
651+
.expect("signer_address should be a valid address");
652+
let sender_id = self
653+
.escrow_accounts_v1
654+
.borrow()
655+
.get_sender_for_signer(&signer_id)
656+
.expect("should be able to get sender from signer");
657+
658+
// Accumulate allocations for the sender
659+
unfinalized_sender_allocations_map
660+
.entry(sender_id)
661+
.or_default()
662+
.extend(allocation_ids);
663+
}
664+
665+
let nonfinal_ravs_sender_allocations_in_db = sqlx::query!(
666+
r#"
667+
SELECT DISTINCT
668+
sender_address,
669+
(
670+
SELECT ARRAY
671+
(
672+
SELECT DISTINCT allocation_id
673+
FROM tap_horizon_ravs
674+
WHERE sender_address = top.sender_address
675+
AND NOT last
676+
)
677+
) AS allocation_id
678+
FROM scalar_tap_ravs AS top
679+
"#
680+
)
681+
.fetch_all(&self.pgpool)
682+
.await
683+
.expect("should be able to fetch unfinalized RAVs from the database");
684+
685+
for row in nonfinal_ravs_sender_allocations_in_db {
686+
let allocation_ids = row
687+
.allocation_id
688+
.expect("all RAVs should have an allocation_id")
689+
.iter()
690+
.map(|allocation_id| {
691+
AllocationId::Legacy(
692+
Address::from_str(allocation_id)
693+
.expect("allocation_id should be a valid address"),
694+
)
695+
})
696+
.collect::<HashSet<_>>();
697+
let sender_id = Address::from_str(&row.sender_address)
698+
.expect("sender_address should be a valid address");
699+
700+
// Accumulate allocations for the sender
701+
unfinalized_sender_allocations_map
702+
.entry(sender_id)
703+
.or_default()
704+
.extend(allocation_ids);
705+
}
706+
unfinalized_sender_allocations_map
610707
}
611708

612709
/// Helper function to create [SenderAccountArgs]
@@ -909,8 +1006,11 @@ mod tests {
9091006
flush_messages(&notify).await;
9101007

9111008
// verify if create sender account
912-
let actor_ref =
913-
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix.clone(), SENDER.1));
1009+
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
1010+
"{}:legacy:{}",
1011+
prefix.clone(),
1012+
SENDER.1
1013+
));
9141014
assert!(actor_ref.is_some());
9151015

9161016
actor
@@ -946,7 +1046,7 @@ mod tests {
9461046
.unwrap();
9471047

9481048
let actor_ref =
949-
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix, SENDER_2.1));
1049+
ActorRef::<SenderAccountMessage>::where_is(format!("{}:legacy:{}", prefix, SENDER_2.1));
9501050
assert!(actor_ref.is_some());
9511051
}
9521052

crates/tap-agent/tests/sender_account_manager_test.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,11 @@ async fn sender_account_manager_layer_test(pgpool: PgPool) {
7474
flush_messages(&notify).await;
7575

7676
// verify if create sender account
77-
let sender_account_ref =
78-
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix.clone(), SENDER.1));
77+
let sender_account_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
78+
"{}:legacy:{}",
79+
prefix.clone(),
80+
SENDER.1
81+
));
7982
assert!(sender_account_ref.is_some());
8083

8184
let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, 1, 1, TRIGGER_VALUE - 10);

crates/tap-agent/tests/tap_agent_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ async fn test_start_tap_agent(pgpool: PgPool) {
119119
flush_messages(&notify).await;
120120

121121
// verify if create sender account
122-
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!("{}", TAP_SENDER.1));
122+
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!("legacy:{}", TAP_SENDER.1));
123123

124124
assert!(actor_ref.is_some());
125125

0 commit comments

Comments
 (0)