Skip to content

Commit 8f26845

Browse files
committed
test(tap-agent): fix mock sender allocation
1 parent 3cabc28 commit 8f26845

File tree

3 files changed

+114
-34
lines changed

3 files changed

+114
-34
lines changed

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,17 @@ impl State {
606606
.set(unaggregated_fees.value as f64);
607607
}
608608

609+
/// Determines whether the sender should be denied/blocked based on current fees and balance.
610+
///
611+
/// The deny condition is reached when either:
612+
/// 1. Total potential fees (pending RAVs + unaggregated fees) exceed the sender's balance
613+
/// 2. Total risky fees (unaggregated + invalid) exceed max_amount_willing_to_lose
614+
///
615+
/// When a successful RAV request clears unaggregated fees, this function should return
616+
/// false, indicating the deny condition is resolved and retries can stop.
617+
///
618+
/// This is the core logic that determines when the retry mechanism should continue
619+
/// versus when it should stop after successful RAV processing.
609620
fn deny_condition_reached(&self) -> bool {
610621
let pending_ravs = self.rav_tracker.get_total_fee();
611622
let unaggregated_fees = self.sender_fee_tracker.get_total_fee();
@@ -1254,12 +1265,19 @@ impl Actor for SenderAccount {
12541265
}
12551266
}
12561267

