Skip to content

Commit 3e4779a

Browse files
committed
refactor: pass sender_type to sender_account
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent f315a72 commit 3e4779a

File tree

2 files changed

+129
-60
lines changed

2 files changed

+129
-60
lines changed

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 106 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use tonic::transport::{Channel, Endpoint};
3535
use tracing::Level;
3636

3737
use super::{
38-
sender_accounts_manager::AllocationId,
38+
sender_accounts_manager::{AllocationId, SenderType},
3939
sender_allocation::{
4040
AllocationConfig, SenderAllocation, SenderAllocationArgs, SenderAllocationMessage,
4141
},
@@ -233,6 +233,9 @@ pub struct SenderAccountArgs {
233233

234234
/// Configuration for retry scheduler in case sender is denied
235235
pub retry_interval: Duration,
236+
237+
/// Sender type, used to decide which set of tables to use
238+
pub sender_type: SenderType,
236239
}
237240

238241
/// State used by the actor
@@ -321,6 +324,9 @@ pub struct State {
321324
/// limited to `max_amount_willing_to_lose_grt`
322325
trusted_sender: bool,
323326

327+
/// Sender type, used to decide which set of tables to use
328+
sender_type: SenderType,
329+
324330
// Config forwarded to [SenderAllocation]
325331
config: &'static SenderAccountConfig,
326332
}
@@ -574,7 +580,7 @@ impl State {
574580
"Denying sender."
575581
);
576582

577-
SenderAccount::deny_sender(&self.pgpool, self.sender).await;
583+
SenderAccount::deny_sender(self.sender_type, &self.pgpool, self.sender).await;
578584
self.denied = true;
579585
SENDER_DENIED
580586
.with_label_values(&[&self.sender.to_string()])
@@ -590,16 +596,21 @@ impl State {
590596
sender_balance = self.sender_balance.to_u128(),
591597
"Allowing sender."
592598
);
593-
sqlx::query!(
594-
r#"
599+
match self.sender_type {
600+
SenderType::Legacy => {
601+
sqlx::query!(
602+
r#"
595603
DELETE FROM scalar_tap_denylist
596604
WHERE sender_address = $1
597605
"#,
598-
self.sender.encode_hex(),
599-
)
600-
.execute(&self.pgpool)
601-
.await
602-
.expect("Should not fail to delete from denylist");
606+
self.sender.encode_hex(),
607+
)
608+
.execute(&self.pgpool)
609+
.await
610+
.expect("Should not fail to delete from denylist");
611+
}
612+
SenderType::Horizon => unimplemented!(),
613+
}
603614
self.denied = false;
604615

605616
SENDER_DENIED
@@ -688,6 +699,7 @@ impl Actor for SenderAccount {
688699
allocation_ids,
689700
prefix,
690701
retry_interval,
702+
sender_type,
691703
}: Self::Arguments,
692704
) -> Result<Self::State, ActorProcessingErr> {
693705
let myself_clone = myself.clone();
@@ -714,39 +726,58 @@ impl Actor for SenderAccount {
714726
.get_balance_for_sender(&sender_id)
715727
.unwrap_or_default();
716728
async move {
717-
let last_non_final_ravs = sqlx::query!(
718-
r#"
719-
SELECT allocation_id, value_aggregate
720-
FROM scalar_tap_ravs
721-
WHERE sender_address = $1 AND last AND NOT final;
722-
"#,
723-
sender_id.encode_hex(),
724-
)
725-
.fetch_all(&pgpool)
726-
.await
727-
.expect("Should not fail to fetch from scalar_tap_ravs");
729+
let last_non_final_ravs = match sender_type {
730+
// Get all ravs from v1 table
731+
SenderType::Legacy => sqlx::query!(
732+
r#"
733+
SELECT allocation_id, value_aggregate
734+
FROM scalar_tap_ravs
735+
WHERE sender_address = $1 AND last AND NOT final;
736+
"#,
737+
sender_id.encode_hex(),
738+
)
739+
.fetch_all(&pgpool)
740+
.await
741+
.expect("Should not fail to fetch from scalar_tap_ravs"),
742+
// Get all ravs from v2 table
743+
SenderType::Horizon => {
744+
unimplemented!()
745+
}
746+
};
728747

729748
// get a list from the subgraph of which subgraphs were already redeemed and were not marked as final
730-
let redeemed_ravs_allocation_ids = match escrow_subgraph
731-
.query::<UnfinalizedTransactions, _>(unfinalized_transactions::Variables {
732-
unfinalized_ravs_allocation_ids: last_non_final_ravs
733-
.iter()
734-
.map(|rav| rav.allocation_id.to_string())
735-
.collect::<Vec<_>>(),
736-
sender: format!("{:x?}", sender_id),
737-
})
738-
.await
739-
{
740-
Ok(Ok(response)) => response
741-
.transactions
742-
.into_iter()
743-
.map(|tx| {
744-
tx.allocation_id
745-
.expect("all redeem tx must have allocation_id")
746-
})
747-
.collect::<Vec<_>>(),
748-
// if we have any problems, we don't want to filter out
749-
_ => vec![],
749+
let redeemed_ravs_allocation_ids = match sender_type {
750+
SenderType::Legacy => {
751+
// This query returns unfinalized transactions for v1
752+
match escrow_subgraph
753+
.query::<UnfinalizedTransactions, _>(
754+
unfinalized_transactions::Variables {
755+
unfinalized_ravs_allocation_ids: last_non_final_ravs
756+
.iter()
757+
.map(|rav| rav.allocation_id.to_string())
758+
.collect::<Vec<_>>(),
759+
sender: format!("{:x?}", sender_id),
760+
},
761+
)
762+
.await
763+
{
764+
Ok(Ok(response)) => response
765+
.transactions
766+
.into_iter()
767+
.map(|tx| {
768+
tx.allocation_id
769+
.expect("all redeem tx must have allocation_id")
770+
})
771+
.collect::<Vec<_>>(),
772+
// if we have any problems, we don't want to filter out
773+
_ => vec![],
774+
}
775+
}
776+
// TODO Implement query for unfinalized v2 transactions
777+
// Depends on Escrow Subgraph Schema
778+
SenderType::Horizon => {
779+
todo!()
780+
}
750781
};
751782

752783
// filter the ravs marked as last that were not redeemed yet
@@ -779,21 +810,25 @@ impl Actor for SenderAccount {
779810
}
780811
});
781812

782-
// Get deny status from the scalar_tap_denylist table
783-
let denied = sqlx::query!(
784-
r#"
813+
let denied = match sender_type {
814+
// Get deny status from the scalar_tap_denylist table
815+
SenderType::Legacy => sqlx::query!(
816+
r#"
785817
SELECT EXISTS (
786818
SELECT 1
787819
FROM scalar_tap_denylist
788820
WHERE sender_address = $1
789821
) as denied
790822
"#,
791-
sender_id.encode_hex(),
792-
)
793-
.fetch_one(&pgpool)
794-
.await?
795-
.denied
796-
.expect("Deny status cannot be null");
823+
sender_id.encode_hex(),
824+
)
825+
.fetch_one(&pgpool)
826+
.await?
827+
.denied
828+
.expect("Deny status cannot be null"),
829+
// Get deny status from the tap horizon table
830+
SenderType::Horizon => unimplemented!(),
831+
};
797832

798833
let sender_balance = escrow_accounts
799834
.borrow()
@@ -860,6 +895,7 @@ impl Actor for SenderAccount {
860895
backoff_info: BackoffInfo::default(),
861896
trusted_sender: config.trusted_senders.contains(&sender_id),
862897
config,
898+
sender_type,
863899
};
864900

865901
stream::iter(allocation_ids)
@@ -937,7 +973,12 @@ impl Actor for SenderAccount {
937973
fee ***MONEY***.
938974
"
939975
);
940-
SenderAccount::deny_sender(&state.pgpool, state.sender).await;
976+
SenderAccount::deny_sender(
977+
state.sender_type,
978+
&state.pgpool,
979+
state.sender,
980+
)
981+
.await;
941982
}
942983

943984
// add new value
@@ -1266,7 +1307,14 @@ impl Actor for SenderAccount {
12661307

12671308
impl SenderAccount {
12681309
/// Deny sender by giving `sender` [Address]
1269-
pub async fn deny_sender(pool: &PgPool, sender: Address) {
1310+
pub async fn deny_sender(sender_type: SenderType, pool: &PgPool, sender: Address) {
1311+
match sender_type {
1312+
SenderType::Legacy => Self::deny_v1_sender(pool, sender).await,
1313+
SenderType::Horizon => Self::deny_v2_sender(pool, sender).await,
1314+
}
1315+
}
1316+
1317+
async fn deny_v1_sender(pool: &PgPool, sender: Address) {
12701318
sqlx::query!(
12711319
r#"
12721320
INSERT INTO scalar_tap_denylist (sender_address)
@@ -1278,6 +1326,10 @@ impl SenderAccount {
12781326
.await
12791327
.expect("Should not fail to insert into denylist");
12801328
}
1329+
1330+
async fn deny_v2_sender(_pool: &PgPool, _sender: Address) {
1331+
unimplemented!()
1332+
}
12811333
}
12821334

12831335
#[cfg(test)]
@@ -1537,8 +1589,8 @@ pub mod tests {
15371589
flush_messages(&notify).await;
15381590

15391591
// should not delete it because it was not in network subgraph
1540-
let actor_ref = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id.clone());
1541-
assert!(actor_ref.is_some());
1592+
let allocation_ref =
1593+
ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id.clone()).unwrap();
15421594

15431595
// Mock result for closed allocations
15441596

@@ -1568,7 +1620,7 @@ pub mod tests {
15681620
.cast(SenderAccountMessage::UpdateAllocationIds(HashSet::new()))
15691621
.unwrap();
15701622

1571-
flush_messages(&notify).await;
1623+
allocation_ref.wait(None).await.unwrap();
15721624

15731625
let actor_ref = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id.clone());
15741626
assert!(actor_ref.is_none());

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,13 @@ impl Display for AllocationId {
8383
}
8484
}
8585

86+
/// Type used in [SenderAccountsManager] and [SenderAccount] to route the correct escrow queries
87+
/// and to use the correct set of tables
8688
#[derive(Clone, Copy)]
87-
enum SenderType {
89+
pub enum SenderType {
90+
/// SenderAccounts that are found in Escrow Subgraph v1 (Legacy)
8891
Legacy,
92+
/// SenderAccounts that are found in Tap Collector v2 (Horizon)
8993
Horizon,
9094
}
9195

@@ -465,7 +469,7 @@ impl State {
465469
sender_id,
466470
e
467471
);
468-
SenderAccount::deny_sender(&self.pgpool, sender_id).await;
472+
SenderAccount::deny_sender(sender_type, &self.pgpool, sender_id).await;
469473
}
470474
}
471475

@@ -752,6 +756,7 @@ impl State {
752756
allocation_ids,
753757
prefix: self.prefix.clone(),
754758
retry_interval: Duration::from_secs(30),
759+
sender_type,
755760
})
756761
}
757762
}
@@ -908,7 +913,9 @@ mod tests {
908913
use reqwest::Url;
909914
use ruint::aliases::U256;
910915
use sqlx::{postgres::PgListener, PgPool};
911-
use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER};
916+
use test_assets::{
917+
assert_while_retry, flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER,
918+
};
912919
use thegraph_core::alloy::hex::ToHexExt;
913920
use tokio::sync::{
914921
mpsc::{self, error::TryRecvError},
@@ -1018,13 +1025,21 @@ mod tests {
10181025

10191026
flush_messages(&notify).await;
10201027

1028+
assert_while_retry! {
1029+
ActorRef::<SenderAccountMessage>::where_is(format!(
1030+
"{}:legacy:{}",
1031+
prefix.clone(),
1032+
SENDER.1
1033+
)).is_none()
1034+
};
1035+
10211036
// verify if create sender account
1022-
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
1037+
let sender_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
10231038
"{}:legacy:{}",
10241039
prefix.clone(),
10251040
SENDER.1
1026-
));
1027-
assert!(actor_ref.is_some());
1041+
))
1042+
.unwrap();
10281043

10291044
actor
10301045
.cast(SenderAccountsManagerMessage::UpdateSenderAccountsV1(
@@ -1033,6 +1048,8 @@ mod tests {
10331048
.unwrap();
10341049

10351050
flush_messages(&notify).await;
1051+
1052+
sender_ref.wait(None).await.unwrap();
10361053
// verify if it gets removed
10371054
let actor_ref =
10381055
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix, SENDER.1));

0 commit comments

Comments
 (0)