diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 7bfbb85c8..679260cec 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -79,7 +79,8 @@ type Balance = U256; #[derive(Debug, Eq, PartialEq)] pub enum ReceiptFees { - NewValue(UnaggregatedReceipts), + NewReceipt(u128), + UpdateValue(UnaggregatedReceipts), Retry, } @@ -198,8 +199,13 @@ impl State { async fn rav_requester_single(&mut self) -> Result<()> { let Some(allocation_id) = self.sender_fee_tracker.get_heaviest_allocation_id() else { anyhow::bail!( - "Error while getting the heaviest allocation because \ - no unblocked allocation has enough unaggregated fees tracked" + "Error while getting the heaviest allocation, \ + this is due one of the following reasons: \n + 1. allocations have too much fees under their buffer\n + 2. allocations are blocked to be redeemed due to ongoing last rav. \n + If you keep seeing this message try to increase your `amount_willing_to_lose` \ + and restart your `tap-agent`\n + If this doesn't work, open an issue on our Github." ); }; let sender_allocation_id = self.format_sender_allocation(&allocation_id); @@ -478,7 +484,9 @@ impl Actor for SenderAccount { .set(config.tap.rav_request_trigger_value as f64); let state = State { - sender_fee_tracker: SenderFeeTracker::default(), + sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis( + config.tap.rav_request_timestamp_buffer_ms, + )), rav_tracker: SenderFeeTracker::default(), invalid_receipts_tracker: SenderFeeTracker::default(), allocation_ids: allocation_ids.clone(), @@ -564,14 +572,30 @@ impl Actor for SenderAccount { scheduled_rav_request.abort(); } - if let ReceiptFees::NewValue(unaggregated_fees) = receipt_fees { - state - .sender_fee_tracker - .update(allocation_id, unaggregated_fees.value); + match receipt_fees { + ReceiptFees::NewReceipt(value) => { + state.sender_fee_tracker.add(allocation_id, value); - UNAGGREGATED_FEES - .with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]) - .set(unaggregated_fees.value as f64); + UNAGGREGATED_FEES + .with_label_values(&[ + &state.sender.to_string(), + &allocation_id.to_string(), + ]) + .add(value as f64); + } + ReceiptFees::UpdateValue(unaggregated_fees) => { + state + .sender_fee_tracker + .update(allocation_id, unaggregated_fees.value); + + UNAGGREGATED_FEES + .with_label_values(&[ + &state.sender.to_string(), + &allocation_id.to_string(), + ]) + .set(unaggregated_fees.value as f64); + } + ReceiptFees::Retry => {} } // Eagerly deny the sender (if needed), before the RAV request. To be sure not to @@ -582,7 +606,7 @@ impl Actor for SenderAccount { state.add_to_denylist().await; } - if state.sender_fee_tracker.get_total_fee() + if state.sender_fee_tracker.get_total_fee_outside_buffer() >= state.config.tap.rav_request_trigger_value { tracing::debug!( @@ -769,7 +793,7 @@ impl Actor for SenderAccount { // update the receipt fees by reseting to 0 myself.cast(SenderAccountMessage::UpdateReceiptFees( allocation_id, - ReceiptFees::NewValue(UnaggregatedReceipts::default()), + ReceiptFees::UpdateValue(UnaggregatedReceipts::default()), ))?; // rav tracker is not updated because it's still not redeemed @@ -884,6 +908,7 @@ pub mod tests { const DUMMY_URL: &str = "http://localhost:1234"; const TRIGGER_VALUE: u128 = 500; const ESCROW_VALUE: u128 = 1000; + const BUFFER_MS: u64 = 100; async fn create_sender_account( pgpool: PgPool, @@ -904,7 +929,7 @@ pub mod tests { }, tap: config::Tap { rav_request_trigger_value, - rav_request_timestamp_buffer_ms: 1, + rav_request_timestamp_buffer_ms: BUFFER_MS, rav_request_timeout_secs: 5, max_unnaggregated_fees_per_sender, ..Default::default() @@ -1191,14 +1216,11 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts { - value: TRIGGER_VALUE - 1, - last_id: 10, - }), + ReceiptFees::NewReceipt(TRIGGER_VALUE - 1), )) .unwrap(); - tokio::time::sleep(Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_millis(BUFFER_MS)).await; assert_eq!( triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst), @@ -1230,10 +1252,24 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts { - value: TRIGGER_VALUE, - last_id: 10, - }), + ReceiptFees::NewReceipt(TRIGGER_VALUE), + )) + .unwrap(); + + tokio::time::sleep(Duration::from_millis(20)).await; + + assert_eq!( + triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst), + 0 + ); + + // wait for it to be outside buffer + tokio::time::sleep(Duration::from_millis(BUFFER_MS)).await; + + sender_account + .cast(SenderAccountMessage::UpdateReceiptFees( + *ALLOCATION_ID_0, + ReceiptFees::Retry, )) .unwrap(); @@ -1342,10 +1378,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts { - value: TRIGGER_VALUE, - last_id: 11, - }), + ReceiptFees::NewReceipt(TRIGGER_VALUE), )) .unwrap(); tokio::time::sleep(Duration::from_millis(200)).await; @@ -1388,7 +1421,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts { + ReceiptFees::UpdateValue(UnaggregatedReceipts { value: $value, last_id: 11, }), @@ -1529,7 +1562,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts { + ReceiptFees::UpdateValue(UnaggregatedReceipts { value: $value, last_id: 11, }), @@ -1730,10 +1763,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts { - value: TRIGGER_VALUE, - last_id: 11, - }), + ReceiptFees::NewReceipt(TRIGGER_VALUE), )) .unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index 9b1724e16..abeff0518 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -34,7 +34,7 @@ lazy_static! { .unwrap(); } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, PartialEq, Eq)] pub struct NewReceiptNotification { pub id: u64, pub allocation_id: Address, diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 26012fc3f..5d28be34e 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -161,7 +161,7 @@ impl Actor for SenderAllocation { sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees( allocation_id, - ReceiptFees::NewValue(state.unaggregated_fees.clone()), + ReceiptFees::UpdateValue(state.unaggregated_fees.clone()), ))?; // update rav tracker for sender account @@ -225,9 +225,10 @@ impl Actor for SenderAllocation { ); let unaggregated_fees = &mut state.unaggregated_fees; match message { - SenderAllocationMessage::NewReceipt(NewReceiptNotification { - id, value: fees, .. - }) => { + SenderAllocationMessage::NewReceipt(notification) => { + let NewReceiptNotification { + id, value: fees, .. + } = notification; if id > unaggregated_fees.last_id { unaggregated_fees.last_id = id; unaggregated_fees.value = unaggregated_fees @@ -248,7 +249,7 @@ impl Actor for SenderAllocation { .sender_account_ref .cast(SenderAccountMessage::UpdateReceiptFees( state.allocation_id, - ReceiptFees::NewValue(unaggregated_fees.clone()), + ReceiptFees::NewReceipt(fees), ))?; } } @@ -974,7 +975,7 @@ pub mod tests { // Should emit a message to the sender account with the unaggregated fees. let expected_message = SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts { + ReceiptFees::UpdateValue(UnaggregatedReceipts { last_id: 10, value: 55u128, }), @@ -1073,10 +1074,7 @@ pub mod tests { // should emit update aggregate fees message to sender account let expected_message = SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts { - last_id: 1, - value: 20, - }), + ReceiptFees::NewReceipt(20u128), ); let last_message_emitted = last_message_emitted.lock().unwrap(); assert_eq!(last_message_emitted.len(), 2); @@ -1191,7 +1189,7 @@ pub mod tests { last_message_emitted.lock().unwrap().last(), Some(&SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewValue(UnaggregatedReceipts::default()) + ReceiptFees::UpdateValue(UnaggregatedReceipts::default()) )) ); } diff --git a/tap-agent/src/agent/sender_fee_tracker.rs b/tap-agent/src/agent/sender_fee_tracker.rs index 23d7de840..721404b28 100644 --- a/tap-agent/src/agent/sender_fee_tracker.rs +++ b/tap-agent/src/agent/sender_fee_tracker.rs @@ -2,13 +2,40 @@ // SPDX-License-Identifier: Apache-2.0 use alloy::primitives::Address; -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + time::{Duration, Instant}, +}; use tracing::error; +#[derive(Debug, Clone, Default)] +struct ExpiringSum { + entries: VecDeque<(Instant, u128)>, + sum: u128, +} + +impl ExpiringSum { + fn get_sum(&mut self, duration: &Duration) -> u128 { + let now = Instant::now(); + while let Some(&(timestamp, value)) = self.entries.front() { + if now.duration_since(timestamp) >= *duration { + self.entries.pop_front(); + self.sum -= value; + } else { + break; + } + } + self.sum + } +} + #[derive(Debug, Clone, Default)] pub struct SenderFeeTracker { id_to_fee: HashMap, total_fee: u128, + + buffer_window_fee: HashMap, + buffer_window_duration: Duration, // there are some allocations that we don't want it to be // heaviest allocation, because they are already marked for finalization, // and thus requesting RAVs on their own in their `post_stop` routine. @@ -16,6 +43,33 @@ pub struct SenderFeeTracker { } impl SenderFeeTracker { + pub fn new(buffer_window_duration: Duration) -> Self { + Self { + buffer_window_duration, + ..Default::default() + } + } + /// Adds into the total_fee entry and buffer window totals + /// + /// It's important to notice that `value` cannot be less than + /// zero, so the only way to make this counter lower is by using + /// `update` function + pub fn add(&mut self, id: Address, value: u128) { + if self.buffer_window_duration > Duration::ZERO { + let now = Instant::now(); + let expiring_sum = self.buffer_window_fee.entry(id).or_default(); + expiring_sum.entries.push_back((now, value)); + expiring_sum.sum += value; + } + self.total_fee += value; + let entry = self.id_to_fee.entry(id).or_default(); + *entry += value; + } + + /// Updates and overwrite the fee counter into the specific + /// value provided. + /// + /// IMPORTANT: This function does not affect the buffer window fee pub fn update(&mut self, id: Address, fee: u128) { if fee > 0 { // insert or update, if update remove old fee from total @@ -44,20 +98,32 @@ impl SenderFeeTracker { self.blocked_addresses.remove(&address); } - pub fn get_heaviest_allocation_id(&self) -> Option
{ + pub fn get_heaviest_allocation_id(&mut self) -> Option
{ // just loop over and get the biggest fee self.id_to_fee .iter() .filter(|(addr, _)| !self.blocked_addresses.contains(*addr)) + // map to the value minus fees in buffer + .map(|(addr, fee)| { + ( + addr, + fee - self + .buffer_window_fee + .get_mut(addr) + .map(|expiring| expiring.get_sum(&self.buffer_window_duration)) + .unwrap_or_default(), + ) + }) + .filter(|(_, fee)| *fee > 0) .fold(None, |acc: Option<(&Address, u128)>, (addr, fee)| { if let Some((_, max_fee)) = acc { - if *fee > max_fee { - Some((addr, *fee)) + if fee > max_fee { + Some((addr, fee)) } else { acc } } else { - Some((addr, *fee)) + Some((addr, fee)) } }) .map(|(&id, _)| id) @@ -70,22 +136,31 @@ impl SenderFeeTracker { pub fn get_total_fee(&self) -> u128 { self.total_fee } + + pub fn get_total_fee_outside_buffer(&mut self) -> u128 { + self.total_fee - self.get_buffer_fee().min(self.total_fee) + } + + pub fn get_buffer_fee(&mut self) -> u128 { + self.buffer_window_fee + .values_mut() + .fold(0u128, |acc, expiring| { + acc + expiring.get_sum(&self.buffer_window_duration) + }) + } } #[cfg(test)] mod tests { use super::SenderFeeTracker; - use std::str::FromStr; - use thegraph_core::Address; + use alloy::primitives::address; + use std::{thread::sleep, time::Duration}; #[test] fn test_allocation_id_tracker() { - let allocation_id_0: Address = - Address::from_str("0xabababababababababababababababababababab").unwrap(); - let allocation_id_1: Address = - Address::from_str("0xbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc").unwrap(); - let allocation_id_2: Address = - Address::from_str("0xcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd").unwrap(); + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); let mut tracker = SenderFeeTracker::default(); assert_eq!(tracker.get_heaviest_allocation_id(), None); @@ -137,4 +212,80 @@ mod tests { assert_eq!(tracker.get_heaviest_allocation_id(), None); assert_eq!(tracker.get_total_fee(), 0); } + + #[test] + fn test_buffer_tracker_window() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); + + const BUFFER_WINDOW: Duration = Duration::from_millis(20); + let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee_outside_buffer(), 0); + assert_eq!(tracker.get_total_fee(), 0); + + tracker.add(allocation_id_0, 10); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee_outside_buffer(), 0); + assert_eq!(tracker.get_total_fee(), 10); + + sleep(BUFFER_WINDOW); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee_outside_buffer(), 10); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.add(allocation_id_2, 20); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee_outside_buffer(), 10); + assert_eq!(tracker.get_total_fee(), 30); + + sleep(BUFFER_WINDOW); + + tracker.block_allocation_id(allocation_id_2); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee_outside_buffer(), 30); + assert_eq!(tracker.get_total_fee(), 30); + + tracker.unblock_allocation_id(allocation_id_2); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + + tracker.add(allocation_id_1, 30); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee_outside_buffer(), 30); + assert_eq!(tracker.get_total_fee(), 60); + + sleep(BUFFER_WINDOW); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee_outside_buffer(), 60); + assert_eq!(tracker.get_total_fee(), 60); + + tracker.add(allocation_id_2, 20); + tracker.update(allocation_id_2, 0); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee_outside_buffer(), 20); + assert_eq!(tracker.get_total_fee(), 40); + + sleep(BUFFER_WINDOW); + + tracker.add(allocation_id_2, 100); + tracker.update(allocation_id_2, 0); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee_outside_buffer(), 0); + assert_eq!(tracker.get_total_fee(), 40); + + sleep(BUFFER_WINDOW); + + tracker.update(allocation_id_1, 0); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee_outside_buffer(), 10); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.update(allocation_id_0, 0); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee_outside_buffer(), 0); + assert_eq!(tracker.get_total_fee(), 0); + } }