Skip to content

Commit b283110

Browse files
committed
refactor: add timestamp_ns from receipt
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 6c0a744 commit b283110

File tree

5 files changed

+113
-75
lines changed

5 files changed

+113
-75
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type Balance = U256;
8181

8282
#[derive(Debug)]
8383
pub enum ReceiptFees {
84-
NewReceipt(u128),
84+
NewReceipt(u128, u64),
8585
UpdateValue(UnaggregatedReceipts),
8686
RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>),
8787
Retry,
@@ -594,7 +594,7 @@ impl Actor for SenderAccount {
594594
}
595595

596596
match receipt_fees {
597-
ReceiptFees::NewReceipt(value) => {
597+
ReceiptFees::NewReceipt(value, timestamp_ns) => {
598598
// If state is denied and received new receipt, sender was removed manually from DB
599599
if state.denied {
600600
tracing::warn!(
@@ -609,7 +609,9 @@ impl Actor for SenderAccount {
609609
}
610610

611611
// add new value
612-
state.sender_fee_tracker.add(allocation_id, value);
612+
state
613+
.sender_fee_tracker
614+
.add(allocation_id, value, timestamp_ns);
613615
UNAGGREGATED_FEES
614616
.with_label_values(&[
615617
&state.sender.to_string(),
@@ -641,7 +643,7 @@ impl Actor for SenderAccount {
641643
>= state.config.tap.rav_request_receipt_limit
642644
&& can_trigger_rav;
643645
let total_fee_outside_buffer =
644-
state.sender_fee_tracker.get_total_fee_outside_buffer();
646+
state.sender_fee_tracker.get_ravable_total_fee();
645647
let total_fee_greater_trigger_value =
646648
total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value;
647649
let rav_result = match (
@@ -917,7 +919,7 @@ pub mod tests {
917919
use std::collections::{HashMap, HashSet};
918920
use std::sync::atomic::AtomicU32;
919921
use std::sync::{Arc, Mutex};
920-
use std::time::Duration;
922+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
921923
use wiremock::matchers::{body_string_contains, method};
922924
use wiremock::{Mock, MockServer, ResponseTemplate};
923925

@@ -931,7 +933,9 @@ pub mod tests {
931933
(Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => {
932934
l0 == r0
933935
&& match (l1, r1) {
934-
(ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l,
936+
(ReceiptFees::NewReceipt(l1, l2), ReceiptFees::NewReceipt(r1, r2)) => {
937+
r1 == l1 && r2 == l2
938+
}
935939
(ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l,
936940
(
937941
ReceiptFees::RavRequestResponse(l),
@@ -1278,6 +1282,13 @@ pub mod tests {
12781282
)
12791283
}
12801284

1285+
fn get_current_timestamp_u64_ns() -> u64 {
1286+
SystemTime::now()
1287+
.duration_since(UNIX_EPOCH)
1288+
.unwrap()
1289+
.as_nanos() as u64
1290+
}
1291+
12811292
#[sqlx::test(migrations = "../migrations")]
12821293
async fn test_update_receipt_fees_no_rav(pgpool: PgPool) {
12831294
let (sender_account, handle, prefix, _) = create_sender_account(
@@ -1303,7 +1314,7 @@ pub mod tests {
13031314
sender_account
13041315
.cast(SenderAccountMessage::UpdateReceiptFees(
13051316
*ALLOCATION_ID_0,
1306-
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1),
1317+
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1, get_current_timestamp_u64_ns()),
13071318
))
13081319
.unwrap();
13091320

@@ -1346,7 +1357,7 @@ pub mod tests {
13461357
sender_account
13471358
.cast(SenderAccountMessage::UpdateReceiptFees(
13481359
*ALLOCATION_ID_0,
1349-
ReceiptFees::NewReceipt(TRIGGER_VALUE),
1360+
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
13501361
))
13511362
.unwrap();
13521363

@@ -1406,7 +1417,7 @@ pub mod tests {
14061417
sender_account
14071418
.cast(SenderAccountMessage::UpdateReceiptFees(
14081419
*ALLOCATION_ID_0,
1409-
ReceiptFees::NewReceipt(1),
1420+
ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()),
14101421
))
14111422
.unwrap();
14121423

@@ -1419,7 +1430,7 @@ pub mod tests {
14191430
sender_account
14201431
.cast(SenderAccountMessage::UpdateReceiptFees(
14211432
*ALLOCATION_ID_0,
1422-
ReceiptFees::NewReceipt(1),
1433+
ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()),
14231434
))
14241435
.unwrap();
14251436

@@ -1547,7 +1558,7 @@ pub mod tests {
15471558
sender_account
15481559
.cast(SenderAccountMessage::UpdateReceiptFees(
15491560
*ALLOCATION_ID_0,
1550-
ReceiptFees::NewReceipt(TRIGGER_VALUE),
1561+
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
15511562
))
15521563
.unwrap();
15531564
tokio::time::sleep(Duration::from_millis(200)).await;
@@ -1941,7 +1952,7 @@ pub mod tests {
19411952
sender_account
19421953
.cast(SenderAccountMessage::UpdateReceiptFees(
19431954
*ALLOCATION_ID_0,
1944-
ReceiptFees::NewReceipt(TRIGGER_VALUE),
1955+
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
19451956
))
19461957
.unwrap();
19471958
tokio::time::sleep(Duration::from_millis(100)).await;

tap-agent/src/agent/sender_allocation.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,10 @@ impl Actor for SenderAllocation {
229229
match message {
230230
SenderAllocationMessage::NewReceipt(notification) => {
231231
let NewReceiptNotification {
232-
id, value: fees, ..
232+
id,
233+
value: fees,
234+
timestamp_ns,
235+
..
233236
} = notification;
234237
if id <= unaggregated_fees.last_id {
235238
// our world assumption is wrong
@@ -260,7 +263,7 @@ impl Actor for SenderAllocation {
260263
.sender_account_ref
261264
.cast(SenderAccountMessage::UpdateReceiptFees(
262265
state.allocation_id,
263-
ReceiptFees::NewReceipt(fees),
266+
ReceiptFees::NewReceipt(fees, timestamp_ns),
264267
))?;
265268
}
266269
SenderAllocationMessage::TriggerRAVRequest => {
@@ -1109,14 +1112,19 @@ pub mod tests {
11091112
)
11101113
.unwrap();
11111114

1115+
let timestamp_ns = SystemTime::now()
1116+
.duration_since(UNIX_EPOCH)
1117+
.unwrap()
1118+
.as_nanos() as u64;
1119+
11121120
cast!(
11131121
sender_allocation,
11141122
SenderAllocationMessage::NewReceipt(NewReceiptNotification {
11151123
id: 1,
11161124
value: 20,
11171125
allocation_id: *ALLOCATION_ID_0,
11181126
signer_address: SIGNER.1,
1119-
timestamp_ns: 0,
1127+
timestamp_ns,
11201128
})
11211129
)
11221130
.unwrap();
@@ -1126,7 +1134,7 @@ pub mod tests {
11261134
// should emit update aggregate fees message to sender account
11271135
let expected_message = SenderAccountMessage::UpdateReceiptFees(
11281136
*ALLOCATION_ID_0,
1129-
ReceiptFees::NewReceipt(20u128),
1137+
ReceiptFees::NewReceipt(20u128, timestamp_ns),
11301138
);
11311139
let startup_load_msg = message_receiver.recv().await.unwrap();
11321140
assert_eq!(

tap-agent/src/tracker/generic_tracker.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, Unaggregated
119119
/// It's important to notice that `value` cannot be less than
120120
/// zero, so the only way to make this counter lower is by using
121121
/// `update` function
122-
pub fn add(&mut self, id: Address, value: u128) {
122+
pub fn add(&mut self, id: Address, value: u128, timestamp_ns: u64) {
123123
let contains_buffer = self.contains_buffer();
124124
let entry = self
125125
.id_to_fee
@@ -134,15 +134,15 @@ impl GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, Unaggregated
134134
entry.count += 1;
135135

136136
if contains_buffer {
137-
entry.buffer_info.new_entry(value);
137+
entry.buffer_info.new_entry(value, timestamp_ns);
138138
}
139139
}
140140

141141
fn contains_buffer(&self) -> bool {
142142
self.extra_data.buffer_duration > Duration::ZERO
143143
}
144144

145-
pub fn get_total_fee_outside_buffer(&mut self) -> u128 {
145+
pub fn get_ravable_total_fee(&mut self) -> u128 {
146146
self.get_total_fee() - self.get_buffered_fee().min(self.global.total_fee)
147147
}
148148

@@ -152,13 +152,10 @@ impl GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, Unaggregated
152152
.fold(0u128, |acc, expiring| acc + expiring.buffer_info.get_sum())
153153
}
154154

155-
pub fn get_count_outside_buffer_for_allocation(
156-
&mut self,
157-
allocation_id: &Address,
158-
) -> u64 {
155+
pub fn get_count_outside_buffer_for_allocation(&mut self, allocation_id: &Address) -> u64 {
159156
self.id_to_fee
160157
.get_mut(allocation_id)
161-
.map(|alloc| alloc.get_count_outside_buffer())
158+
.map(|alloc| alloc.ravable_count())
162159
.unwrap_or_default()
163160
}
164161

@@ -186,15 +183,15 @@ impl GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, Unaggregated
186183
.id_to_fee
187184
.entry(allocation_id)
188185
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
189-
entry.failed_info.ok();
186+
entry.backoff_info.ok();
190187
}
191188

192189
pub fn failed_rav_backoff(&mut self, allocation_id: Address) {
193190
let entry = self
194191
.id_to_fee
195192
.entry(allocation_id)
196193
.or_insert(SenderFeeStats::default_from_extra(&self.extra_data));
197-
entry.failed_info.fail();
194+
entry.backoff_info.fail();
198195
}
199196
}
200197

tap-agent/src/tracker/sender_fee_stats.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
collections::VecDeque,
3-
time::{Duration, Instant},
3+
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
44
};
55
// use tracing::error;
66

@@ -22,11 +22,11 @@ pub struct SenderFeeStats {
2222
pub(super) buffer_info: BufferInfo,
2323

2424
// Backoff info
25-
pub(super) failed_info: FailedRavInfo,
25+
pub(super) backoff_info: BackoffInfo,
2626
}
2727

2828
impl SenderFeeStats {
29-
pub(crate) fn get_count_outside_buffer(&mut self) -> u64 {
29+
pub(super) fn ravable_count(&mut self) -> u64 {
3030
let allocation_counter = self.count;
3131
let counter_in_buffer = self.buffer_info.get_count();
3232
allocation_counter - counter_in_buffer
@@ -35,15 +35,20 @@ impl SenderFeeStats {
3535

3636
#[derive(Debug, Clone, Default)]
3737
pub struct BufferInfo {
38-
pub entries: VecDeque<(Instant, u128)>,
38+
pub entries: VecDeque<(SystemTime, u128)>,
3939
pub fee_in_buffer: u128,
4040
pub duration: Duration,
4141
}
4242

4343
impl BufferInfo {
44-
pub(super) fn new_entry(&mut self, value: u128) {
45-
let now = Instant::now();
46-
self.entries.push_back((now, value));
44+
pub(super) fn new_entry(&mut self, value: u128, timestamp_ns: u64) {
45+
let duration_since_epoch = Duration::from_nanos(timestamp_ns);
46+
// Create a SystemTime from the UNIX_EPOCH plus the duration
47+
let system_time = UNIX_EPOCH + duration_since_epoch;
48+
let system_time = system_time
49+
.checked_add(self.duration)
50+
.expect("Should be within bounds");
51+
self.entries.push_back((system_time, value));
4752
self.fee_in_buffer += value;
4853
}
4954

@@ -57,26 +62,33 @@ impl BufferInfo {
5762
self.entries.len() as u64
5863
}
5964

60-
fn cleanup(&mut self) {
61-
let now = Instant::now();
65+
// O(Receipts expired)
66+
fn cleanup(&mut self) -> (u128, u64) {
67+
let now = SystemTime::now();
68+
69+
let mut total_value_expired = 0;
70+
let mut total_count_expired = 0;
6271
while let Some(&(timestamp, value)) = self.entries.front() {
63-
if now.duration_since(timestamp) >= self.duration {
72+
if now >= timestamp {
6473
self.entries.pop_front();
65-
self.fee_in_buffer -= value;
74+
total_value_expired += value;
75+
total_count_expired += 1;
6676
} else {
6777
break;
6878
}
6979
}
80+
self.fee_in_buffer -= total_value_expired;
81+
(total_value_expired, total_count_expired)
7082
}
7183
}
7284

7385
#[derive(Debug, Clone)]
74-
pub struct FailedRavInfo {
86+
pub struct BackoffInfo {
7587
failed_ravs_count: u32,
7688
failed_rav_backoff_time: Instant,
7789
}
7890

79-
impl FailedRavInfo {
91+
impl BackoffInfo {
8092
pub fn ok(&mut self) {
8193
self.failed_ravs_count = 0;
8294
}
@@ -107,7 +119,7 @@ impl DefaultFromExtra<DurationInfo> for SenderFeeStats {
107119
}
108120
}
109121

110-
impl Default for FailedRavInfo {
122+
impl Default for BackoffInfo {
111123
fn default() -> Self {
112124
Self {
113125
failed_ravs_count: 0,
@@ -137,7 +149,7 @@ impl AllocationStats<UnaggregatedReceipts> for SenderFeeStats {
137149
}
138150

139151
fn is_allowed_to_trigger_rav_request(&self) -> bool {
140-
!self.failed_info.in_backoff() && !self.blocked && !self.requesting
152+
!self.backoff_info.in_backoff() && !self.blocked && !self.requesting
141153
}
142154

143155
fn get_stats(&self) -> UnaggregatedReceipts {

0 commit comments

Comments
 (0)