Skip to content

Commit e71f891

Browse files
committed
wip
1 parent 8fbe4f2 commit e71f891

File tree

4 files changed

+138
-20
lines changed

4 files changed

+138
-20
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use tracing::{error, Level};
2828
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
2929
use crate::agent::sender_allocation::SenderAllocationMessage;
3030
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
31-
use crate::tracker::{DurationInfo, SenderFeeStats, SimpleFeeTracker};
31+
use crate::tracker::{DurationInfo, GlobalFeeTracker, SenderFeeStats, SimpleFeeTracker};
3232
use crate::{
3333
config::{self},
3434
tap::escrow_adapter::EscrowAdapter,
@@ -87,7 +87,7 @@ pub enum ReceiptFees {
8787
Retry,
8888
}
8989

90-
type SenderFeeTracker = SimpleFeeTracker<SenderFeeStats, DurationInfo>;
90+
type SenderFeeTracker = SimpleFeeTracker<SenderFeeStats, DurationInfo, GlobalFeeTracker>;
9191

9292
#[derive(Debug)]
9393
pub enum SenderAccountMessage {

tap-agent/src/tracker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ mod sender_fee_stats;
1111
#[cfg(test)]
1212
mod tracker_tests;
1313

14-
pub type SimpleFeeTracker<T, E = NoExtraData> = GenericTracker<T, E>;
14+
pub use generic_tracker::GlobalFeeTracker;
15+
16+
pub type SimpleFeeTracker<T, E = NoExtraData, G = u128> = GenericTracker<T, E, G>;
1517

1618
pub trait AllocationStats<G: GlobalTracker> {
1719
fn update(&mut self, v: G);

tap-agent/src/tracker/generic_tracker.rs

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::{AllocationStats, DefaultFromExtra, DurationInfo, SenderFeeStats};
22
use alloy::primitives::Address;
33
use std::{
44
collections::{HashMap, HashSet},
5-
ops::AddAssign,
5+
ops::{AddAssign, SubAssign},
66
time::{Duration, Instant},
77
};
88

@@ -16,24 +16,30 @@ impl GlobalTracker for u128 {
1616
}
1717
}
1818

19-
type GlobalFeeTracker = u128;
19+
pub struct GlobalFeeTracker {
20+
count: u128,
21+
requesting: u128,
22+
total_fee: u128,
23+
}
2024

2125
#[derive(Debug, Clone, Default)]
22-
pub struct GenericTracker<F, E> {
26+
pub struct GenericTracker<F, E, G> {
2327
pub(super) id_to_fee: HashMap<Address, F>,
24-
pub(super) total_fee: u128,
28+
pub(super) total_fee: G,
2529
pub(super) extra_data: E,
30+
pub(super) fees_requesting: u128,
2631
}
2732

28-
impl<F, E> GenericTracker<F, E>
33+
impl<F, E, G> GenericTracker<F, E, G>
2934
where
30-
F: AddAssign<GlobalFeeTracker> + AllocationStats<GlobalFeeTracker> + DefaultFromExtra<E>,
35+
G: GlobalTracker + SubAssign + AddAssign + PartialOrd + Copy,
36+
F: AddAssign<G> + AllocationStats<G> + DefaultFromExtra<E>,
3137
{
3238
/// Updates and overwrite the fee counter into the specific
3339
/// value provided.
3440
///
3541
/// IMPORTANT: This function does not affect the buffer window fee
36-
pub fn update(&mut self, id: Address, value: u128) {
42+
pub fn update(&mut self, id: Address, value: G) {
3743
if !value.should_remove() {
3844
// insert or update, if update remove old fee from total
3945
let fee = self
@@ -53,7 +59,7 @@ where
5359
self.id_to_fee
5460
.iter_mut()
5561
.filter(|(_, fee)| !fee.should_filter_out())
56-
.fold(None, |acc: Option<(&Address, u128)>, (addr, value)| {
62+
.fold(None, |acc: Option<(&Address, G)>, (addr, value)| {
5763
if let Some((_, max_fee)) = acc {
5864
if value.get_valid_fee() > max_fee {
5965
Some((addr, value.get_valid_fee()))
@@ -64,25 +70,29 @@ where
6470
Some((addr, value.get_valid_fee()))
6571
}
6672
})
67-
.filter(|(_, fee)| *fee > 0)
73+
.filter(|(_, fee)| !fee.should_remove())
6874
.map(|(&id, _)| id)
6975
}
7076

7177
pub fn get_list_of_allocation_ids(&self) -> HashSet<Address> {
7278
self.id_to_fee.keys().cloned().collect()
7379
}
7480

75-
pub fn get_total_fee(&self) -> u128 {
81+
pub fn get_total_fee(&self) -> G {
7682
self.total_fee
7783
}
7884
}
7985

80-
impl GenericTracker<SenderFeeStats, DurationInfo> {
86+
impl<G> GenericTracker<SenderFeeStats, DurationInfo, G>
87+
where
88+
G: Default,
89+
{
8190
pub fn new(buffer_duration: Duration) -> Self {
8291
Self {
8392
extra_data: DurationInfo { buffer_duration },
84-
total_fee: 0,
93+
total_fee: Default::default(),
8594
id_to_fee: Default::default(),
95+
fees_requesting: 0,
8696
}
8797
}
8898

@@ -91,7 +101,7 @@ impl GenericTracker<SenderFeeStats, DurationInfo> {
91101
/// It's important to notice that `value` cannot be less than
92102
/// zero, so the only way to make this counter lower is by using
93103
/// `update` function
94-
pub fn add(&mut self, id: Address, value: u128) {
104+
pub fn add(&mut self, id: Address, value: G) {
95105
let entry = self
96106
.id_to_fee
97107
.entry(id)
@@ -114,9 +124,56 @@ impl GenericTracker<SenderFeeStats, DurationInfo> {
114124
.values_mut()
115125
.fold(0u128, |acc, expiring| acc + expiring.get_sum())
116126
}
127+
128+
pub fn get_total_counter_outside_buffer_for_allocation(
129+
&mut self,
130+
allocation_id: &Address,
131+
) -> u64 {
132+
let Some(allocation_info) = self.id_to_fee.get_mut(allocation_id) else {
133+
return 0;
134+
};
135+
let allocation_counter = allocation_info.count;
136+
let counter_in_buffer = allocation_info.get_count();
137+
allocation_counter - counter_in_buffer
138+
}
139+
140+
pub fn start_rav_request(&mut self, allocation_id: Address) {
141+
let entry = self
142+
.id_to_fee
143+
.entry(allocation_id)
144+
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
145+
entry.requesting = true;
146+
self.fees_requesting += entry.total_fee;
147+
}
148+
149+
/// Should be called before `update`
150+
pub fn finish_rav_request(&mut self, allocation_id: Address) {
151+
let entry = self
152+
.id_to_fee
153+
.entry(allocation_id)
154+
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
155+
entry.requesting = false;
156+
self.fees_requesting -= entry.total_fee;
157+
}
158+
159+
pub fn failed_rav_backoff(&mut self, allocation_id: Address) {
160+
let entry = self
161+
.id_to_fee
162+
.entry(allocation_id)
163+
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
164+
entry.failed_info.fail();
165+
}
166+
167+
pub fn ok_rav_request(&mut self, allocation_id: Address) {
168+
let entry = self
169+
.id_to_fee
170+
.entry(allocation_id)
171+
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
172+
entry.failed_info.ok();
173+
}
117174
}
118175

119-
impl<E> GenericTracker<SenderFeeStats, E> {
176+
impl<E, G> GenericTracker<SenderFeeStats, E, G> {
120177
pub fn block_allocation_id(&mut self, address: Address) {
121178
self.id_to_fee.entry(address).and_modify(|v| {
122179
v.blocked = true;
@@ -128,9 +185,16 @@ impl<E> GenericTracker<SenderFeeStats, E> {
128185
v.blocked = false;
129186
});
130187
}
188+
189+
pub fn check_allocation_has_rav_request_running(&self, allocation_id: Address) -> bool {
190+
self.id_to_fee
191+
.get(&allocation_id)
192+
.map(|alloc| alloc.requesting)
193+
.unwrap_or_default()
194+
}
131195
}
132196

133-
impl AllocationStats<GlobalFeeTracker> for u128 {
197+
impl AllocationStats<u128> for u128 {
134198
fn update(&mut self, v: u128) {
135199
*self = v;
136200
}

tap-agent/src/tracker/sender_fee_stats.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,38 @@ use super::{AllocationStats, DefaultFromExtra, DurationInfo};
1010
#[derive(Debug, Clone, Default)]
1111
pub struct SenderFeeStats {
1212
pub(super) total_fee: u128,
13+
pub(super) count: u64,
1314
// there are some allocations that we don't want it to be
1415
// heaviest allocation, because they are already marked for finalization,
1516
// and thus requesting RAVs on their own in their `post_stop` routine.
1617
pub(super) blocked: bool,
1718

19+
pub(super) requesting: bool,
20+
1821
// buffer
1922
pub(super) entries: VecDeque<(Instant, u128)>,
2023
pub(super) fee_in_buffer: u128,
2124
pub(super) duration: Duration,
25+
pub(super) failed_info: FailedRavInfo,
26+
}
27+
#[derive(Debug, Clone)]
28+
pub struct FailedRavInfo {
29+
failed_ravs_count: u32,
30+
failed_rav_backoff_time: Instant,
31+
}
32+
33+
impl FailedRavInfo {
34+
pub fn ok(&mut self) {
35+
self.failed_ravs_count = 0;
36+
}
37+
38+
pub fn fail(&mut self) {
39+
// backoff = max(100ms * 2 ^ retries, 60s)
40+
self.failed_rav_backoff_time = Instant::now()
41+
+ (Duration::from_millis(100) * 2u32.pow(self.failed_ravs_count))
42+
.min(Duration::from_secs(60));
43+
self.failed_ravs_count += 1;
44+
}
2245
}
2346

2447
impl DefaultFromExtra<DurationInfo> for SenderFeeStats {
@@ -30,6 +53,15 @@ impl DefaultFromExtra<DurationInfo> for SenderFeeStats {
3053
}
3154
}
3255

56+
impl Default for FailedRavInfo {
57+
fn default() -> Self {
58+
Self {
59+
failed_ravs_count: 0,
60+
failed_rav_backoff_time: Instant::now(),
61+
}
62+
}
63+
}
64+
3365
impl SenderFeeStats {
3466
pub(super) fn get_sum(&mut self) -> u128 {
3567
let now = Instant::now();
@@ -43,15 +75,35 @@ impl SenderFeeStats {
4375
}
4476
self.fee_in_buffer
4577
}
78+
79+
pub(super) fn get_count(&mut self) -> u64 {
80+
self.cleanup();
81+
self.entries.len() as u64
82+
}
83+
84+
pub(super) fn cleanup(&mut self) {
85+
let now = Instant::now();
86+
while let Some(&(timestamp, value)) = self.entries.front() {
87+
if now.duration_since(timestamp) >= self.duration {
88+
self.entries.pop_front();
89+
self.fee_in_buffer -= value;
90+
} else {
91+
break;
92+
}
93+
}
94+
}
4695
}
4796

48-
impl AllocationStats<u128> for SenderFeeStats {
97+
impl AllocationStats<GlobalFeeTracker> for SenderFeeStats {
4998
fn update(&mut self, v: u128) {
5099
self.total_fee = v;
51100
}
52101

53102
fn should_filter_out(&self) -> bool {
54-
self.blocked
103+
let now = Instant::now();
104+
let in_backoff = now > self.failed_info.failed_rav_backoff_time;
105+
106+
in_backoff || self.blocked || self.requesting
55107
}
56108

57109
fn get_fee(&self) -> u128 {

0 commit comments

Comments
 (0)