Skip to content

Commit 0019608

Browse files
committed
refactor: add timestamp_ns from receipt
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 6c0a744 commit 0019608

File tree

5 files changed

+113
-76
lines changed

5 files changed

+113
-76
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type Balance = U256;
8181

8282
#[derive(Debug)]
8383
pub enum ReceiptFees {
84-
NewReceipt(u128),
84+
NewReceipt(u128, u64),
8585
UpdateValue(UnaggregatedReceipts),
8686
RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>),
8787
Retry,
@@ -594,7 +594,7 @@ impl Actor for SenderAccount {
594594
}
595595

596596
match receipt_fees {
597-
ReceiptFees::NewReceipt(value) => {
597+
ReceiptFees::NewReceipt(value, timestamp_ns) => {
598598
// If state is denied and received new receipt, sender was removed manually from DB
599599
if state.denied {
600600
tracing::warn!(
@@ -609,7 +609,9 @@ impl Actor for SenderAccount {
609609
}
610610

611611
// add new value
612-
state.sender_fee_tracker.add(allocation_id, value);
612+
state
613+
.sender_fee_tracker
614+
.add(allocation_id, value, timestamp_ns);
613615
UNAGGREGATED_FEES
614616
.with_label_values(&[
615617
&state.sender.to_string(),
@@ -640,8 +642,7 @@ impl Actor for SenderAccount {
640642
let counter_greater_receipt_limit = total_counter_for_allocation
641643
>= state.config.tap.rav_request_receipt_limit
642644
&& can_trigger_rav;
643-
let total_fee_outside_buffer =
644-
state.sender_fee_tracker.get_total_fee_outside_buffer();
645+
let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee();
645646
let total_fee_greater_trigger_value =
646647
total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value;
647648
let rav_result = match (
@@ -917,7 +918,7 @@ pub mod tests {
917918
use std::collections::{HashMap, HashSet};
918919
use std::sync::atomic::AtomicU32;
919920
use std::sync::{Arc, Mutex};
920-
use std::time::Duration;
921+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
921922
use wiremock::matchers::{body_string_contains, method};
922923
use wiremock::{Mock, MockServer, ResponseTemplate};
923924

@@ -931,7 +932,9 @@ pub mod tests {
931932
(Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => {
932933
l0 == r0
933934
&& match (l1, r1) {
934-
(ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l,
935+
(ReceiptFees::NewReceipt(l1, l2), ReceiptFees::NewReceipt(r1, r2)) => {
936+
r1 == l1 && r2 == l2
937+
}
935938
(ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l,
936939
(
937940
ReceiptFees::RavRequestResponse(l),
@@ -1278,6 +1281,13 @@ pub mod tests {
12781281
)
12791282
}
12801283

1284+
fn get_current_timestamp_u64_ns() -> u64 {
1285+
SystemTime::now()
1286+
.duration_since(UNIX_EPOCH)
1287+
.unwrap()
1288+
.as_nanos() as u64
1289+
}
1290+
12811291
#[sqlx::test(migrations = "../migrations")]
12821292
async fn test_update_receipt_fees_no_rav(pgpool: PgPool) {
12831293
let (sender_account, handle, prefix, _) = create_sender_account(
@@ -1303,7 +1313,7 @@ pub mod tests {
13031313
sender_account
13041314
.cast(SenderAccountMessage::UpdateReceiptFees(
13051315
*ALLOCATION_ID_0,
1306-
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1),
1316+
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1, get_current_timestamp_u64_ns()),
13071317
))
13081318
.unwrap();
13091319

@@ -1346,7 +1356,7 @@ pub mod tests {
13461356
sender_account
13471357
.cast(SenderAccountMessage::UpdateReceiptFees(
13481358
*ALLOCATION_ID_0,
1349-
ReceiptFees::NewReceipt(TRIGGER_VALUE),
1359+
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
13501360
))
13511361
.unwrap();
13521362

@@ -1406,7 +1416,7 @@ pub mod tests {
14061416
sender_account
14071417
.cast(SenderAccountMessage::UpdateReceiptFees(
14081418
*ALLOCATION_ID_0,
1409-
ReceiptFees::NewReceipt(1),
1419+
ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()),
14101420
))
14111421
.unwrap();
14121422

@@ -1419,7 +1429,7 @@ pub mod tests {
14191429
sender_account
14201430
.cast(SenderAccountMessage::UpdateReceiptFees(
14211431
*ALLOCATION_ID_0,
1422-
ReceiptFees::NewReceipt(1),
1432+
ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()),
14231433
))
14241434
.unwrap();
14251435

@@ -1547,7 +1557,7 @@ pub mod tests {
15471557
sender_account
15481558
.cast(SenderAccountMessage::UpdateReceiptFees(
15491559
*ALLOCATION_ID_0,
1550-
ReceiptFees::NewReceipt(TRIGGER_VALUE),
1560+
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
15511561
))
15521562
.unwrap();
15531563
tokio::time::sleep(Duration::from_millis(200)).await;
@@ -1941,7 +1951,7 @@ pub mod tests {
19411951
sender_account
19421952
.cast(SenderAccountMessage::UpdateReceiptFees(
19431953
*ALLOCATION_ID_0,
1944-
ReceiptFees::NewReceipt(TRIGGER_VALUE),
1954+
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
19451955
))
19461956
.unwrap();
19471957
tokio::time::sleep(Duration::from_millis(100)).await;

tap-agent/src/agent/sender_allocation.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,10 @@ impl Actor for SenderAllocation {
229229
match message {
230230
SenderAllocationMessage::NewReceipt(notification) => {
231231
let NewReceiptNotification {
232-
id, value: fees, ..
232+
id,
233+
value: fees,
234+
timestamp_ns,
235+
..
233236
} = notification;
234237
if id <= unaggregated_fees.last_id {
235238
// our world assumption is wrong
@@ -260,7 +263,7 @@ impl Actor for SenderAllocation {
260263
.sender_account_ref
261264
.cast(SenderAccountMessage::UpdateReceiptFees(
262265
state.allocation_id,
263-
ReceiptFees::NewReceipt(fees),
266+
ReceiptFees::NewReceipt(fees, timestamp_ns),
264267
))?;
265268
}
266269
SenderAllocationMessage::TriggerRAVRequest => {
@@ -1109,14 +1112,19 @@ pub mod tests {
11091112
)
11101113
.unwrap();
11111114

1115+
let timestamp_ns = SystemTime::now()
1116+
.duration_since(UNIX_EPOCH)
1117+
.unwrap()
1118+
.as_nanos() as u64;
1119+
11121120
cast!(
11131121
sender_allocation,
11141122
SenderAllocationMessage::NewReceipt(NewReceiptNotification {
11151123
id: 1,
11161124
value: 20,
11171125
allocation_id: *ALLOCATION_ID_0,
11181126
signer_address: SIGNER.1,
1119-
timestamp_ns: 0,
1127+
timestamp_ns,
11201128
})
11211129
)
11221130
.unwrap();
@@ -1126,7 +1134,7 @@ pub mod tests {
11261134
// should emit update aggregate fees message to sender account
11271135
let expected_message = SenderAccountMessage::UpdateReceiptFees(
11281136
*ALLOCATION_ID_0,
1129-
ReceiptFees::NewReceipt(20u128),
1137+
ReceiptFees::NewReceipt(20u128, timestamp_ns),
11301138
);
11311139
let startup_load_msg = message_receiver.recv().await.unwrap();
11321140
assert_eq!(

tap-agent/src/tracker/generic_tracker.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, Unaggregated
119119
/// It's important to notice that `value` cannot be less than
120120
/// zero, so the only way to make this counter lower is by using
121121
/// `update` function
122-
pub fn add(&mut self, id: Address, value: u128) {
122+
pub fn add(&mut self, id: Address, value: u128, timestamp_ns: u64) {
123123
let contains_buffer = self.contains_buffer();
124124
let entry = self
125125
.id_to_fee
@@ -134,15 +134,15 @@ impl GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, Unaggregated
134134
entry.count += 1;
135135

136136
if contains_buffer {
137-
entry.buffer_info.new_entry(value);
137+
entry.buffer_info.new_entry(value, timestamp_ns);
138138
}
139139
}
140140

141141
fn contains_buffer(&self) -> bool {
142142
self.extra_data.buffer_duration > Duration::ZERO
143143
}
144144

145-
pub fn get_total_fee_outside_buffer(&mut self) -> u128 {
145+
pub fn get_ravable_total_fee(&mut self) -> u128 {
146146
self.get_total_fee() - self.get_buffered_fee().min(self.global.total_fee)
147147
}
148148

@@ -152,13 +152,10 @@ impl GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, Unaggregated
152152
.fold(0u128, |acc, expiring| acc + expiring.buffer_info.get_sum())
153153
}
154154

155-
pub fn get_count_outside_buffer_for_allocation(
156-
&mut self,
157-
allocation_id: &Address,
158-
) -> u64 {
155+
pub fn get_count_outside_buffer_for_allocation(&mut self, allocation_id: &Address) -> u64 {
159156
self.id_to_fee
160157
.get_mut(allocation_id)
161-
.map(|alloc| alloc.get_count_outside_buffer())
158+
.map(|alloc| alloc.ravable_count())
162159
.unwrap_or_default()
163160
}
164161

@@ -186,15 +183,15 @@ impl GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, Unaggregated
186183
.id_to_fee
187184
.entry(allocation_id)
188185
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
189-
entry.failed_info.ok();
186+
entry.backoff_info.ok();
190187
}
191188

192189
pub fn failed_rav_backoff(&mut self, allocation_id: Address) {
193190
let entry = self
194191
.id_to_fee
195192
.entry(allocation_id)
196193
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
197-
entry.failed_info.fail();
194+
entry.backoff_info.fail();
198195
}
199196
}
200197

tap-agent/src/tracker/sender_fee_stats.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
collections::VecDeque,
3-
time::{Duration, Instant},
3+
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
44
};
55
// use tracing::error;
66

@@ -22,11 +22,11 @@ pub struct SenderFeeStats {
2222
pub(super) buffer_info: BufferInfo,
2323

2424
// Backoff info
25-
pub(super) failed_info: FailedRavInfo,
25+
pub(super) backoff_info: BackoffInfo,
2626
}
2727

2828
impl SenderFeeStats {
29-
pub(crate) fn get_count_outside_buffer(&mut self) -> u64 {
29+
pub(super) fn ravable_count(&mut self) -> u64 {
3030
let allocation_counter = self.count;
3131
let counter_in_buffer = self.buffer_info.get_count();
3232
allocation_counter - counter_in_buffer
@@ -35,15 +35,20 @@ impl SenderFeeStats {
3535

3636
#[derive(Debug, Clone, Default)]
3737
pub struct BufferInfo {
38-
pub entries: VecDeque<(Instant, u128)>,
38+
pub entries: VecDeque<(SystemTime, u128)>,
3939
pub fee_in_buffer: u128,
4040
pub duration: Duration,
4141
}
4242

4343
impl BufferInfo {
44-
pub(super) fn new_entry(&mut self, value: u128) {
45-
let now = Instant::now();
46-
self.entries.push_back((now, value));
44+
pub(super) fn new_entry(&mut self, value: u128, timestamp_ns: u64) {
45+
let duration_since_epoch = Duration::from_nanos(timestamp_ns);
46+
// Create a SystemTime from the UNIX_EPOCH plus the duration
47+
let system_time = UNIX_EPOCH + duration_since_epoch;
48+
let system_time = system_time
49+
.checked_add(self.duration)
50+
.expect("Should be within bounds");
51+
self.entries.push_back((system_time, value));
4752
self.fee_in_buffer += value;
4853
}
4954

@@ -57,26 +62,33 @@ impl BufferInfo {
5762
self.entries.len() as u64
5863
}
5964

60-
fn cleanup(&mut self) {
61-
let now = Instant::now();
65+
// O(Receipts expired)
66+
fn cleanup(&mut self) -> (u128, u64) {
67+
let now = SystemTime::now();
68+
69+
let mut total_value_expired = 0;
70+
let mut total_count_expired = 0;
6271
while let Some(&(timestamp, value)) = self.entries.front() {
63-
if now.duration_since(timestamp) >= self.duration {
72+
if now >= timestamp {
6473
self.entries.pop_front();
65-
self.fee_in_buffer -= value;
74+
total_value_expired += value;
75+
total_count_expired += 1;
6676
} else {
6777
break;
6878
}
6979
}
80+
self.fee_in_buffer -= total_value_expired;
81+
(total_value_expired, total_count_expired)
7082
}
7183
}
7284

7385
#[derive(Debug, Clone)]
74-
pub struct FailedRavInfo {
86+
pub struct BackoffInfo {
7587
failed_ravs_count: u32,
7688
failed_rav_backoff_time: Instant,
7789
}
7890

79-
impl FailedRavInfo {
91+
impl BackoffInfo {
8092
pub fn ok(&mut self) {
8193
self.failed_ravs_count = 0;
8294
}
@@ -107,7 +119,7 @@ impl DefaultFromExtra<DurationInfo> for SenderFeeStats {
107119
}
108120
}
109121

110-
impl Default for FailedRavInfo {
122+
impl Default for BackoffInfo {
111123
fn default() -> Self {
112124
Self {
113125
failed_ravs_count: 0,
@@ -137,7 +149,7 @@ impl AllocationStats<UnaggregatedReceipts> for SenderFeeStats {
137149
}
138150

139151
fn is_allowed_to_trigger_rav_request(&self) -> bool {
140-
!self.failed_info.in_backoff() && !self.blocked && !self.requesting
152+
!self.backoff_info.in_backoff() && !self.blocked && !self.requesting
141153
}
142154

143155
fn get_stats(&self) -> UnaggregatedReceipts {

0 commit comments

Comments
 (0)