Skip to content

Commit e61ad31

Browse files
committed
test(tap-agent): buffer window fee
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 3b4f42e commit e61ad31

File tree

2 files changed

+129
-40
lines changed

2 files changed

+129
-40
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,6 @@ impl Actor for SenderAccount {
601601
state.add_to_denylist().await;
602602
}
603603

604-
// TODO if the total_fee_outside_the_buffer
605604
if state.sender_fee_tracker.get_total_fee_outsite_buffer()
606605
>= state.config.tap.rav_request_trigger_value
607606
{
@@ -904,6 +903,7 @@ pub mod tests {
904903
const DUMMY_URL: &str = "http://localhost:1234";
905904
const TRIGGER_VALUE: u128 = 500;
906905
const ESCROW_VALUE: u128 = 1000;
906+
const BUFFER_MS: u64 = 100;
907907

908908
async fn create_sender_account(
909909
pgpool: PgPool,
@@ -924,7 +924,7 @@ pub mod tests {
924924
},
925925
tap: config::Tap {
926926
rav_request_trigger_value,
927-
rav_request_timestamp_buffer_ms: 1,
927+
rav_request_timestamp_buffer_ms: BUFFER_MS,
928928
rav_request_timeout_secs: 5,
929929
max_unnaggregated_fees_per_sender,
930930
..Default::default()
@@ -1211,14 +1211,11 @@ pub mod tests {
12111211
sender_account
12121212
.cast(SenderAccountMessage::UpdateReceiptFees(
12131213
*ALLOCATION_ID_0,
1214-
ReceiptFees::UpdateValue(UnaggregatedReceipts {
1215-
value: TRIGGER_VALUE - 1,
1216-
last_id: 10,
1217-
}),
1214+
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1),
12181215
))
12191216
.unwrap();
12201217

1221-
tokio::time::sleep(Duration::from_millis(10)).await;
1218+
tokio::time::sleep(Duration::from_millis(BUFFER_MS)).await;
12221219

12231220
assert_eq!(
12241221
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
@@ -1250,10 +1247,24 @@ pub mod tests {
12501247
sender_account
12511248
.cast(SenderAccountMessage::UpdateReceiptFees(
12521249
*ALLOCATION_ID_0,
1253-
ReceiptFees::UpdateValue(UnaggregatedReceipts {
1254-
value: TRIGGER_VALUE,
1255-
last_id: 10,
1256-
}),
1250+
ReceiptFees::NewReceipt(TRIGGER_VALUE),
1251+
))
1252+
.unwrap();
1253+
1254+
tokio::time::sleep(Duration::from_millis(20)).await;
1255+
1256+
assert_eq!(
1257+
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
1258+
0
1259+
);
1260+
1261+
// wait for it to be outside buffer
1262+
tokio::time::sleep(Duration::from_millis(BUFFER_MS)).await;
1263+
1264+
sender_account
1265+
.cast(SenderAccountMessage::UpdateReceiptFees(
1266+
*ALLOCATION_ID_0,
1267+
ReceiptFees::Retry,
12571268
))
12581269
.unwrap();
12591270

@@ -1362,10 +1373,7 @@ pub mod tests {
13621373
sender_account
13631374
.cast(SenderAccountMessage::UpdateReceiptFees(
13641375
*ALLOCATION_ID_0,
1365-
ReceiptFees::UpdateValue(UnaggregatedReceipts {
1366-
value: TRIGGER_VALUE,
1367-
last_id: 11,
1368-
}),
1376+
ReceiptFees::NewReceipt(TRIGGER_VALUE),
13691377
))
13701378
.unwrap();
13711379
tokio::time::sleep(Duration::from_millis(200)).await;
@@ -1750,10 +1758,7 @@ pub mod tests {
17501758
sender_account
17511759
.cast(SenderAccountMessage::UpdateReceiptFees(
17521760
*ALLOCATION_ID_0,
1753-
ReceiptFees::UpdateValue(UnaggregatedReceipts {
1754-
value: TRIGGER_VALUE,
1755-
last_id: 11,
1756-
}),
1761+
ReceiptFees::NewReceipt(TRIGGER_VALUE),
17571762
))
17581763
.unwrap();
17591764
tokio::time::sleep(Duration::from_millis(100)).await;

tap-agent/src/agent/sender_fee_tracker.rs

Lines changed: 105 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,30 @@ pub struct SenderFeeTracker {
3434
id_to_fee: HashMap<Address, u128>,
3535
total_fee: u128,
3636

37-
buffer_fee: HashMap<Address, ExpiringSum>,
38-
39-
buffer_duration: Duration,
37+
buffer_window_fee: HashMap<Address, ExpiringSum>,
38+
buffer_window_duration: Duration,
4039
// there are some allocations that we don't want it to be
4140
// heaviest allocation, because they are already marked for finalization,
4241
// and thus requesting RAVs on their own in their `post_stop` routine.
4342
blocked_addresses: HashSet<Address>,
4443
}
4544

4645
impl SenderFeeTracker {
47-
pub fn new(buffer_duration: Duration) -> Self {
46+
pub fn new(buffer_window_duration: Duration) -> Self {
4847
Self {
49-
buffer_duration,
48+
buffer_window_duration,
5049
..Default::default()
5150
}
5251
}
52+
/// Adds into the total_fee entry and buffer window totals
53+
///
54+
/// It's important to notice that `value` cannot be less than
55+
/// zero, so the only way to make this counter lower is by using
56+
/// `update` function
5357
pub fn add(&mut self, id: Address, value: u128) {
54-
if self.buffer_duration > Duration::ZERO {
58+
if self.buffer_window_duration > Duration::ZERO {
5559
let now = Instant::now();
56-
let expiring_sum = self.buffer_fee.entry(id).or_default();
60+
let expiring_sum = self.buffer_window_fee.entry(id).or_default();
5761
expiring_sum.entries.push_back((now, value));
5862
expiring_sum.sum += value;
5963
}
@@ -62,6 +66,10 @@ impl SenderFeeTracker {
6266
*entry += value;
6367
}
6468

69+
/// Updates and overwrite the fee counter into the specific
70+
/// value provided.
71+
///
72+
/// IMPORTANT: This function does not affect the buffer window fee
6573
pub fn update(&mut self, id: Address, fee: u128) {
6674
if fee > 0 {
6775
// insert or update, if update remove old fee from total
@@ -100,12 +108,13 @@ impl SenderFeeTracker {
100108
(
101109
addr,
102110
fee - self
103-
.buffer_fee
111+
.buffer_window_fee
104112
.get_mut(addr)
105-
.map(|expiring| expiring.get_sum(&self.buffer_duration))
113+
.map(|expiring| expiring.get_sum(&self.buffer_window_duration))
106114
.unwrap_or_default(),
107115
)
108116
})
117+
.filter(|(_, fee)| *fee > 0)
109118
.fold(None, |acc: Option<(&Address, u128)>, (addr, fee)| {
110119
if let Some((_, max_fee)) = acc {
111120
if fee > max_fee {
@@ -129,30 +138,29 @@ impl SenderFeeTracker {
129138
}
130139

131140
pub fn get_total_fee_outsite_buffer(&mut self) -> u128 {
132-
self.total_fee - self.get_buffer_fee()
141+
self.total_fee - self.get_buffer_fee().min(self.total_fee)
133142
}
134143

135144
pub fn get_buffer_fee(&mut self) -> u128 {
136-
self.buffer_fee.values_mut().fold(0u128, |acc, expiring| {
137-
acc + expiring.get_sum(&self.buffer_duration)
138-
})
145+
self.buffer_window_fee
146+
.values_mut()
147+
.fold(0u128, |acc, expiring| {
148+
acc + expiring.get_sum(&self.buffer_window_duration)
149+
})
139150
}
140151
}
141152

142153
#[cfg(test)]
143154
mod tests {
144155
use super::SenderFeeTracker;
145-
use std::str::FromStr;
146-
use thegraph_core::Address;
156+
use alloy::primitives::address;
157+
use std::{thread::sleep, time::Duration};
147158

148159
#[test]
149160
fn test_allocation_id_tracker() {
150-
let allocation_id_0: Address =
151-
Address::from_str("0xabababababababababababababababababababab").unwrap();
152-
let allocation_id_1: Address =
153-
Address::from_str("0xbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc").unwrap();
154-
let allocation_id_2: Address =
155-
Address::from_str("0xcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd").unwrap();
161+
let allocation_id_0 = address!("abababababababababababababababababababab");
162+
let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc");
163+
let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd");
156164

157165
let mut tracker = SenderFeeTracker::default();
158166
assert_eq!(tracker.get_heaviest_allocation_id(), None);
@@ -204,4 +212,80 @@ mod tests {
204212
assert_eq!(tracker.get_heaviest_allocation_id(), None);
205213
assert_eq!(tracker.get_total_fee(), 0);
206214
}
215+
216+
#[test]
217+
fn test_buffer_tracker_window() {
218+
let allocation_id_0 = address!("abababababababababababababababababababab");
219+
let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc");
220+
let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd");
221+
222+
const BUFFER_WINDOW: Duration = Duration::from_millis(20);
223+
let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW);
224+
assert_eq!(tracker.get_heaviest_allocation_id(), None);
225+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 0);
226+
assert_eq!(tracker.get_total_fee(), 0);
227+
228+
tracker.add(allocation_id_0, 10);
229+
assert_eq!(tracker.get_heaviest_allocation_id(), None);
230+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 0);
231+
assert_eq!(tracker.get_total_fee(), 10);
232+
233+
sleep(BUFFER_WINDOW);
234+
235+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
236+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 10);
237+
assert_eq!(tracker.get_total_fee(), 10);
238+
239+
tracker.add(allocation_id_2, 20);
240+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
241+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 10);
242+
assert_eq!(tracker.get_total_fee(), 30);
243+
244+
sleep(BUFFER_WINDOW);
245+
246+
tracker.block_allocation_id(allocation_id_2);
247+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
248+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 30);
249+
assert_eq!(tracker.get_total_fee(), 30);
250+
251+
tracker.unblock_allocation_id(allocation_id_2);
252+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2));
253+
254+
tracker.add(allocation_id_1, 30);
255+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2));
256+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 30);
257+
assert_eq!(tracker.get_total_fee(), 60);
258+
259+
sleep(BUFFER_WINDOW);
260+
261+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
262+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 60);
263+
assert_eq!(tracker.get_total_fee(), 60);
264+
265+
tracker.add(allocation_id_2, 20);
266+
tracker.update(allocation_id_2, 0);
267+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
268+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 20);
269+
assert_eq!(tracker.get_total_fee(), 40);
270+
271+
sleep(BUFFER_WINDOW);
272+
273+
tracker.add(allocation_id_2, 100);
274+
tracker.update(allocation_id_2, 0);
275+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
276+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 0);
277+
assert_eq!(tracker.get_total_fee(), 40);
278+
279+
sleep(BUFFER_WINDOW);
280+
281+
tracker.update(allocation_id_1, 0);
282+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
283+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 10);
284+
assert_eq!(tracker.get_total_fee(), 10);
285+
286+
tracker.update(allocation_id_0, 0);
287+
assert_eq!(tracker.get_heaviest_allocation_id(), None);
288+
assert_eq!(tracker.get_total_fee_outsite_buffer(), 0);
289+
assert_eq!(tracker.get_total_fee(), 0);
290+
}
207291
}

0 commit comments

Comments
 (0)