Skip to content

Commit 8fbe4f2

Browse files
committed
wip
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 4592c42 commit 8fbe4f2

File tree

9 files changed

+431
-11
lines changed

9 files changed

+431
-11
lines changed

tap-agent/src/agent.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use sender_accounts_manager::SenderAccountsManager;
2121
pub mod sender_account;
2222
pub mod sender_accounts_manager;
2323
pub mod sender_allocation;
24-
pub mod sender_fee_tracker;
2524
pub mod unaggregated_receipts;
2625

2726
pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {

tap-agent/src/agent/sender_account.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ use tap_core::rav::SignedRAV;
2626
use tracing::{error, Level};
2727

2828
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
29-
use super::sender_fee_tracker::{BufferedReceiptFee, DurationInfo};
3029
use crate::agent::sender_allocation::SenderAllocationMessage;
31-
use crate::agent::sender_fee_tracker::SenderFeeTracker;
3230
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
31+
use crate::tracker::{DurationInfo, SenderFeeStats, SimpleFeeTracker};
3332
use crate::{
3433
config::{self},
3534
tap::escrow_adapter::EscrowAdapter,
@@ -88,6 +87,8 @@ pub enum ReceiptFees {
8887
Retry,
8988
}
9089

90+
type SenderFeeTracker = SimpleFeeTracker<SenderFeeStats, DurationInfo>;
91+
9192
#[derive(Debug)]
9293
pub enum SenderAccountMessage {
9394
UpdateBalanceAndLastRavs(Balance, RavMap),
@@ -97,7 +98,7 @@ pub enum SenderAccountMessage {
9798
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
9899
UpdateRav(SignedRAV),
99100
#[cfg(test)]
100-
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker<BufferedReceiptFee, DurationInfo>>),
101+
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker>),
101102
#[cfg(test)]
102103
GetDeny(ractor::RpcReplyPort<bool>),
103104
#[cfg(test)]
@@ -131,9 +132,9 @@ pub struct SenderAccountArgs {
131132
}
132133
pub struct State {
133134
prefix: Option<String>,
134-
sender_fee_tracker: SenderFeeTracker<BufferedReceiptFee, DurationInfo>,
135-
rav_tracker: SenderFeeTracker<u128>,
136-
invalid_receipts_tracker: SenderFeeTracker<u128>,
135+
sender_fee_tracker: SenderFeeTracker,
136+
rav_tracker: SimpleFeeTracker<u128>,
137+
invalid_receipts_tracker: SimpleFeeTracker<u128>,
137138
allocation_ids: HashSet<Address>,
138139
_indexer_allocations_handle: PipeHandle,
139140
_escrow_account_monitor: PipeHandle,
@@ -467,12 +468,12 @@ impl Actor for SenderAccount {
467468
.build(&sender_aggregator_endpoint)?;
468469

469470
let state = State {
470-
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
471+
sender_fee_tracker: SimpleFeeTracker::new(Duration::from_millis(
471472
config.tap.rav_request_timestamp_buffer_ms,
472473
)),
473474
// sender_fee_tracker: SenderFeeTracker::new(),
474-
rav_tracker: SenderFeeTracker::default(),
475-
invalid_receipts_tracker: SenderFeeTracker::default(),
475+
rav_tracker: SimpleFeeTracker::default(),
476+
invalid_receipts_tracker: SimpleFeeTracker::default(),
476477
allocation_ids: allocation_ids.clone(),
477478
_indexer_allocations_handle,
478479
_escrow_account_monitor,

tap-agent/src/agent/sender_allocation.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,12 @@ impl Actor for SenderAllocation {
196196
}
197197

198198
while let Err(err) = state.mark_rav_last().await {
199-
error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds...");
199+
error!(
200+
error = %err,
201+
%state.allocation_id,
202+
%state.sender,
203+
"Error while marking allocation last. Retrying in 30 seconds..."
204+
);
200205
tokio::time::sleep(Duration::from_secs(30)).await;
201206
}
202207

tap-agent/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ pub mod config;
2020
pub mod database;
2121
pub mod metrics;
2222
pub mod tap;
23+
pub mod tracker;

tap-agent/src/tracker.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
pub use extra_data::{DefaultFromExtra, DurationInfo, NoExtraData};
5+
use generic_tracker::{GenericTracker, GlobalTracker};
6+
pub use sender_fee_stats::SenderFeeStats;
7+
8+
mod extra_data;
9+
mod generic_tracker;
10+
mod sender_fee_stats;
11+
#[cfg(test)]
12+
mod tracker_tests;
13+
14+
pub type SimpleFeeTracker<T, E = NoExtraData> = GenericTracker<T, E>;
15+
16+
pub trait AllocationStats<G: GlobalTracker> {
17+
fn update(&mut self, v: G);
18+
fn should_filter_out(&self) -> bool;
19+
fn get_fee(&self) -> G;
20+
fn get_valid_fee(&mut self) -> G;
21+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use std::time::Duration;
2+
3+
pub trait DefaultFromExtra<E> {
4+
fn default_from_extra(extra: &E) -> Self;
5+
}
6+
7+
#[derive(Debug, Clone)]
8+
pub struct DurationInfo {
9+
pub(super) buffer_duration: Duration,
10+
}
11+
12+
#[derive(Debug, Clone, Default)]
13+
pub struct NoExtraData;
14+
15+
impl<T> DefaultFromExtra<NoExtraData> for T
16+
where
17+
T: Default,
18+
{
19+
fn default_from_extra(_: &NoExtraData) -> Self {
20+
Default::default()
21+
}
22+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use super::{AllocationStats, DefaultFromExtra, DurationInfo, SenderFeeStats};
2+
use alloy::primitives::Address;
3+
use std::{
4+
collections::{HashMap, HashSet},
5+
ops::AddAssign,
6+
time::{Duration, Instant},
7+
};
8+
9+
pub trait GlobalTracker {
10+
fn should_remove(&self) -> bool;
11+
}
12+
13+
impl GlobalTracker for u128 {
14+
fn should_remove(&self) -> bool {
15+
*self == 0
16+
}
17+
}
18+
19+
type GlobalFeeTracker = u128;
20+
21+
#[derive(Debug, Clone, Default)]
22+
pub struct GenericTracker<F, E> {
23+
pub(super) id_to_fee: HashMap<Address, F>,
24+
pub(super) total_fee: u128,
25+
pub(super) extra_data: E,
26+
}
27+
28+
impl<F, E> GenericTracker<F, E>
29+
where
30+
F: AddAssign<GlobalFeeTracker> + AllocationStats<GlobalFeeTracker> + DefaultFromExtra<E>,
31+
{
32+
/// Updates and overwrite the fee counter into the specific
33+
/// value provided.
34+
///
35+
/// IMPORTANT: This function does not affect the buffer window fee
36+
pub fn update(&mut self, id: Address, value: u128) {
37+
if !value.should_remove() {
38+
// insert or update, if update remove old fee from total
39+
let fee = self
40+
.id_to_fee
41+
.entry(id)
42+
.or_insert(F::default_from_extra(&self.extra_data));
43+
self.total_fee -= fee.get_fee();
44+
fee.update(value);
45+
self.total_fee += value;
46+
} else if let Some(old_fee) = self.id_to_fee.remove(&id) {
47+
self.total_fee -= old_fee.get_fee();
48+
}
49+
}
50+
51+
pub fn get_heaviest_allocation_id(&mut self) -> Option<Address> {
52+
// just loop over and get the biggest fee
53+
self.id_to_fee
54+
.iter_mut()
55+
.filter(|(_, fee)| !fee.should_filter_out())
56+
.fold(None, |acc: Option<(&Address, u128)>, (addr, value)| {
57+
if let Some((_, max_fee)) = acc {
58+
if value.get_valid_fee() > max_fee {
59+
Some((addr, value.get_valid_fee()))
60+
} else {
61+
acc
62+
}
63+
} else {
64+
Some((addr, value.get_valid_fee()))
65+
}
66+
})
67+
.filter(|(_, fee)| *fee > 0)
68+
.map(|(&id, _)| id)
69+
}
70+
71+
pub fn get_list_of_allocation_ids(&self) -> HashSet<Address> {
72+
self.id_to_fee.keys().cloned().collect()
73+
}
74+
75+
pub fn get_total_fee(&self) -> u128 {
76+
self.total_fee
77+
}
78+
}
79+
80+
impl GenericTracker<SenderFeeStats, DurationInfo> {
81+
pub fn new(buffer_duration: Duration) -> Self {
82+
Self {
83+
extra_data: DurationInfo { buffer_duration },
84+
total_fee: 0,
85+
id_to_fee: Default::default(),
86+
}
87+
}
88+
89+
/// Adds into the total_fee entry and buffer window totals
90+
///
91+
/// It's important to notice that `value` cannot be less than
92+
/// zero, so the only way to make this counter lower is by using
93+
/// `update` function
94+
pub fn add(&mut self, id: Address, value: u128) {
95+
let entry = self
96+
.id_to_fee
97+
.entry(id)
98+
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
99+
self.total_fee += value;
100+
*entry += value;
101+
if self.extra_data.buffer_duration > Duration::ZERO {
102+
let now = Instant::now();
103+
entry.entries.push_back((now, value));
104+
entry.fee_in_buffer += value;
105+
}
106+
}
107+
108+
pub fn get_total_fee_outside_buffer(&mut self) -> u128 {
109+
self.total_fee - self.get_buffer_fee().min(self.total_fee)
110+
}
111+
112+
pub fn get_buffer_fee(&mut self) -> u128 {
113+
self.id_to_fee
114+
.values_mut()
115+
.fold(0u128, |acc, expiring| acc + expiring.get_sum())
116+
}
117+
}
118+
119+
impl<E> GenericTracker<SenderFeeStats, E> {
120+
pub fn block_allocation_id(&mut self, address: Address) {
121+
self.id_to_fee.entry(address).and_modify(|v| {
122+
v.blocked = true;
123+
});
124+
}
125+
126+
pub fn unblock_allocation_id(&mut self, address: Address) {
127+
self.id_to_fee.entry(address).and_modify(|v| {
128+
v.blocked = false;
129+
});
130+
}
131+
}
132+
133+
impl AllocationStats<GlobalFeeTracker> for u128 {
134+
fn update(&mut self, v: u128) {
135+
*self = v;
136+
}
137+
138+
fn should_filter_out(&self) -> bool {
139+
*self == 0
140+
}
141+
142+
fn get_fee(&self) -> u128 {
143+
*self
144+
}
145+
146+
fn get_valid_fee(&mut self) -> u128 {
147+
self.get_fee()
148+
}
149+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use std::{
2+
collections::VecDeque,
3+
ops::AddAssign,
4+
time::{Duration, Instant},
5+
};
6+
use tracing::error;
7+
8+
use super::{AllocationStats, DefaultFromExtra, DurationInfo};
9+
10+
#[derive(Debug, Clone, Default)]
11+
pub struct SenderFeeStats {
12+
pub(super) total_fee: u128,
13+
// there are some allocations that we don't want it to be
14+
// heaviest allocation, because they are already marked for finalization,
15+
// and thus requesting RAVs on their own in their `post_stop` routine.
16+
pub(super) blocked: bool,
17+
18+
// buffer
19+
pub(super) entries: VecDeque<(Instant, u128)>,
20+
pub(super) fee_in_buffer: u128,
21+
pub(super) duration: Duration,
22+
}
23+
24+
impl DefaultFromExtra<DurationInfo> for SenderFeeStats {
25+
fn default_from_extra(extra: &DurationInfo) -> Self {
26+
SenderFeeStats {
27+
duration: extra.buffer_duration,
28+
..Default::default()
29+
}
30+
}
31+
}
32+
33+
impl SenderFeeStats {
34+
pub(super) fn get_sum(&mut self) -> u128 {
35+
let now = Instant::now();
36+
while let Some(&(timestamp, value)) = self.entries.front() {
37+
if now.duration_since(timestamp) >= self.duration {
38+
self.entries.pop_front();
39+
self.fee_in_buffer -= value;
40+
} else {
41+
break;
42+
}
43+
}
44+
self.fee_in_buffer
45+
}
46+
}
47+
48+
impl AllocationStats<u128> for SenderFeeStats {
49+
fn update(&mut self, v: u128) {
50+
self.total_fee = v;
51+
}
52+
53+
fn should_filter_out(&self) -> bool {
54+
self.blocked
55+
}
56+
57+
fn get_fee(&self) -> u128 {
58+
self.total_fee
59+
}
60+
61+
fn get_valid_fee(&mut self) -> u128 {
62+
self.get_fee() - self.get_sum().min(self.total_fee)
63+
}
64+
}
65+
66+
impl AddAssign<u128> for SenderFeeStats {
67+
fn add_assign(&mut self, rhs: u128) {
68+
self.total_fee += rhs;
69+
70+
self.total_fee = self.total_fee.checked_add(rhs).unwrap_or_else(|| {
71+
// This should never happen, but if it does, we want to know about it.
72+
error!(
73+
"Overflow when adding receipt value {} to total fee {}. \
74+
Setting total fee to u128::MAX.",
75+
self.total_fee, rhs
76+
);
77+
u128::MAX
78+
});
79+
}
80+
}

0 commit comments

Comments
 (0)