Skip to content

Commit 1c753ef

Browse files
committed
refactor: implement get_pending_sender_allocation_id_v2
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent e5c9c04 commit 1c753ef

File tree

5 files changed

+162
-7
lines changed

5 files changed

+162
-7
lines changed

.sqlx/query-7124365390787371768fdb52259f8a9fd7f1a549ddd6410fa4359cacbea2ce6e.json

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-7d2c582d3f92ccf179847bd01de43a328bea75b556032bfea57eb61b30e51f95.json

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,104 @@ impl State {
619619
///
620620
/// This loads horizon allocations
621621
async fn get_pending_sender_allocation_id_v2(&self) -> HashMap<Address, HashSet<AllocationId>> {
622-
unimplemented!()
622+
// First we accumulate all allocations for each sender. This is because we may have more
623+
// than one signer per sender in DB.
624+
let mut unfinalized_sender_allocations_map: HashMap<Address, HashSet<AllocationId>> =
625+
HashMap::new();
626+
627+
let receipts_signer_allocations_in_db = sqlx::query!(
628+
r#"
629+
WITH grouped AS (
630+
SELECT signer_address, allocation_id
631+
FROM tap_horizon_receipts
632+
GROUP BY signer_address, allocation_id
633+
)
634+
SELECT DISTINCT
635+
signer_address,
636+
(
637+
SELECT ARRAY
638+
(
639+
SELECT DISTINCT allocation_id
640+
FROM grouped
641+
WHERE signer_address = top.signer_address
642+
)
643+
) AS allocation_ids
644+
FROM grouped AS top
645+
"#
646+
)
647+
.fetch_all(&self.pgpool)
648+
.await
649+
.expect("should be able to fetch pending receipts from the database");
650+
651+
for row in receipts_signer_allocations_in_db {
652+
let allocation_ids = row
653+
.allocation_ids
654+
.expect("all receipts should have an allocation_id")
655+
.iter()
656+
.map(|allocation_id| {
657+
AllocationId::Legacy(
658+
Address::from_str(allocation_id)
659+
.expect("allocation_id should be a valid address"),
660+
)
661+
})
662+
.collect::<HashSet<_>>();
663+
let signer_id = Address::from_str(&row.signer_address)
664+
.expect("signer_address should be a valid address");
665+
let sender_id = self
666+
.escrow_accounts_v1
667+
.borrow()
668+
.get_sender_for_signer(&signer_id)
669+
.expect("should be able to get sender from signer");
670+
671+
// Accumulate allocations for the sender
672+
unfinalized_sender_allocations_map
673+
.entry(sender_id)
674+
.or_default()
675+
.extend(allocation_ids);
676+
}
677+
678+
let nonfinal_ravs_sender_allocations_in_db = sqlx::query!(
679+
r#"
680+
SELECT DISTINCT
681+
sender_address,
682+
(
683+
SELECT ARRAY
684+
(
685+
SELECT DISTINCT allocation_id
686+
FROM tap_horizon_ravs
687+
WHERE sender_address = top.sender_address
688+
AND NOT last
689+
)
690+
) AS allocation_id
691+
FROM scalar_tap_ravs AS top
692+
"#
693+
)
694+
.fetch_all(&self.pgpool)
695+
.await
696+
.expect("should be able to fetch unfinalized RAVs from the database");
697+
698+
for row in nonfinal_ravs_sender_allocations_in_db {
699+
let allocation_ids = row
700+
.allocation_id
701+
.expect("all RAVs should have an allocation_id")
702+
.iter()
703+
.map(|allocation_id| {
704+
AllocationId::Legacy(
705+
Address::from_str(allocation_id)
706+
.expect("allocation_id should be a valid address"),
707+
)
708+
})
709+
.collect::<HashSet<_>>();
710+
let sender_id = Address::from_str(&row.sender_address)
711+
.expect("sender_address should be a valid address");
712+
713+
// Accumulate allocations for the sender
714+
unfinalized_sender_allocations_map
715+
.entry(sender_id)
716+
.or_default()
717+
.extend(allocation_ids);
718+
}
719+
unfinalized_sender_allocations_map
623720
}
624721

625722
/// Helper function to create [SenderAccountArgs]
@@ -922,8 +1019,11 @@ mod tests {
9221019
flush_messages(&notify).await;
9231020

9241021
// verify if create sender account
925-
let actor_ref =
926-
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix.clone(), SENDER.1));
1022+
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
1023+
"{}:legacy:{}",
1024+
prefix.clone(),
1025+
SENDER.1
1026+
));
9271027
assert!(actor_ref.is_some());
9281028

9291029
actor
@@ -959,7 +1059,7 @@ mod tests {
9591059
.unwrap();
9601060

9611061
let actor_ref =
962-
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix, SENDER_2.1));
1062+
ActorRef::<SenderAccountMessage>::where_is(format!("{}:legacy:{}", prefix, SENDER_2.1));
9631063
assert!(actor_ref.is_some());
9641064
}
9651065

crates/tap-agent/tests/sender_account_manager_test.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,11 @@ async fn sender_account_manager_layer_test(pgpool: PgPool) {
7474
flush_messages(&notify).await;
7575

7676
// verify if create sender account
77-
let sender_account_ref =
78-
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix.clone(), SENDER.1));
77+
let sender_account_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
78+
"{}:legacy:{}",
79+
prefix.clone(),
80+
SENDER.1
81+
));
7982
assert!(sender_account_ref.is_some());
8083

8184
let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, 1, 1, TRIGGER_VALUE - 10);

crates/tap-agent/tests/tap_agent_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ async fn test_start_tap_agent(pgpool: PgPool) {
119119
flush_messages(&notify).await;
120120

121121
// verify if create sender account
122-
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!("{}", TAP_SENDER.1));
122+
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!("legacy:{}", TAP_SENDER.1));
123123

124124
assert!(actor_ref.is_some());
125125

0 commit comments

Comments
 (0)