@@ -67,6 +67,8 @@ pub struct SenderAccountArgs {
67
67
pub sender_aggregator_endpoint : String ,
68
68
pub allocation_ids : HashSet < Address > ,
69
69
pub prefix : Option < String > ,
70
+
71
+ pub retry_interval : Duration ,
70
72
}
71
73
pub struct State {
72
74
prefix : Option < String > ,
@@ -82,6 +84,7 @@ pub struct State {
82
84
// Deny reasons
83
85
denied : bool ,
84
86
sender_balance : U256 ,
87
+ retry_interval : Duration ,
85
88
86
89
//Eventuals
87
90
escrow_accounts : Eventual < EscrowAccounts > ,
@@ -251,6 +254,7 @@ impl Actor for SenderAccount {
251
254
sender_aggregator_endpoint,
252
255
allocation_ids,
253
256
prefix,
257
+ retry_interval,
254
258
} : Self :: Arguments ,
255
259
) -> std:: result:: Result < Self :: State , ActorProcessingErr > {
256
260
let myself_clone = myself. clone ( ) ;
@@ -419,6 +423,7 @@ impl Actor for SenderAccount {
419
423
sender : sender_id,
420
424
denied,
421
425
sender_balance,
426
+ retry_interval,
422
427
scheduled_rav_request : None ,
423
428
} ;
424
429
@@ -502,7 +507,7 @@ impl Actor for SenderAccount {
502
507
( true , true ) => {
503
508
// retry in a moment
504
509
state. scheduled_rav_request =
505
- Some ( myself. send_after ( Duration :: from_secs ( 30 ) , move || {
510
+ Some ( myself. send_after ( state . retry_interval , move || {
506
511
SenderAccountMessage :: UpdateReceiptFees (
507
512
allocation_id,
508
513
unaggregated_fees,
@@ -692,7 +697,7 @@ pub mod tests {
692
697
use serde_json:: json;
693
698
use sqlx:: PgPool ;
694
699
use std:: collections:: { HashMap , HashSet } ;
695
- use std:: sync:: atomic:: { AtomicBool , AtomicU32 } ;
700
+ use std:: sync:: atomic:: AtomicU32 ;
696
701
use std:: sync:: { Arc , Mutex } ;
697
702
use std:: time:: Duration ;
698
703
use wiremock:: matchers:: { body_string_contains, method} ;
@@ -773,6 +778,7 @@ pub mod tests {
773
778
sender_aggregator_endpoint : DUMMY_URL . to_string ( ) ,
774
779
allocation_ids : HashSet :: new ( ) ,
775
780
prefix : Some ( prefix. clone ( ) ) ,
781
+ retry_interval : Duration :: from_millis ( 10 ) ,
776
782
} ;
777
783
778
784
let ( sender, handle) = SenderAccount :: spawn ( Some ( prefix. clone ( ) ) , SenderAccount , args)
@@ -823,14 +829,14 @@ pub mod tests {
823
829
}
824
830
825
831
pub struct MockSenderAllocation {
826
- triggered_rav_request : Arc < AtomicBool > ,
832
+ triggered_rav_request : Arc < AtomicU32 > ,
827
833
next_rav_value : Arc < Mutex < u128 > > ,
828
834
receipts : Arc < Mutex < Vec < NewReceiptNotification > > > ,
829
835
}
830
836
831
837
impl MockSenderAllocation {
832
- pub fn new_with_triggered_rav_request ( ) -> ( Self , Arc < AtomicBool > ) {
833
- let triggered_rav_request = Arc :: new ( AtomicBool :: new ( false ) ) ;
838
+ pub fn new_with_triggered_rav_request ( ) -> ( Self , Arc < AtomicU32 > ) {
839
+ let triggered_rav_request = Arc :: new ( AtomicU32 :: new ( 0 ) ) ;
834
840
(
835
841
Self {
836
842
triggered_rav_request : triggered_rav_request. clone ( ) ,
@@ -845,7 +851,7 @@ pub mod tests {
845
851
let next_rav_value = Arc :: new ( Mutex :: new ( 0 ) ) ;
846
852
(
847
853
Self {
848
- triggered_rav_request : Arc :: new ( AtomicBool :: new ( false ) ) ,
854
+ triggered_rav_request : Arc :: new ( AtomicU32 :: new ( 0 ) ) ,
849
855
receipts : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
850
856
next_rav_value : next_rav_value. clone ( ) ,
851
857
} ,
@@ -857,7 +863,7 @@ pub mod tests {
857
863
let receipts = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
858
864
(
859
865
Self {
860
- triggered_rav_request : Arc :: new ( AtomicBool :: new ( false ) ) ,
866
+ triggered_rav_request : Arc :: new ( AtomicU32 :: new ( 0 ) ) ,
861
867
receipts : receipts. clone ( ) ,
862
868
next_rav_value : Arc :: new ( Mutex :: new ( 0 ) ) ,
863
869
} ,
@@ -889,7 +895,7 @@ pub mod tests {
889
895
match message {
890
896
SenderAllocationMessage :: TriggerRAVRequest ( reply) => {
891
897
self . triggered_rav_request
892
- . store ( true , std:: sync:: atomic:: Ordering :: SeqCst ) ;
898
+ . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
893
899
let signed_rav = create_rav (
894
900
* ALLOCATION_ID_0 ,
895
901
SIGNER . 0 . clone ( ) ,
@@ -912,7 +918,7 @@ pub mod tests {
912
918
sender : Address ,
913
919
allocation : Address ,
914
920
) -> (
915
- Arc < AtomicBool > ,
921
+ Arc < AtomicU32 > ,
916
922
ActorRef < SenderAllocationMessage > ,
917
923
JoinHandle < ( ) > ,
918
924
) {
@@ -954,7 +960,10 @@ pub mod tests {
954
960
955
961
tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
956
962
957
- assert ! ( !triggered_rav_request. load( std:: sync:: atomic:: Ordering :: SeqCst ) ) ;
963
+ assert_eq ! (
964
+ triggered_rav_request. load( std:: sync:: atomic:: Ordering :: SeqCst ) ,
965
+ 0
966
+ ) ;
958
967
959
968
allocation. stop_and_wait ( None , None ) . await . unwrap ( ) ;
960
969
allocation_handle. await . unwrap ( ) ;
@@ -990,7 +999,10 @@ pub mod tests {
990
999
991
1000
tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
992
1001
993
- assert ! ( triggered_rav_request. load( std:: sync:: atomic:: Ordering :: SeqCst ) ) ;
1002
+ assert_eq ! (
1003
+ triggered_rav_request. load( std:: sync:: atomic:: Ordering :: SeqCst ) ,
1004
+ 1
1005
+ ) ;
994
1006
995
1007
allocation. stop_and_wait ( None , None ) . await . unwrap ( ) ;
996
1008
allocation_handle. await . unwrap ( ) ;
@@ -1065,6 +1077,53 @@ pub mod tests {
1065
1077
assert ! ( deny) ;
1066
1078
}
1067
1079
1080
+ #[ sqlx:: test( migrations = "../migrations" ) ]
1081
+ async fn test_retry_unaggregated_fees ( pgpool : PgPool ) {
1082
+ // we set to zero to block the sender, no matter the fee
1083
+ let max_unaggregated_fees_per_sender: u128 = 0 ;
1084
+
1085
+ let ( sender_account, handle, prefix, _) = create_sender_account (
1086
+ pgpool,
1087
+ HashSet :: new ( ) ,
1088
+ TRIGGER_VALUE ,
1089
+ max_unaggregated_fees_per_sender,
1090
+ DUMMY_URL ,
1091
+ )
1092
+ . await ;
1093
+
1094
+ let ( triggered_rav_request, allocation, allocation_handle) =
1095
+ create_mock_sender_allocation ( prefix, SENDER . 1 , * ALLOCATION_ID_0 ) . await ;
1096
+
1097
+ assert_eq ! (
1098
+ triggered_rav_request. load( std:: sync:: atomic:: Ordering :: SeqCst ) ,
1099
+ 0
1100
+ ) ;
1101
+ sender_account
1102
+ . cast ( SenderAccountMessage :: UpdateReceiptFees (
1103
+ * ALLOCATION_ID_0 ,
1104
+ UnaggregatedReceipts {
1105
+ value : TRIGGER_VALUE ,
1106
+ last_id : 11 ,
1107
+ } ,
1108
+ ) )
1109
+ . unwrap ( ) ;
1110
+ tokio:: time:: sleep ( Duration :: from_millis ( 30 ) ) . await ;
1111
+
1112
+ let retry_value = triggered_rav_request. load ( std:: sync:: atomic:: Ordering :: SeqCst ) ;
1113
+ assert ! ( retry_value > 1 , "It didn't retry more than once" ) ;
1114
+
1115
+ tokio:: time:: sleep ( Duration :: from_millis ( 30 ) ) . await ;
1116
+
1117
+ let new_value = triggered_rav_request. load ( std:: sync:: atomic:: Ordering :: SeqCst ) ;
1118
+ assert ! ( new_value > retry_value, "It didn't retry anymore" ) ;
1119
+
1120
+ allocation. stop_and_wait ( None , None ) . await . unwrap ( ) ;
1121
+ allocation_handle. await . unwrap ( ) ;
1122
+
1123
+ sender_account. stop_and_wait ( None , None ) . await . unwrap ( ) ;
1124
+ handle. await . unwrap ( ) ;
1125
+ }
1126
+
1068
1127
#[ sqlx:: test( migrations = "../migrations" ) ]
1069
1128
async fn test_deny_allow ( pgpool : PgPool ) {
1070
1129
async fn get_deny_status ( sender_account : & ActorRef < SenderAccountMessage > ) -> bool {
0 commit comments