Skip to content

Commit 8242976

Browse files
gusinacioaasseman
authored andcommitted
fix(tap-agent): allow sender on allocation close
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 9edb0e5 commit 8242976

File tree

1 file changed

+103
-3
lines changed

1 file changed

+103
-3
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ pub enum SenderAccountMessage {
4545
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker>),
4646
#[cfg(test)]
4747
GetDeny(ractor::RpcReplyPort<bool>),
48+
#[cfg(test)]
49+
HasSchedulerEnabled(ractor::RpcReplyPort<bool>),
4850
}
4951

5052
/// A SenderAccount manages the receipts accounting between the indexer and the sender across
@@ -624,6 +626,12 @@ impl Actor for SenderAccount {
624626
let _ = reply.send(state.denied);
625627
}
626628
}
629+
#[cfg(test)]
630+
SenderAccountMessage::HasSchedulerEnabled(reply) => {
631+
if !reply.is_closed() {
632+
let _ = reply.send(state.scheduled_rav_request.is_some());
633+
}
634+
}
627635
}
628636
Ok(())
629637
}
@@ -661,12 +669,16 @@ impl Actor for SenderAccount {
661669
return Ok(());
662670
};
663671

664-
let tracker = &mut state.sender_fee_tracker;
665-
tracker.update(allocation_id, 0);
666672
// clean up hashset
667673
state
668674
.sender_fee_tracker
669675
.unblock_allocation_id(allocation_id);
676+
// update the receipt fees by reseting to 0
677+
myself.cast(SenderAccountMessage::UpdateReceiptFees(
678+
allocation_id,
679+
UnaggregatedReceipts::default(),
680+
))?;
681+
670682
// rav tracker is not updated because it's still not redeemed
671683
}
672684
SupervisionEvent::ActorPanicked(cell, error) => {
@@ -913,6 +925,7 @@ pub mod tests {
913925
pub struct MockSenderAllocation {
914926
triggered_rav_request: Arc<AtomicU32>,
915927
next_rav_value: Arc<Mutex<u128>>,
928+
next_unaggregated_fees_value: Arc<Mutex<u128>>,
916929
receipts: Arc<Mutex<Vec<NewReceiptNotification>>>,
917930
}
918931

@@ -924,18 +937,33 @@ pub mod tests {
924937
triggered_rav_request: triggered_rav_request.clone(),
925938
receipts: Arc::new(Mutex::new(Vec::new())),
926939
next_rav_value: Arc::new(Mutex::new(0)),
940+
next_unaggregated_fees_value: Arc::new(Mutex::new(0)),
927941
},
928942
triggered_rav_request,
929943
)
930944
}
931945

946+
pub fn new_with_next_unaggregated_fees_value() -> (Self, Arc<Mutex<u128>>) {
947+
let unaggregated_fees = Arc::new(Mutex::new(0));
948+
(
949+
Self {
950+
triggered_rav_request: Arc::new(AtomicU32::new(0)),
951+
receipts: Arc::new(Mutex::new(Vec::new())),
952+
next_rav_value: Arc::new(Mutex::new(0)),
953+
next_unaggregated_fees_value: unaggregated_fees.clone(),
954+
},
955+
unaggregated_fees,
956+
)
957+
}
958+
932959
pub fn new_with_next_rav_value() -> (Self, Arc<Mutex<u128>>) {
933960
let next_rav_value = Arc::new(Mutex::new(0));
934961
(
935962
Self {
936963
triggered_rav_request: Arc::new(AtomicU32::new(0)),
937964
receipts: Arc::new(Mutex::new(Vec::new())),
938965
next_rav_value: next_rav_value.clone(),
966+
next_unaggregated_fees_value: Arc::new(Mutex::new(0)),
939967
},
940968
next_rav_value,
941969
)
@@ -948,6 +976,7 @@ pub mod tests {
948976
triggered_rav_request: Arc::new(AtomicU32::new(0)),
949977
receipts: receipts.clone(),
950978
next_rav_value: Arc::new(Mutex::new(0)),
979+
next_unaggregated_fees_value: Arc::new(Mutex::new(0)),
951980
},
952981
receipts,
953982
)
@@ -984,7 +1013,13 @@ pub mod tests {
9841013
4,
9851014
*self.next_rav_value.lock().unwrap(),
9861015
);
987-
reply.send((UnaggregatedReceipts::default(), Some(signed_rav)))?;
1016+
reply.send((
1017+
UnaggregatedReceipts {
1018+
value: *self.next_unaggregated_fees_value.lock().unwrap(),
1019+
last_id: 0,
1020+
},
1021+
Some(signed_rav),
1022+
))?;
9881023
}
9891024
SenderAllocationMessage::NewReceipt(receipt) => {
9901025
self.receipts.lock().unwrap().push(receipt);
@@ -1538,4 +1573,69 @@ pub mod tests {
15381573
sender_account.stop_and_wait(None, None).await.unwrap();
15391574
handle.await.unwrap();
15401575
}
1576+
1577+
#[sqlx::test(migrations = "../migrations")]
1578+
async fn test_sender_denied_close_allocation_stop_retry(pgpool: PgPool) {
1579+
// we set to 1 to block the sender on a really low value
1580+
let max_unaggregated_fees_per_sender: u128 = 1;
1581+
1582+
let (sender_account, handle, prefix, _) = create_sender_account(
1583+
pgpool,
1584+
HashSet::new(),
1585+
TRIGGER_VALUE,
1586+
max_unaggregated_fees_per_sender,
1587+
DUMMY_URL,
1588+
)
1589+
.await;
1590+
1591+
let (mock_sender_allocation, next_unaggregated_fees) =
1592+
MockSenderAllocation::new_with_next_unaggregated_fees_value();
1593+
1594+
let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
1595+
let (allocation, allocation_handle) = MockSenderAllocation::spawn_linked(
1596+
Some(name),
1597+
mock_sender_allocation,
1598+
(),
1599+
sender_account.get_cell(),
1600+
)
1601+
.await
1602+
.unwrap();
1603+
*next_unaggregated_fees.lock().unwrap() = TRIGGER_VALUE;
1604+
1605+
// set retry
1606+
sender_account
1607+
.cast(SenderAccountMessage::UpdateReceiptFees(
1608+
*ALLOCATION_ID_0,
1609+
UnaggregatedReceipts {
1610+
value: TRIGGER_VALUE,
1611+
last_id: 11,
1612+
},
1613+
))
1614+
.unwrap();
1615+
tokio::time::sleep(Duration::from_millis(100)).await;
1616+
1617+
let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap();
1618+
assert!(deny, "should be blocked");
1619+
1620+
let scheuduler_enabled =
1621+
call!(sender_account, SenderAccountMessage::HasSchedulerEnabled).unwrap();
1622+
assert!(scheuduler_enabled, "should have an scheduler enabled");
1623+
1624+
// close the allocation and trigger
1625+
allocation.stop_and_wait(None, None).await.unwrap();
1626+
allocation_handle.await.unwrap();
1627+
1628+
tokio::time::sleep(Duration::from_millis(100)).await;
1629+
1630+
// should remove the block and the retry
1631+
let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap();
1632+
assert!(!deny, "should be unblocked");
1633+
1634+
let scheuduler_enabled =
1635+
call!(sender_account, SenderAccountMessage::HasSchedulerEnabled).unwrap();
1636+
assert!(!scheuduler_enabled, "should have an scheduler disabled");
1637+
1638+
sender_account.stop_and_wait(None, None).await.unwrap();
1639+
handle.await.unwrap();
1640+
}
15411641
}

0 commit comments

Comments
 (0)