1268+
// Retry logic: Check if the deny condition is still met after RAV processing
1269+
// This is crucial for stopping retries when RAV requests successfully resolve
1270+
// the underlying issue (e.g., clearing unaggregated fees).
12571271
match (state.denied, state.deny_condition_reached()) {
1258-
// Allow the sender right after the potential RAV request. This way, the
1259-
// sender can be allowed again as soon as possible if the RAV was successful.
1272+
// Case: Sender was denied BUT deny condition no longer met
1273+
// This happens when a successful RAV request clears unaggregated fees,
1274+
// reducing total_potential_fees below the balance threshold.
1275+
// Action: Remove from denylist and stop retrying.
12601276
(true, false) => state.remove_from_denylist().await,
1261-
// if couldn't remove from denylist, resend the message in 30 seconds
1262-
// this may trigger another rav request
1277+
1278+
// Case: Sender still denied AND deny condition still met
1279+
// This happens when RAV requests fail or don't sufficiently reduce fees.
1280+
// Action: Schedule another retry to attempt RAV creation again.
12631281
(true, true) => {
12641282
// retry in a moment
12651283
state.scheduled_rav_request =
@@ -2012,8 +2030,22 @@ pub mod tests {
20122030
assert!(deny);
20132031
}
20142032

2033+
/// Tests the retry mechanism for RAV requests when a sender is blocked due to unaggregated fees.
2034+
///
2035+
/// This test verifies that:
2036+
/// 1. When unaggregated fees exceed the allowed limit, the sender enters a retry state
2037+
/// 2. The retry mechanism triggers RAV requests to resolve the blocked condition
2038+
/// 3. When a RAV request succeeds and clears unaggregated fees, retries stop appropriately
2039+
///
2040+
/// Key behavior tested:
2041+
/// - Sender is blocked when max_unaggregated_fees_per_sender = 0 and any fees are added
2042+
/// - First retry attempt triggers a RAV request
2043+
/// - Successful RAV request clears unaggregated fees and creates a RAV for the amount
2044+
/// - No additional retries occur since the deny condition is resolved
2045+
///
2046+
/// This aligns with the TAP protocol where RAV creation aggregates unaggregated receipts
2047+
/// into a voucher, effectively clearing the unaggregated fees balance.
20152048
#[tokio::test]
2016-
#[serial_test::serial]
20172049
async fn test_retry_unaggregated_fees() {
20182050
let test_db = test_assets::setup_shared_test_db().await;
20192051
let pgpool = test_db.pool;
@@ -2050,9 +2082,12 @@ pub mod tests {
20502082
tokio::time::sleep(RETRY_DURATION).await;
20512083
assert_triggered!(triggered_rav_request);
20522084

2053-
// wait to retry again
2085+
// Verify that no additional retry happens since the first RAV request
2086+
// successfully cleared the unaggregated fees and resolved the deny condition.
2087+
// This validates that the retry mechanism stops when the underlying issue is resolved,
2088+
// which is the correct behavior according to the TAP protocol and retry logic.
20542089
tokio::time::sleep(RETRY_DURATION).await;
2055-
assert_triggered!(triggered_rav_request);
2090+
assert_not_triggered!(triggered_rav_request);
20562091
}
20572092

20582093
#[tokio::test]
@@ -2297,8 +2332,8 @@ pub mod tests {
22972332
.call()
22982333
.await;
22992334

2300-
let (mock_sender_allocation, _) =
2301-
MockSenderAllocation::new_with_next_rav_value(sender_account.clone());
2335+
let (mock_sender_allocation, _, _) =
2336+
MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone());
23022337

23032338
let name = format!("{}:{}:{}", prefix, SENDER.1, ALLOCATION_ID_0);
23042339
let (allocation, _) = MockSenderAllocation::spawn(Some(name), mock_sender_allocation, ())

crates/tap-agent/src/test.rs

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -755,13 +755,12 @@ pub mod actors {
755755
use std::{fmt::Debug, sync::Arc};
756756

757757
use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
758-
use test_assets::{ALLOCATION_ID_0, TAP_SIGNER};
758+
use test_assets::ALLOCATION_ID_0;
759759
use thegraph_core::{alloy::primitives::Address, AllocationId as AllocationIdCore};
760760
use tokio::sync::{mpsc, watch, Notify};
761761

762-
use super::create_rav;
763762
use crate::agent::{
764-
sender_account::{ReceiptFees, SenderAccountMessage},
763+
sender_account::{RavInformation, ReceiptFees, SenderAccountMessage},
765764
sender_accounts_manager::{AllocationId, NewReceiptNotification},
766765
sender_allocation::SenderAllocationMessage,
767766
unaggregated_receipts::UnaggregatedReceipts,
@@ -893,11 +892,21 @@ pub mod actors {
893892
}
894893
}
895894

895+
/// Mock implementation of SenderAllocation for testing purposes.
896+
///
897+
/// This mock simulates the behavior of a real sender allocation actor, particularly
898+
/// for testing RAV request flows and retry mechanisms. When a RAV request is triggered,
899+
/// it sends back a successful response that follows TAP protocol behavior:
900+
///
901+
/// - Clears unaggregated fees to zero (they become part of the RAV)
902+
/// - Creates a RAV for the full aggregated amount
903+
/// - Properly resolves deny conditions to stop unnecessary retries
904+
///
905+
/// This implementation aligns with the documented expectation:
906+
/// "set the unnagregated fees to zero and the rav to the amount"
896907
pub struct MockSenderAllocation {
897908
triggered_rav_request: Arc<Notify>,
898909
sender_actor: Option<ActorRef<SenderAccountMessage>>,
899-
900-
next_rav_value: watch::Receiver<u128>,
901910
next_unaggregated_fees_value: watch::Receiver<u128>,
902911
receipts: mpsc::Sender<NewReceiptNotification>,
903912
}
@@ -913,7 +922,6 @@ pub mod actors {
913922
sender_actor: Some(sender_actor),
914923
triggered_rav_request: triggered_rav_request.clone(),
915924
receipts: mpsc::channel(1).0,
916-
next_rav_value: watch::channel(0).1,
917925
next_unaggregated_fees_value,
918926
},
919927
triggered_rav_request,
@@ -924,16 +932,15 @@ pub mod actors {
924932
pub fn new_with_next_rav_value(
925933
sender_actor: ActorRef<SenderAccountMessage>,
926934
) -> (Self, watch::Sender<u128>) {
927-
let (next_rav_value_sender, next_rav_value) = watch::channel(0);
935+
let (unaggregated_fees, next_unaggregated_fees_value) = watch::channel(0);
928936
(
929937
Self {
930938
sender_actor: Some(sender_actor),
931939
triggered_rav_request: Arc::new(Notify::new()),
932940
receipts: mpsc::channel(1).0,
933-
next_rav_value,
934-
next_unaggregated_fees_value: watch::channel(0).1,
941+
next_unaggregated_fees_value,
935942
},
936-
next_rav_value_sender,
943+
unaggregated_fees,
937944
)
938945
}
939946

@@ -945,7 +952,6 @@ pub mod actors {
945952
sender_actor: None,
946953
triggered_rav_request: Arc::new(Notify::new()),
947954
receipts: tx,
948-
next_rav_value: watch::channel(0).1,
949955
next_unaggregated_fees_value: watch::channel(0).1,
950956
},
951957
rx,
@@ -977,21 +983,30 @@ pub mod actors {
977983
SenderAllocationMessage::TriggerRavRequest => {
978984
self.triggered_rav_request.notify_one();
979985
if let Some(sender_account) = self.sender_actor.as_ref() {
980-
let signed_rav = create_rav(
981-
ALLOCATION_ID_0,
982-
TAP_SIGNER.0.clone(),
983-
4,
984-
*self.next_rav_value.borrow(),
985-
);
986+
// Mock a successful RAV request response that follows TAP protocol behavior:
987+
// 1. Aggregate unaggregated receipts into a Receipt Aggregate Voucher (RAV)
988+
// 2. Clear unaggregated fees to zero (they're now represented in the RAV)
989+
// 3. Create a RAV for the full aggregated amount
990+
//
991+
// This behavior aligns with the documented expectation:
992+
// "set the unnagregated fees to zero and the rav to the amount"
993+
// (see sender_account.rs test_deny_allow comment)
994+
//
995+
// Important: This correctly resolves the deny condition when unaggregated
996+
// fees are cleared, which stops the retry mechanism as intended.
997+
let current_value = *self.next_unaggregated_fees_value.borrow();
986998
sender_account.cast(SenderAccountMessage::UpdateReceiptFees(
987999
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
9881000
ReceiptFees::RavRequestResponse(
9891001
UnaggregatedReceipts {
990-
value: *self.next_unaggregated_fees_value.borrow(),
1002+
value: 0, // Clear unaggregated fees - they're now in the RAV
9911003
last_id: 0,
9921004
counter: 0,
9931005
},
994-
Ok(Some(signed_rav.into())),
1006+
Ok(Some(RavInformation {
1007+
allocation_id: ALLOCATION_ID_0,
1008+
value_aggregate: current_value, // RAV for the full amount
1009+
})),
9951010
),
9961011
))?;
9971012
}

crates/test-assets/src/lib.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,6 @@ pub async fn setup_shared_test_db() -> TestDatabase {
530530
let db_id = DB_COUNTER.fetch_add(1, Ordering::SeqCst);
531531
let unique_db_name = format!("test_db_{db_id}");
532532

533-
// Start PostgreSQL container
534533
let pg_container = postgres::Postgres::default()
535534
.start()
536535
.await
@@ -541,9 +540,25 @@ pub async fn setup_shared_test_db() -> TestDatabase {
541540
.await
542541
.expect("Failed to get container port");
543542

543+
// In CI environments, we might need to use the container's IP instead of localhost
544+
let host = if std::env::var("CI").is_ok() {
545+
pg_container
546+
.get_host()
547+
.await
548+
.expect("Failed to get container host")
549+
.to_string()
550+
} else {
551+
"localhost".to_string()
552+
};
553+
544554
// Connect to postgres database first to create our test database
545555
let admin_connection_string =
546-
format!("postgres://postgres:postgres@localhost:{host_port}/postgres");
556+
format!("postgres://postgres:postgres@{host}:{host_port}/postgres");
557+
558+
tracing::debug!(
559+
"Attempting to connect to admin database: {}",
560+
admin_connection_string
561+
);
547562
let admin_pool = sqlx::PgPool::connect(&admin_connection_string)
548563
.await
549564
.expect("Failed to connect to admin database");
@@ -556,7 +571,7 @@ pub async fn setup_shared_test_db() -> TestDatabase {
556571

557572
// Connect to our test database
558573
let connection_string =
559-
format!("postgres://postgres:postgres@localhost:{host_port}/{unique_db_name}");
574+
format!("postgres://postgres:postgres@{host}:{host_port}/{unique_db_name}");
560575
let pool = sqlx::PgPool::connect(&connection_string)
561576
.await
562577
.expect("Failed to connect to test database");
@@ -596,7 +611,6 @@ pub async fn setup_test_db_with_migrator(migrator: Migrator) -> TestDatabase {
596611
let db_id = DB_COUNTER.fetch_add(1, Ordering::SeqCst);
597612
let unique_db_name = format!("test_db_custom_{db_id}");
598613

599-
// Start PostgreSQL container
600614
let pg_container = postgres::Postgres::default()
601615
.start()
602616
.await
@@ -607,9 +621,25 @@ pub async fn setup_test_db_with_migrator(migrator: Migrator) -> TestDatabase {
607621
.await
608622
.expect("Failed to get container port");
609623

624+
// In CI environments, we might need to use the container's IP instead of localhost
625+
let host = if std::env::var("CI").is_ok() {
626+
pg_container
627+
.get_host()
628+
.await
629+
.expect("Failed to get container host")
630+
.to_string()
631+
} else {
632+
"localhost".to_string()
633+
};
634+
610635
// Connect to postgres database first to create our test database
611636
let admin_connection_string =
612-
format!("postgres://postgres:postgres@localhost:{host_port}/postgres");
637+
format!("postgres://postgres:postgres@{host}:{host_port}/postgres");
638+
639+
tracing::debug!(
640+
"Attempting to connect to admin database: {}",
641+
admin_connection_string
642+
);
613643
let admin_pool = sqlx::PgPool::connect(&admin_connection_string)
614644
.await
615645
.expect("Failed to connect to admin database");
@@ -622,7 +652,7 @@ pub async fn setup_test_db_with_migrator(migrator: Migrator) -> TestDatabase {
622652

623653
// Connect to our test database
624654
let connection_string =
625-
format!("postgres://postgres:postgres@localhost:{host_port}/{unique_db_name}");
655+
format!("postgres://postgres:postgres@{host}:{host_port}/{unique_db_name}");
626656
let pool = sqlx::PgPool::connect(&connection_string)
627657
.await
628658
.expect("Failed to connect to test database");

0 commit comments

Comments
 (0)