Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 49 additions & 7 deletions crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,13 @@ pub mod tests {

// make sure there's a reason to keep denied
let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, ESCROW_VALUE);
store_rav_with_options(&pgpool, signed_rav, SENDER.1, true, false)
store_rav_with_options()
.pgpool(&pgpool)
.signed_rav(signed_rav)
.sender(SENDER.1)
.last(true)
.final_rav(false)
.call()
.await
.unwrap();

Expand Down Expand Up @@ -1981,7 +1987,13 @@ pub mod tests {
async fn test_initialization_with_pending_ravs_over_the_limit(pgpool: PgPool) {
// add last non-final ravs
let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, ESCROW_VALUE);
store_rav_with_options(&pgpool, signed_rav, SENDER.1, true, false)
store_rav_with_options()
.pgpool(&pgpool)
.signed_rav(signed_rav)
.sender(SENDER.1)
.last(true)
.final_rav(false)
.call()
.await
.unwrap();

Expand All @@ -1999,13 +2011,25 @@ pub mod tests {
async fn test_unaggregated_fees_over_balance(pgpool: PgPool) {
// add last non-final ravs
let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, ESCROW_VALUE / 2);
store_rav_with_options(&pgpool, signed_rav, SENDER.1, true, false)
store_rav_with_options()
.pgpool(&pgpool)
.signed_rav(signed_rav)
.sender(SENDER.1)
.last(true)
.final_rav(false)
.call()
.await
.unwrap();

// other rav final, should not be taken into account
let signed_rav = create_rav(ALLOCATION_ID_1, SIGNER.0.clone(), 4, ESCROW_VALUE / 2);
store_rav_with_options(&pgpool, signed_rav, SENDER.1, true, true)
store_rav_with_options()
.pgpool(&pgpool)
.signed_rav(signed_rav)
.sender(SENDER.1)
.last(true)
.final_rav(true)
.call()
.await
.unwrap();

Expand Down Expand Up @@ -2176,12 +2200,24 @@ pub mod tests {

// redeemed
let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, ESCROW_VALUE);
store_rav_with_options(&pgpool, signed_rav, SENDER.1, true, false)
store_rav_with_options()
.pgpool(&pgpool)
.signed_rav(signed_rav)
.sender(SENDER.1)
.last(true)
.final_rav(false)
.call()
.await
.unwrap();

let signed_rav = create_rav(ALLOCATION_ID_1, SIGNER.0.clone(), 4, ESCROW_VALUE - 1);
store_rav_with_options(&pgpool, signed_rav, SENDER.1, true, false)
store_rav_with_options()
.pgpool(&pgpool)
.signed_rav(signed_rav)
.sender(SENDER.1)
.last(true)
.final_rav(false)
.call()
.await
.unwrap();

Expand Down Expand Up @@ -2233,7 +2269,13 @@ pub mod tests {
async fn test_thawing_deposit_process(pgpool: PgPool) {
// add last non-final ravs
let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, ESCROW_VALUE / 2);
store_rav_with_options(&pgpool, signed_rav, SENDER.1, true, false)
store_rav_with_options()
.pgpool(&pgpool)
.signed_rav(signed_rav)
.sender(SENDER.1)
.last(true)
.final_rav(false)
.call()
.await
.unwrap();

Expand Down
64 changes: 36 additions & 28 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,23 +283,27 @@ impl Actor for SenderAccountsManager {

// Start the new_receipts_watcher task that will consume from the `pglistener`
// after starting all senders
state.new_receipts_watcher_handle_v1 = Some(tokio::spawn(new_receipts_watcher(
myself.get_cell(),
pglistener_v1,
escrow_accounts_v1,
SenderType::Legacy,
prefix.clone(),
)));
state.new_receipts_watcher_handle_v1 = Some(tokio::spawn(
new_receipts_watcher()
.sender_type(SenderType::Legacy)
.actor_cell(myself.get_cell())
.pglistener(pglistener_v1)
.escrow_accounts_rx(escrow_accounts_v1)
.maybe_prefix(prefix.clone())
.call(),
));

// Start the new_receipts_watcher task that will consume from the `pglistener`
// after starting all senders
state.new_receipts_watcher_handle_v2 = Some(tokio::spawn(new_receipts_watcher(
myself.get_cell(),
pglistener_v2,
escrow_accounts_v2,
SenderType::Horizon,
prefix,
)));
state.new_receipts_watcher_handle_v2 = Some(tokio::spawn(
new_receipts_watcher()
.actor_cell(myself.get_cell())
.pglistener(pglistener_v2)
.escrow_accounts_rx(escrow_accounts_v2)
.sender_type(SenderType::Horizon)
.maybe_prefix(prefix)
.call(),
));

tracing::info!("SenderAccountManager created!");
Ok(state)
Expand Down Expand Up @@ -757,6 +761,7 @@ impl State {

/// Continuously listens for new receipt notifications from Postgres and forwards them to the
/// corresponding SenderAccount.
#[bon::builder]
async fn new_receipts_watcher(
actor_cell: ActorCell,
mut pglistener: PgListener,
Expand Down Expand Up @@ -1169,13 +1174,15 @@ mod tests {
let dummy_actor = DummyActor::spawn().await;

// Start the new_receipts_watcher task that will consume from the `pglistener`
let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher(
dummy_actor.get_cell(),
pglistener,
escrow_accounts_rx,
SenderType::Legacy,
Some(prefix.clone()),
));
let new_receipts_watcher_handle = tokio::spawn(
new_receipts_watcher()
.actor_cell(dummy_actor.get_cell())
.pglistener(pglistener)
.escrow_accounts_rx(escrow_accounts_rx)
.sender_type(SenderType::Legacy)
.prefix(prefix.clone())
.call(),
);

let receipts_count = 10;
// add receipts to the database
Expand Down Expand Up @@ -1213,13 +1220,14 @@ mod tests {
let dummy_actor = DummyActor::spawn().await;

// Start the new_receipts_watcher task that will consume from the `pglistener`
let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher(
dummy_actor.get_cell(),
pglistener,
escrow_accounts_rx,
SenderType::Legacy,
None,
));
let new_receipts_watcher_handle = tokio::spawn(
new_receipts_watcher()
.sender_type(SenderType::Legacy)
.actor_cell(dummy_actor.get_cell())
.pglistener(pglistener)
.escrow_accounts_rx(escrow_accounts_rx)
.call(),
);
pgpool.close().await;
new_receipts_watcher_handle.await.unwrap();

Expand Down
15 changes: 8 additions & 7 deletions crates/tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,14 @@ where
escrow_accounts.clone(),
)),
];
let context = TapAgentContext::new(
pgpool.clone(),
allocation_id,
config.indexer_address,
sender,
escrow_accounts.clone(),
);
let context = TapAgentContext::builder()
.pgpool(pgpool.clone())
.allocation_id(allocation_id)
.indexer_address(config.indexer_address)
.sender(sender)
.escrow_accounts(escrow_accounts.clone())
.build();

let latest_rav = context.last_rav().await.unwrap_or_default();
let tap_manager = TapManager::new(
domain_separator.clone(),
Expand Down
27 changes: 5 additions & 22 deletions crates/tap-agent/src/tap/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,36 +152,19 @@ impl NetworkVersion for Horizon {
/// Context used by [tap_core::manager::Manager] that enables certain helper methods
///
/// This context is implemented for PostgresSQL
#[derive(Clone)]
#[derive(Clone, bon::Builder)]
pub struct TapAgentContext<T> {
pgpool: PgPool,
#[cfg_attr(test, builder(default = crate::test::ALLOCATION_ID_0))]
allocation_id: Address,
#[cfg_attr(test, builder(default = test_assets::TAP_SENDER.1))]
sender: Address,
#[cfg_attr(test, builder(default = crate::test::INDEXER.1))]
indexer_address: Address,
escrow_accounts: Receiver<EscrowAccounts>,
/// We use phantom data as a marker since it's
/// only used to define what methods are available
/// for each type of network
#[builder(default = PhantomData)]
Copy link

@Veetaha Veetaha Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend using #[builder(skip)] for PhantomData fields to avoid generating setters for this field at all.

_phantom: PhantomData<T>,
}

/// Allow any [NetworkVersion] to create a new context
impl<T: NetworkVersion> TapAgentContext<T> {
/// Creates a TapContext
pub fn new(
pgpool: PgPool,
allocation_id: Address,
indexer_address: Address,
sender: Address,
escrow_accounts: Receiver<EscrowAccounts>,
) -> Self {
Self {
pgpool,
allocation_id,
indexer_address,
sender,
escrow_accounts,
_phantom: PhantomData,
}
}
}
26 changes: 10 additions & 16 deletions crates/tap-agent/src/tap/context/rav.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,13 @@ mod test {
use rstest::rstest;
use sqlx::PgPool;
use tap_core::signed_message::Eip712SignedMessage;
use test_assets::{TAP_SENDER as SENDER, TAP_SIGNER as SIGNER};
use test_assets::TAP_SIGNER as SIGNER;
use tokio::sync::watch;

use super::*;
use crate::{
tap::context::NetworkVersion,
test::{CreateRav, ALLOCATION_ID_0, INDEXER},
test::{CreateRav, ALLOCATION_ID_0},
};

#[derive(Debug)]
Expand All @@ -351,23 +351,17 @@ mod test {
const VALUE_AGGREGATE: u128 = u128::MAX;

async fn legacy_adapter(pgpool: PgPool) -> TapAgentContext<Legacy> {
TapAgentContext::new(
pgpool,
ALLOCATION_ID_0,
INDEXER.1,
SENDER.1,
watch::channel(EscrowAccounts::default()).1,
)
TapAgentContext::builder()
.pgpool(pgpool)
.escrow_accounts(watch::channel(EscrowAccounts::default()).1)
.build()
}

async fn horizon_adapter(pgpool: PgPool) -> TapAgentContext<Horizon> {
TapAgentContext::new(
pgpool,
ALLOCATION_ID_0,
INDEXER.1,
SENDER.1,
watch::channel(EscrowAccounts::default()).1,
)
TapAgentContext::builder()
.pgpool(pgpool)
.escrow_accounts(watch::channel(EscrowAccounts::default()).1)
.build()
}

/// Insert a single receipt and retrieve it from the database using the adapter.
Expand Down
24 changes: 9 additions & 15 deletions crates/tap-agent/src/tap/context/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ mod test {
use tokio::sync::watch::{self, Receiver};

use super::*;
use crate::test::{store_receipt, CreateReceipt, INDEXER, SENDER_2};
use crate::test::{store_receipt, CreateReceipt, SENDER_2};

const ALLOCATION_ID_IRRELEVANT: Address = ALLOCATION_ID_1;

Expand All @@ -427,26 +427,20 @@ mod test {
pgpool: PgPool,
escrow_accounts: Receiver<EscrowAccounts>,
) -> TapAgentContext<Legacy> {
TapAgentContext::new(
pgpool.clone(),
ALLOCATION_ID_0,
INDEXER.1,
SENDER.1,
escrow_accounts,
)
TapAgentContext::builder()
.pgpool(pgpool)
.escrow_accounts(escrow_accounts)
.build()
}

async fn horizon_adapter(
pgpool: PgPool,
escrow_accounts: Receiver<EscrowAccounts>,
) -> TapAgentContext<Horizon> {
TapAgentContext::new(
pgpool,
ALLOCATION_ID_0,
INDEXER.1,
SENDER.1,
escrow_accounts,
)
TapAgentContext::builder()
.pgpool(pgpool)
.escrow_accounts(escrow_accounts)
.build()
}

/// Insert a single receipt and retrieve it from the database using the adapter.
Expand Down
10 changes: 9 additions & 1 deletion crates/tap-agent/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,14 @@ pub async fn store_rav(
signed_rav: SignedRav,
sender: Address,
) -> anyhow::Result<()> {
store_rav_with_options(pgpool, signed_rav, sender, false, false).await
store_rav_with_options()
.pgpool(pgpool)
.signed_rav(signed_rav)
.sender(sender)
.last(false)
.final_rav(false)
.call()
.await
}

// TODO use static and check for possible errors with connection refused
Expand Down Expand Up @@ -659,6 +666,7 @@ async fn create_grpc_aggregator() -> (JoinHandle<()>, SocketAddr) {
.unwrap()
}

#[bon::builder]
pub async fn store_rav_with_options(
pgpool: &PgPool,
signed_rav: SignedRav,
Expand Down
Loading