Skip to content

Commit 00b332e

Browse files
committed
feat: add tracker for buffer unaggregated fees
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 09b36a6 commit 00b332e

File tree

4 files changed

+120
-31
lines changed

4 files changed

+120
-31
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use tap_core::rav::SignedRAV;
2323
use thegraph_core::Address;
2424
use tracing::{error, Level};
2525

26+
use super::sender_accounts_manager::NewReceiptNotification;
2627
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
2728
use crate::agent::sender_allocation::SenderAllocationMessage;
2829
use crate::agent::sender_fee_tracker::SenderFeeTracker;
@@ -79,7 +80,8 @@ type Balance = U256;
7980

8081
#[derive(Debug, Eq, PartialEq)]
8182
pub enum ReceiptFees {
82-
NewValue(UnaggregatedReceipts),
83+
NewReceipt(NewReceiptNotification),
84+
UpdateValue(UnaggregatedReceipts),
8385
Retry,
8486
}
8587

@@ -478,7 +480,9 @@ impl Actor for SenderAccount {
478480
.set(config.tap.rav_request_trigger_value as f64);
479481

480482
let state = State {
481-
sender_fee_tracker: SenderFeeTracker::default(),
483+
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
484+
config.tap.rav_request_timestamp_buffer_ms,
485+
)),
482486
rav_tracker: SenderFeeTracker::default(),
483487
invalid_receipts_tracker: SenderFeeTracker::default(),
484488
allocation_ids: allocation_ids.clone(),
@@ -564,14 +568,30 @@ impl Actor for SenderAccount {
564568
scheduled_rav_request.abort();
565569
}
566570

567-
if let ReceiptFees::NewValue(unaggregated_fees) = receipt_fees {
568-
state
569-
.sender_fee_tracker
570-
.update(allocation_id, unaggregated_fees.value);
571+
match receipt_fees {
572+
ReceiptFees::NewReceipt(receipt) => {
573+
state.sender_fee_tracker.add(allocation_id, receipt.value);
571574

572-
UNAGGREGATED_FEES
573-
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
574-
.set(unaggregated_fees.value as f64);
575+
UNAGGREGATED_FEES
576+
.with_label_values(&[
577+
&state.sender.to_string(),
578+
&allocation_id.to_string(),
579+
])
580+
.add(receipt.value as f64);
581+
}
582+
ReceiptFees::UpdateValue(unaggregated_fees) => {
583+
state
584+
.sender_fee_tracker
585+
.update(allocation_id, unaggregated_fees.value);
586+
587+
UNAGGREGATED_FEES
588+
.with_label_values(&[
589+
&state.sender.to_string(),
590+
&allocation_id.to_string(),
591+
])
592+
.set(unaggregated_fees.value as f64);
593+
}
594+
ReceiptFees::Retry => {}
575595
}
576596

577597
// Eagerly deny the sender (if needed), before the RAV request. To be sure not to
@@ -582,7 +602,8 @@ impl Actor for SenderAccount {
582602
state.add_to_denylist().await;
583603
}
584604

585-
if state.sender_fee_tracker.get_total_fee()
605+
// TODO if the total_fee_outside_the_buffer
606+
if state.sender_fee_tracker.get_total_fee_outsite_buffer()
586607
>= state.config.tap.rav_request_trigger_value
587608
{
588609
tracing::debug!(
@@ -769,7 +790,7 @@ impl Actor for SenderAccount {
769790
// update the receipt fees by reseting to 0
770791
myself.cast(SenderAccountMessage::UpdateReceiptFees(
771792
allocation_id,
772-
ReceiptFees::NewValue(UnaggregatedReceipts::default()),
793+
ReceiptFees::UpdateValue(UnaggregatedReceipts::default()),
773794
))?;
774795

775796
// rav tracker is not updated because it's still not redeemed
@@ -1191,7 +1212,7 @@ pub mod tests {
11911212
sender_account
11921213
.cast(SenderAccountMessage::UpdateReceiptFees(
11931214
*ALLOCATION_ID_0,
1194-
ReceiptFees::NewValue(UnaggregatedReceipts {
1215+
ReceiptFees::UpdateValue(UnaggregatedReceipts {
11951216
value: TRIGGER_VALUE - 1,
11961217
last_id: 10,
11971218
}),
@@ -1230,7 +1251,7 @@ pub mod tests {
12301251
sender_account
12311252
.cast(SenderAccountMessage::UpdateReceiptFees(
12321253
*ALLOCATION_ID_0,
1233-
ReceiptFees::NewValue(UnaggregatedReceipts {
1254+
ReceiptFees::UpdateValue(UnaggregatedReceipts {
12341255
value: TRIGGER_VALUE,
12351256
last_id: 10,
12361257
}),
@@ -1342,7 +1363,7 @@ pub mod tests {
13421363
sender_account
13431364
.cast(SenderAccountMessage::UpdateReceiptFees(
13441365
*ALLOCATION_ID_0,
1345-
ReceiptFees::NewValue(UnaggregatedReceipts {
1366+
ReceiptFees::UpdateValue(UnaggregatedReceipts {
13461367
value: TRIGGER_VALUE,
13471368
last_id: 11,
13481369
}),
@@ -1388,7 +1409,7 @@ pub mod tests {
13881409
sender_account
13891410
.cast(SenderAccountMessage::UpdateReceiptFees(
13901411
*ALLOCATION_ID_0,
1391-
ReceiptFees::NewValue(UnaggregatedReceipts {
1412+
ReceiptFees::UpdateValue(UnaggregatedReceipts {
13921413
value: $value,
13931414
last_id: 11,
13941415
}),
@@ -1529,7 +1550,7 @@ pub mod tests {
15291550
sender_account
15301551
.cast(SenderAccountMessage::UpdateReceiptFees(
15311552
*ALLOCATION_ID_0,
1532-
ReceiptFees::NewValue(UnaggregatedReceipts {
1553+
ReceiptFees::UpdateValue(UnaggregatedReceipts {
15331554
value: $value,
15341555
last_id: 11,
15351556
}),
@@ -1730,7 +1751,7 @@ pub mod tests {
17301751
sender_account
17311752
.cast(SenderAccountMessage::UpdateReceiptFees(
17321753
*ALLOCATION_ID_0,
1733-
ReceiptFees::NewValue(UnaggregatedReceipts {
1754+
ReceiptFees::UpdateValue(UnaggregatedReceipts {
17341755
value: TRIGGER_VALUE,
17351756
last_id: 11,
17361757
}),

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ lazy_static! {
3434
.unwrap();
3535
}
3636

37-
#[derive(Deserialize, Debug)]
37+
#[derive(Deserialize, Debug, PartialEq, Eq)]
3838
pub struct NewReceiptNotification {
3939
pub id: u64,
4040
pub allocation_id: Address,

tap-agent/src/agent/sender_allocation.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ impl Actor for SenderAllocation {
161161

162162
sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
163163
allocation_id,
164-
ReceiptFees::NewValue(state.unaggregated_fees.clone()),
164+
ReceiptFees::UpdateValue(state.unaggregated_fees.clone()),
165165
))?;
166166

167167
// update rav tracker for sender account
@@ -225,9 +225,10 @@ impl Actor for SenderAllocation {
225225
);
226226
let unaggregated_fees = &mut state.unaggregated_fees;
227227
match message {
228-
SenderAllocationMessage::NewReceipt(NewReceiptNotification {
229-
id, value: fees, ..
230-
}) => {
228+
SenderAllocationMessage::NewReceipt(notification) => {
229+
let NewReceiptNotification {
230+
id, value: fees, ..
231+
} = notification;
231232
if id > unaggregated_fees.last_id {
232233
unaggregated_fees.last_id = id;
233234
unaggregated_fees.value = unaggregated_fees
@@ -248,7 +249,7 @@ impl Actor for SenderAllocation {
248249
.sender_account_ref
249250
.cast(SenderAccountMessage::UpdateReceiptFees(
250251
state.allocation_id,
251-
ReceiptFees::NewValue(unaggregated_fees.clone()),
252+
ReceiptFees::NewReceipt(notification),
252253
))?;
253254
}
254255
}
@@ -974,7 +975,7 @@ pub mod tests {
974975
// Should emit a message to the sender account with the unaggregated fees.
975976
let expected_message = SenderAccountMessage::UpdateReceiptFees(
976977
*ALLOCATION_ID_0,
977-
ReceiptFees::NewValue(UnaggregatedReceipts {
978+
ReceiptFees::UpdateValue(UnaggregatedReceipts {
978979
last_id: 10,
979980
value: 55u128,
980981
}),
@@ -1073,7 +1074,7 @@ pub mod tests {
10731074
// should emit update aggregate fees message to sender account
10741075
let expected_message = SenderAccountMessage::UpdateReceiptFees(
10751076
*ALLOCATION_ID_0,
1076-
ReceiptFees::NewValue(UnaggregatedReceipts {
1077+
ReceiptFees::UpdateValue(UnaggregatedReceipts {
10771078
last_id: 1,
10781079
value: 20,
10791080
}),
@@ -1191,7 +1192,7 @@ pub mod tests {
11911192
last_message_emitted.lock().unwrap().last(),
11921193
Some(&SenderAccountMessage::UpdateReceiptFees(
11931194
*ALLOCATION_ID_0,
1194-
ReceiptFees::NewValue(UnaggregatedReceipts::default())
1195+
ReceiptFees::UpdateValue(UnaggregatedReceipts::default())
11951196
))
11961197
);
11971198
}

tap-agent/src/agent/sender_fee_tracker.rs

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,66 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use alloy::primitives::Address;
5-
use std::collections::{HashMap, HashSet};
5+
use std::{
6+
collections::{HashMap, HashSet, VecDeque},
7+
time::{Duration, Instant},
8+
};
69
use tracing::error;
710

11+
#[derive(Debug, Clone, Default)]
12+
struct ExpiringSum {
13+
entries: VecDeque<(Instant, u128)>,
14+
sum: u128,
15+
}
16+
17+
impl ExpiringSum {
18+
fn get_sum(&mut self, duration: &Duration) -> u128 {
19+
let now = Instant::now();
20+
while let Some(&(timestamp, value)) = self.entries.front() {
21+
if now.duration_since(timestamp) >= *duration {
22+
self.entries.pop_front();
23+
self.sum -= value;
24+
} else {
25+
break;
26+
}
27+
}
28+
self.sum
29+
}
30+
}
31+
832
#[derive(Debug, Clone, Default)]
933
pub struct SenderFeeTracker {
1034
id_to_fee: HashMap<Address, u128>,
1135
total_fee: u128,
36+
37+
buffer_fee: HashMap<Address, ExpiringSum>,
38+
39+
buffer_duration: Duration,
1240
// there are some allocations that we don't want it to be
1341
// heaviest allocation, because they are already marked for finalization,
1442
// and thus requesting RAVs on their own in their `post_stop` routine.
1543
blocked_addresses: HashSet<Address>,
1644
}
1745

1846
impl SenderFeeTracker {
47+
pub fn new(buffer_duration: Duration) -> Self {
48+
Self {
49+
buffer_duration,
50+
..Default::default()
51+
}
52+
}
53+
pub fn add(&mut self, id: Address, value: u128) {
54+
if self.buffer_duration > Duration::ZERO {
55+
let now = Instant::now();
56+
let expiring_sum = self.buffer_fee.entry(id).or_default();
57+
expiring_sum.entries.push_back((now, value));
58+
expiring_sum.sum += value;
59+
}
60+
self.total_fee += value;
61+
let entry = self.id_to_fee.entry(id).or_default();
62+
*entry += value;
63+
}
64+
1965
pub fn update(&mut self, id: Address, fee: u128) {
2066
if fee > 0 {
2167
// insert or update, if update remove old fee from total
@@ -44,20 +90,31 @@ impl SenderFeeTracker {
4490
self.blocked_addresses.remove(&address);
4591
}
4692

47-
pub fn get_heaviest_allocation_id(&self) -> Option<Address> {
93+
pub fn get_heaviest_allocation_id(&mut self) -> Option<Address> {
4894
// just loop over and get the biggest fee
4995
self.id_to_fee
5096
.iter()
5197
.filter(|(addr, _)| !self.blocked_addresses.contains(*addr))
98+
// map to the value minus fees in buffer
99+
.map(|(addr, fee)| {
100+
(
101+
addr,
102+
fee - self
103+
.buffer_fee
104+
.get_mut(addr)
105+
.map(|expiring| expiring.get_sum(&self.buffer_duration))
106+
.unwrap_or_default(),
107+
)
108+
})
52109
.fold(None, |acc: Option<(&Address, u128)>, (addr, fee)| {
53110
if let Some((_, max_fee)) = acc {
54-
if *fee > max_fee {
55-
Some((addr, *fee))
111+
if fee > max_fee {
112+
Some((addr, fee))
56113
} else {
57114
acc
58115
}
59116
} else {
60-
Some((addr, *fee))
117+
Some((addr, fee))
61118
}
62119
})
63120
.map(|(&id, _)| id)
@@ -70,6 +127,16 @@ impl SenderFeeTracker {
70127
pub fn get_total_fee(&self) -> u128 {
71128
self.total_fee
72129
}
130+
131+
pub fn get_total_fee_outsite_buffer(&mut self) -> u128 {
132+
self.total_fee - self.get_buffer_fee()
133+
}
134+
135+
pub fn get_buffer_fee(&mut self) -> u128 {
136+
self.buffer_fee.values_mut().fold(0u128, |acc, expiring| {
137+
acc + expiring.get_sum(&self.buffer_duration)
138+
})
139+
}
73140
}
74141

75142
#[cfg(test)]

0 commit comments

Comments
 (0)