diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index 3985e700e..d79d02b04 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -21,7 +21,6 @@ use sender_accounts_manager::SenderAccountsManager; pub mod sender_account; pub mod sender_accounts_manager; pub mod sender_allocation; -pub mod sender_fee_tracker; pub mod unaggregated_receipts; pub async fn start_agent() -> (ActorRef, JoinHandle<()>) { diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index dd31c3392..690ef2817 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -28,8 +28,8 @@ use tracing::{error, Level}; use super::sender_allocation::{SenderAllocation, SenderAllocationArgs}; use crate::agent::sender_allocation::SenderAllocationMessage; -use crate::agent::sender_fee_tracker::SenderFeeTracker; use crate::agent::unaggregated_receipts::UnaggregatedReceipts; +use crate::tracker::{SenderFeeTracker, SimpleFeeTracker}; use crate::{ config::{self}, tap::escrow_adapter::EscrowAdapter, @@ -82,7 +82,7 @@ type Balance = U256; #[derive(Debug)] pub enum ReceiptFees { - NewReceipt(u128), + NewReceipt(u128, u64), UpdateValue(UnaggregatedReceipts), RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option)>), Retry, @@ -132,8 +132,8 @@ pub struct SenderAccountArgs { pub struct State { prefix: Option, sender_fee_tracker: SenderFeeTracker, - rav_tracker: SenderFeeTracker, - invalid_receipts_tracker: SenderFeeTracker, + rav_tracker: SimpleFeeTracker, + invalid_receipts_tracker: SimpleFeeTracker, allocation_ids: HashSet
, _indexer_allocations_handle: JoinHandle<()>, _escrow_account_monitor: PipeHandle, @@ -238,6 +238,53 @@ impl State { Ok(()) } + fn finalize_rav_request( + &mut self, + allocation_id: Address, + rav_result: anyhow::Result<(UnaggregatedReceipts, Option)>, + ) { + self.sender_fee_tracker.finish_rav_request(allocation_id); + match rav_result { + Ok((fees, rav)) => { + self.sender_fee_tracker.ok_rav_request(allocation_id); + + let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate); + self.update_rav(allocation_id, rav_value); + + // update sender fee tracker + self.update_sender_fee(allocation_id, fees); + } + Err(err) => { + // TODO we should update the total value too + self.sender_fee_tracker.failed_rav_backoff(allocation_id); + error!( + "Error while requesting RAV for sender {} and allocation {}: {}", + self.sender, allocation_id, err + ); + } + }; + } + + fn update_rav(&mut self, allocation_id: Address, rav_value: u128) { + self.rav_tracker.update(allocation_id, rav_value); + PENDING_RAV + .with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()]) + .set(rav_value as f64); + } + + fn update_sender_fee( + &mut self, + allocation_id: Address, + unaggregated_fees: UnaggregatedReceipts, + ) { + self.sender_fee_tracker + .update(allocation_id, unaggregated_fees); + + UNAGGREGATED_FEES + .with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()]) + .set(unaggregated_fees.value as f64); + } + fn deny_condition_reached(&self) -> bool { let pending_ravs = self.rav_tracker.get_total_fee(); let unaggregated_fees = self.sender_fee_tracker.get_total_fee(); @@ -471,8 +518,8 @@ impl Actor for SenderAccount { sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis( config.tap.rav_request_timestamp_buffer_ms, )), - rav_tracker: SenderFeeTracker::default(), - invalid_receipts_tracker: SenderFeeTracker::default(), + rav_tracker: SimpleFeeTracker::default(), + invalid_receipts_tracker: SimpleFeeTracker::default(), allocation_ids: allocation_ids.clone(), _indexer_allocations_handle, _escrow_account_monitor, @@ -520,16 +567,7 @@ impl Actor for SenderAccount { match message { SenderAccountMessage::UpdateRav(rav) => { - state - .rav_tracker - .update(rav.message.allocationId, rav.message.valueAggregate, 0); - - PENDING_RAV - .with_label_values(&[ - &state.sender.to_string(), - &rav.message.allocationId.to_string(), - ]) - .set(rav.message.valueAggregate as f64); + state.update_rav(rav.message.allocationId, rav.message.valueAggregate); let should_deny = !state.denied && state.deny_condition_reached(); if should_deny { @@ -543,7 +581,7 @@ impl Actor for SenderAccount { state .invalid_receipts_tracker - .update(allocation_id, unaggregated_fees.value, 0); + .update(allocation_id, unaggregated_fees.value); // invalid receipts can't go down let should_deny = !state.denied && state.deny_condition_reached(); @@ -558,7 +596,7 @@ impl Actor for SenderAccount { } match receipt_fees { - ReceiptFees::NewReceipt(value) => { + ReceiptFees::NewReceipt(value, timestamp_ns) => { // If state is denied and received new receipt, sender was removed manually from DB if state.denied { tracing::warn!( @@ -571,8 +609,11 @@ impl Actor for SenderAccount { ); SenderAccount::deny_sender(&state.pgpool, state.sender).await; } - state.sender_fee_tracker.add(allocation_id, value); + // add new value + state + .sender_fee_tracker + .add(allocation_id, value, timestamp_ns); UNAGGREGATED_FEES .with_label_values(&[ &state.sender.to_string(), @@ -581,58 +622,10 @@ impl Actor for SenderAccount { .add(value as f64); } ReceiptFees::RavRequestResponse(rav_result) => { - state.sender_fee_tracker.finish_rav_request(allocation_id); - match rav_result { - Ok((fees, rav)) => { - state.rav_tracker.ok_rav_request(allocation_id); - - let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate); - // update rav tracker - state.rav_tracker.update(allocation_id, rav_value, 0); - PENDING_RAV - .with_label_values(&[ - &state.sender.to_string(), - &allocation_id.to_string(), - ]) - .set(rav_value as f64); - - // update sender fee tracker - state.sender_fee_tracker.update( - allocation_id, - fees.value, - fees.counter, - ); - UNAGGREGATED_FEES - .with_label_values(&[ - &state.sender.to_string(), - &allocation_id.to_string(), - ]) - .set(fees.value as f64); - } - Err(err) => { - state.rav_tracker.failed_rav_backoff(allocation_id); - error!( - "Error while requesting RAV for sender {} and allocation {}: {}", - state.sender, - allocation_id, - err - ); - } - }; + state.finalize_rav_request(allocation_id, rav_result); } ReceiptFees::UpdateValue(unaggregated_fees) => { - state.sender_fee_tracker.update( - allocation_id, - unaggregated_fees.value, - unaggregated_fees.counter, - ); - - UNAGGREGATED_FEES - .with_label_values(&[ - &state.sender.to_string(), - &allocation_id.to_string(), - ]) - .set(unaggregated_fees.value as f64); + state.update_sender_fee(allocation_id, unaggregated_fees); } ReceiptFees::Retry => {} } @@ -646,14 +639,12 @@ impl Actor for SenderAccount { } let total_counter_for_allocation = state .sender_fee_tracker - .get_total_counter_outside_buffer_for_allocation(&allocation_id); + .get_count_outside_buffer_for_allocation(&allocation_id); + let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id); let counter_greater_receipt_limit = total_counter_for_allocation >= state.config.tap.rav_request_receipt_limit - && !state - .sender_fee_tracker - .check_allocation_has_rav_request_running(allocation_id); - let total_fee_outside_buffer = - state.sender_fee_tracker.get_total_fee_outside_buffer(); + && can_trigger_rav; + let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee(); let total_fee_greater_trigger_value = total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value; let rav_result = match ( @@ -775,7 +766,7 @@ impl Actor for SenderAccount { for allocation_id in tracked_allocation_ids.difference(&active_allocation_ids) { // if it's being tracked and we didn't receive any update from the non_final_last_ravs // remove from the tracker - state.rav_tracker.update(*allocation_id, 0, 0); + state.rav_tracker.remove(*allocation_id); let _ = PENDING_RAV.remove_label_values(&[ &state.sender.to_string(), @@ -784,10 +775,7 @@ impl Actor for SenderAccount { } for (allocation_id, value) in non_final_last_ravs { - state.rav_tracker.update(allocation_id, value, 0); - PENDING_RAV - .with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]) - .set(value as f64); + state.update_rav(allocation_id, value); } // now that balance and rav tracker is updated, check match (state.denied, state.deny_condition_reached()) { @@ -851,15 +839,14 @@ impl Actor for SenderAccount { return Ok(()); }; - // clean up hashset - state - .sender_fee_tracker - .unblock_allocation_id(allocation_id); - // update the receipt fees by reseting to 0 - myself.cast(SenderAccountMessage::UpdateReceiptFees( + // remove from sender_fee_tracker + state.sender_fee_tracker.remove(allocation_id); + + // check for deny conditions + let _ = myself.cast(SenderAccountMessage::UpdateReceiptFees( allocation_id, - ReceiptFees::UpdateValue(UnaggregatedReceipts::default()), - ))?; + ReceiptFees::Retry, + )); // rav tracker is not updated because it's still not redeemed } @@ -939,7 +926,7 @@ pub mod tests { use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicU32; use std::sync::{Arc, Mutex}; - use std::time::Duration; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::watch; use wiremock::matchers::{body_string_contains, method}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -954,7 +941,9 @@ pub mod tests { (Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => { l0 == r0 && match (l1, r1) { - (ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l, + (ReceiptFees::NewReceipt(l1, l2), ReceiptFees::NewReceipt(r1, r2)) => { + r1 == l1 && r2 == l2 + } (ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l, ( ReceiptFees::RavRequestResponse(l), @@ -1301,6 +1290,13 @@ pub mod tests { ) } + fn get_current_timestamp_u64_ns() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() as u64 + } + #[sqlx::test(migrations = "../migrations")] async fn test_update_receipt_fees_no_rav(pgpool: PgPool) { let (sender_account, handle, prefix, _) = create_sender_account( @@ -1326,7 +1322,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewReceipt(TRIGGER_VALUE - 1), + ReceiptFees::NewReceipt(TRIGGER_VALUE - 1, get_current_timestamp_u64_ns()), )) .unwrap(); @@ -1369,7 +1365,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewReceipt(TRIGGER_VALUE), + ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()), )) .unwrap(); @@ -1429,7 +1425,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewReceipt(1), + ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()), )) .unwrap(); @@ -1442,7 +1438,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewReceipt(1), + ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()), )) .unwrap(); @@ -1570,7 +1566,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewReceipt(TRIGGER_VALUE), + ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()), )) .unwrap(); tokio::time::sleep(Duration::from_millis(200)).await; @@ -1964,7 +1960,7 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewReceipt(TRIGGER_VALUE), + ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()), )) .unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 715c9235a..b7f6ab323 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -149,7 +149,7 @@ impl Actor for SenderAllocation { if state.invalid_receipts_fees.value > 0 { sender_account_ref.cast(SenderAccountMessage::UpdateInvalidReceiptFees( allocation_id, - state.invalid_receipts_fees.clone(), + state.invalid_receipts_fees, ))?; } @@ -158,7 +158,7 @@ impl Actor for SenderAllocation { sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees( allocation_id, - ReceiptFees::UpdateValue(state.unaggregated_fees.clone()), + ReceiptFees::UpdateValue(state.unaggregated_fees), ))?; // update rav tracker for sender account @@ -196,7 +196,12 @@ impl Actor for SenderAllocation { } while let Err(err) = state.mark_rav_last().await { - error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds..."); + error!( + error = %err, + %state.allocation_id, + %state.sender, + "Error while marking allocation last. Retrying in 30 seconds..." + ); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -224,7 +229,10 @@ impl Actor for SenderAllocation { match message { SenderAllocationMessage::NewReceipt(notification) => { let NewReceiptNotification { - id, value: fees, .. + id, + value: fees, + timestamp_ns, + .. } = notification; if id <= unaggregated_fees.last_id { // our world assumption is wrong @@ -255,7 +263,7 @@ impl Actor for SenderAllocation { .sender_account_ref .cast(SenderAccountMessage::UpdateReceiptFees( state.allocation_id, - ReceiptFees::NewReceipt(fees), + ReceiptFees::NewReceipt(fees, timestamp_ns), ))?; } SenderAllocationMessage::TriggerRAVRequest => { @@ -263,7 +271,7 @@ impl Actor for SenderAllocation { state .request_rav() .await - .map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone())) + .map(|_| (state.unaggregated_fees, state.latest_rav.clone())) } else { Err(anyhow!("Unaggregated fee equals zero")) }; @@ -278,7 +286,7 @@ impl Actor for SenderAllocation { #[cfg(test)] SenderAllocationMessage::GetUnaggregatedReceipts(reply) => { if !reply.is_closed() { - let _ = reply.send(unaggregated_fees.clone()); + let _ = reply.send(*unaggregated_fees); } } } @@ -769,7 +777,7 @@ impl SenderAllocationState { self.sender_account_ref .cast(SenderAccountMessage::UpdateInvalidReceiptFees( self.allocation_id, - self.invalid_receipts_fees.clone(), + self.invalid_receipts_fees, ))?; Ok(()) @@ -1104,6 +1112,11 @@ pub mod tests { ) .unwrap(); + let timestamp_ns = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() as u64; + cast!( sender_allocation, SenderAllocationMessage::NewReceipt(NewReceiptNotification { @@ -1111,7 +1124,7 @@ pub mod tests { value: 20, allocation_id: *ALLOCATION_ID_0, signer_address: SIGNER.1, - timestamp_ns: 0, + timestamp_ns, }) ) .unwrap(); @@ -1121,7 +1134,7 @@ pub mod tests { // should emit update aggregate fees message to sender account let expected_message = SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - ReceiptFees::NewReceipt(20u128), + ReceiptFees::NewReceipt(20u128, timestamp_ns), ); let startup_load_msg = message_receiver.recv().await.unwrap(); assert_eq!( diff --git a/tap-agent/src/agent/sender_fee_tracker.rs b/tap-agent/src/agent/sender_fee_tracker.rs deleted file mode 100644 index 922ed9eb2..000000000 --- a/tap-agent/src/agent/sender_fee_tracker.rs +++ /dev/null @@ -1,532 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use alloy::primitives::Address; -use std::{ - collections::{HashMap, HashSet, VecDeque}, - time::{Duration, Instant}, -}; -use tracing::error; - -#[derive(Debug, Clone, Default)] -struct ExpiringSum { - entries: VecDeque<(Instant, u128)>, - sum: u128, -} - -impl ExpiringSum { - fn get_sum(&mut self, duration: &Duration) -> u128 { - self.cleanup(duration); - self.sum - } - - fn get_count(&mut self, duration: &Duration) -> u64 { - self.cleanup(duration); - self.entries.len() as u64 - } - - fn cleanup(&mut self, duration: &Duration) { - let now = Instant::now(); - while let Some(&(timestamp, value)) = self.entries.front() { - if now.duration_since(timestamp) >= *duration { - self.entries.pop_front(); - self.sum -= value; - } else { - break; - } - } - } -} - -#[derive(Debug, Clone, Default)] -pub struct FeeCounter { - fee: u128, - count: u64, -} - -#[derive(Debug, Clone, Default)] -pub struct SenderFeeTracker { - id_to_fee: HashMap, - total_fee: u128, - - fees_requesting: u128, - ids_requesting: HashSet
, - - buffer_window_fee: HashMap, - buffer_window_duration: Duration, - // there are some allocations that we don't want it to be - // heaviest allocation, because they are already marked for finalization, - // and thus requesting RAVs on their own in their `post_stop` routine. - blocked_addresses: HashSet
, - failed_ravs: HashMap, -} - -#[derive(Debug, Clone)] -pub struct FailedRavInfo { - failed_ravs_count: u32, - failed_rav_backoff_time: Instant, -} - -impl Default for FailedRavInfo { - fn default() -> Self { - Self { - failed_ravs_count: 0, - failed_rav_backoff_time: Instant::now(), - } - } -} - -impl SenderFeeTracker { - pub fn new(buffer_window_duration: Duration) -> Self { - Self { - buffer_window_duration, - ..Default::default() - } - } - /// Adds into the total_fee entry and buffer window totals - /// - /// It's important to notice that `value` cannot be less than - /// zero, so the only way to make this counter lower is by using - /// `update` function - pub fn add(&mut self, id: Address, value: u128) { - if self.buffer_window_duration > Duration::ZERO { - let now = Instant::now(); - let expiring_sum = self.buffer_window_fee.entry(id).or_default(); - expiring_sum.entries.push_back((now, value)); - expiring_sum.sum += value; - } - self.total_fee += value; - - let entry = self.id_to_fee.entry(id).or_default(); - entry.fee += value; - entry.count += 1; - } - - /// Updates and overwrite the fee counter into the specific - /// value provided. - /// - /// IMPORTANT: This function does not affect the buffer window fee - pub fn update(&mut self, id: Address, fee: u128, counter: u64) { - if fee > 0 { - // insert or update, if update remove old fee from total - if let Some(old_fee) = self.id_to_fee.insert( - id, - FeeCounter { - fee, - count: counter, - }, - ) { - self.total_fee -= old_fee.fee; - } - self.total_fee = self.total_fee.checked_add(fee).unwrap_or_else(|| { - // This should never happen, but if it does, we want to know about it. - error!( - "Overflow when adding receipt value {} to total fee {}. \ - Setting total fee to u128::MAX.", - fee, self.total_fee - ); - u128::MAX - }); - } else if let Some(old_fee) = self.id_to_fee.remove(&id) { - self.total_fee -= old_fee.fee; - } - } - - pub fn block_allocation_id(&mut self, address: Address) { - self.blocked_addresses.insert(address); - } - - pub fn unblock_allocation_id(&mut self, address: Address) { - self.blocked_addresses.remove(&address); - } - - pub fn get_heaviest_allocation_id(&mut self) -> Option
{ - // just loop over and get the biggest fee - let now = Instant::now(); - self.id_to_fee - .iter() - .filter(|(addr, _)| !self.blocked_addresses.contains(*addr)) - .filter(|(addr, _)| !self.ids_requesting.contains(*addr)) - .filter(|(addr, _)| { - self.failed_ravs - .get(*addr) - .map(|failed_rav| now > failed_rav.failed_rav_backoff_time) - .unwrap_or(true) - }) - // map to the value minus fees in buffer - .map(|(addr, fee)| { - ( - addr, - fee.fee - - self - .buffer_window_fee - .get_mut(addr) - .map(|expiring| expiring.get_sum(&self.buffer_window_duration)) - .unwrap_or_default(), - ) - }) - .filter(|(_, fee)| *fee > 0) - .fold(None, |acc: Option<(&Address, u128)>, (addr, fee)| { - if let Some((_, max_fee)) = acc { - if fee > max_fee { - Some((addr, fee)) - } else { - acc - } - } else { - Some((addr, fee)) - } - }) - .map(|(&id, _)| id) - } - - pub fn get_list_of_allocation_ids(&self) -> HashSet
{ - self.id_to_fee.keys().cloned().collect() - } - - pub fn get_total_fee(&self) -> u128 { - self.total_fee - self.fees_requesting - } - - pub fn get_total_fee_outside_buffer(&mut self) -> u128 { - self.get_total_fee() - self.get_buffer_fee().min(self.total_fee) - } - - pub fn get_total_counter_outside_buffer_for_allocation( - &mut self, - allocation_id: &Address, - ) -> u64 { - let Some(allocation_counter) = self - .id_to_fee - .get(allocation_id) - .map(|fee_counter| fee_counter.count) - else { - return 0; - }; - let counter_in_buffer = self - .buffer_window_fee - .get_mut(allocation_id) - .map(|window| window.get_count(&self.buffer_window_duration)) - .unwrap_or(0); - allocation_counter - counter_in_buffer - } - - pub fn get_buffer_fee(&mut self) -> u128 { - self.buffer_window_fee - .values_mut() - .fold(0u128, |acc, expiring| { - acc + expiring.get_sum(&self.buffer_window_duration) - }) - } - - pub fn start_rav_request(&mut self, allocation_id: Address) { - let current_fee = self.id_to_fee.entry(allocation_id).or_default(); - self.ids_requesting.insert(allocation_id); - self.fees_requesting += current_fee.fee; - } - - /// Should be called before `update` - pub fn finish_rav_request(&mut self, allocation_id: Address) { - let current_fee = self.id_to_fee.entry(allocation_id).or_default(); - self.fees_requesting -= current_fee.fee; - self.ids_requesting.remove(&allocation_id); - } - - pub fn failed_rav_backoff(&mut self, allocation_id: Address) { - // backoff = max(100ms * 2 ^ retries, 60s) - let failed_rav = self.failed_ravs.entry(allocation_id).or_default(); - failed_rav.failed_rav_backoff_time = Instant::now() - + (Duration::from_millis(100) * 2u32.pow(failed_rav.failed_ravs_count)) - .min(Duration::from_secs(60)); - failed_rav.failed_ravs_count += 1; - } - pub fn ok_rav_request(&mut self, allocation_id: Address) { - self.failed_ravs.remove(&allocation_id); - } - - pub fn check_allocation_has_rav_request_running(&self, allocation_id: Address) -> bool { - self.ids_requesting.contains(&allocation_id) - } -} - -#[cfg(test)] -mod tests { - use super::SenderFeeTracker; - use alloy::primitives::address; - use std::{thread::sleep, time::Duration}; - - #[test] - fn test_allocation_id_tracker() { - let allocation_id_0 = address!("abababababababababababababababababababab"); - let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); - let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); - - let mut tracker = SenderFeeTracker::default(); - assert_eq!(tracker.get_heaviest_allocation_id(), None); - assert_eq!(tracker.get_total_fee(), 0); - - tracker.update(allocation_id_0, 10, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee(), 10); - - tracker.block_allocation_id(allocation_id_0); - assert_eq!(tracker.get_heaviest_allocation_id(), None); - assert_eq!(tracker.get_total_fee(), 10); - - tracker.unblock_allocation_id(allocation_id_0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - - tracker.update(allocation_id_2, 20, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); - assert_eq!(tracker.get_total_fee(), 30); - - tracker.block_allocation_id(allocation_id_2); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee(), 30); - - tracker.unblock_allocation_id(allocation_id_2); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); - - tracker.update(allocation_id_1, 30, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); - assert_eq!(tracker.get_total_fee(), 60); - - tracker.update(allocation_id_2, 10, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); - assert_eq!(tracker.get_total_fee(), 50); - - tracker.update(allocation_id_2, 40, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); - assert_eq!(tracker.get_total_fee(), 80); - - tracker.update(allocation_id_1, 0, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); - assert_eq!(tracker.get_total_fee(), 50); - - tracker.update(allocation_id_2, 0, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee(), 10); - - tracker.update(allocation_id_0, 0, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), None); - assert_eq!(tracker.get_total_fee(), 0); - } - - #[test] - fn test_buffer_tracker_window() { - let allocation_id_0 = address!("abababababababababababababababababababab"); - let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); - let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); - - const BUFFER_WINDOW: Duration = Duration::from_millis(20); - let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); - assert_eq!(tracker.get_heaviest_allocation_id(), None); - assert_eq!(tracker.get_total_fee_outside_buffer(), 0); - assert_eq!(tracker.get_total_fee(), 0); - - tracker.add(allocation_id_0, 10); - assert_eq!(tracker.get_heaviest_allocation_id(), None); - assert_eq!(tracker.get_total_fee_outside_buffer(), 0); - assert_eq!(tracker.get_total_fee(), 10); - - sleep(BUFFER_WINDOW); - - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee_outside_buffer(), 10); - assert_eq!(tracker.get_total_fee(), 10); - - tracker.add(allocation_id_2, 20); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee_outside_buffer(), 10); - assert_eq!(tracker.get_total_fee(), 30); - - sleep(BUFFER_WINDOW); - - tracker.block_allocation_id(allocation_id_2); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee_outside_buffer(), 30); - assert_eq!(tracker.get_total_fee(), 30); - - tracker.unblock_allocation_id(allocation_id_2); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); - - tracker.add(allocation_id_1, 30); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); - assert_eq!(tracker.get_total_fee_outside_buffer(), 30); - assert_eq!(tracker.get_total_fee(), 60); - - sleep(BUFFER_WINDOW); - - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); - assert_eq!(tracker.get_total_fee_outside_buffer(), 60); - assert_eq!(tracker.get_total_fee(), 60); - - tracker.add(allocation_id_2, 20); - tracker.update(allocation_id_2, 0, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); - assert_eq!(tracker.get_total_fee_outside_buffer(), 20); - assert_eq!(tracker.get_total_fee(), 40); - - sleep(BUFFER_WINDOW); - - tracker.add(allocation_id_2, 100); - tracker.update(allocation_id_2, 0, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); - assert_eq!(tracker.get_total_fee_outside_buffer(), 0); - assert_eq!(tracker.get_total_fee(), 40); - - sleep(BUFFER_WINDOW); - - tracker.update(allocation_id_1, 0, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee_outside_buffer(), 10); - assert_eq!(tracker.get_total_fee(), 10); - - tracker.update(allocation_id_0, 0, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), None); - assert_eq!(tracker.get_total_fee_outside_buffer(), 0); - assert_eq!(tracker.get_total_fee(), 0); - } - - #[test] - fn test_filtered_backed_off_allocations() { - let allocation_id_0 = address!("abababababababababababababababababababab"); - let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); - const BACK_SLEEP_DURATION: Duration = Duration::from_millis(201); - - let mut tracker = SenderFeeTracker::default(); - assert_eq!(tracker.get_heaviest_allocation_id(), None); - assert_eq!(tracker.get_total_fee(), 0); - - tracker.update(allocation_id_0, 10, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee(), 10); - - tracker.update(allocation_id_1, 20, 0); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); - assert_eq!(tracker.get_total_fee(), 30); - - // Simulate failed rav and backoff - tracker.failed_rav_backoff(allocation_id_1); - - // Heaviest should be the first since its not blocked nor failed - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); - assert_eq!(tracker.get_total_fee(), 30); - - sleep(BACK_SLEEP_DURATION); - - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); - assert_eq!(tracker.get_total_fee(), 30); - } - - #[test] - fn test_ongoing_rav_requests() { - let allocation_id_0 = address!("abababababababababababababababababababab"); - let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); - let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); - - let mut tracker = SenderFeeTracker::default(); - assert_eq!(tracker.get_heaviest_allocation_id(), None); - assert_eq!(tracker.get_total_fee_outside_buffer(), 0); - assert_eq!(tracker.get_total_fee(), 0); - - tracker.add(allocation_id_0, 10); - tracker.add(allocation_id_1, 20); - tracker.add(allocation_id_2, 30); - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); - assert_eq!(tracker.get_total_fee(), 60); - assert_eq!(tracker.get_total_fee_outside_buffer(), 60); - - tracker.start_rav_request(allocation_id_2); - - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); - assert_eq!(tracker.get_total_fee(), 30); - assert_eq!(tracker.get_total_fee_outside_buffer(), 30); - - tracker.finish_rav_request(allocation_id_2); - - assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); - assert_eq!(tracker.get_total_fee(), 60); - assert_eq!(tracker.get_total_fee_outside_buffer(), 60); - } - - #[test] - fn check_counter_and_fee_outside_buffer_unordered() { - let allocation_id_0 = address!("abababababababababababababababababababab"); - let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); - - const BUFFER_WINDOW: Duration = Duration::from_millis(20); - let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); - assert_eq!(tracker.get_total_fee_outside_buffer(), 0); - assert_eq!( - tracker.get_total_counter_outside_buffer_for_allocation(&allocation_id_0), - 0 - ); - - tracker.add(allocation_id_0, 10); - assert_eq!(tracker.get_total_fee_outside_buffer(), 0); - assert_eq!( - tracker.get_total_counter_outside_buffer_for_allocation(&allocation_id_0), - 0 - ); - - sleep(BUFFER_WINDOW); - - assert_eq!(tracker.get_total_fee_outside_buffer(), 10); - assert_eq!( - tracker.get_total_counter_outside_buffer_for_allocation(&allocation_id_0), - 1 - ); - - tracker.add(allocation_id_2, 20); - assert_eq!( - tracker.get_total_counter_outside_buffer_for_allocation(&allocation_id_2), - 0 - ); - assert_eq!(tracker.get_total_fee_outside_buffer(), 10); - - sleep(BUFFER_WINDOW); - - tracker.block_allocation_id(allocation_id_2); - assert_eq!( - tracker.get_total_counter_outside_buffer_for_allocation(&allocation_id_2), - 1 - ); - assert_eq!(tracker.get_total_fee_outside_buffer(), 30); - } - - #[test] - fn check_get_count_updates_sum() { - let allocation_id_0 = address!("abababababababababababababababababababab"); - - const BUFFER_WINDOW: Duration = Duration::from_millis(20); - let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); - - tracker.add(allocation_id_0, 10); - let expiring_sum = tracker - .buffer_window_fee - .get_mut(&allocation_id_0) - .expect("there should be something here"); - assert_eq!(expiring_sum.get_sum(&BUFFER_WINDOW), 10); - assert_eq!(expiring_sum.get_count(&BUFFER_WINDOW), 1); - - sleep(BUFFER_WINDOW); - - assert_eq!(expiring_sum.get_sum(&BUFFER_WINDOW), 0); - assert_eq!(expiring_sum.get_count(&BUFFER_WINDOW), 0); - - tracker.add(allocation_id_0, 10); - let expiring_sum = tracker - .buffer_window_fee - .get_mut(&allocation_id_0) - .expect("there should be something here"); - - assert_eq!(expiring_sum.get_count(&BUFFER_WINDOW), 1); - assert_eq!(expiring_sum.get_sum(&BUFFER_WINDOW), 10); - - sleep(BUFFER_WINDOW); - - assert_eq!(expiring_sum.get_count(&BUFFER_WINDOW), 0); - assert_eq!(expiring_sum.get_sum(&BUFFER_WINDOW), 0); - } -} diff --git a/tap-agent/src/agent/unaggregated_receipts.rs b/tap-agent/src/agent/unaggregated_receipts.rs index 863ed97bf..6f40b9c22 100644 --- a/tap-agent/src/agent/unaggregated_receipts.rs +++ b/tap-agent/src/agent/unaggregated_receipts.rs @@ -1,7 +1,7 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -#[derive(Default, Debug, Clone, Eq, PartialEq)] +#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)] pub struct UnaggregatedReceipts { pub value: u128, /// The ID of the last receipt value added to the unaggregated fees value. diff --git a/tap-agent/src/lib.rs b/tap-agent/src/lib.rs index 53593a9ca..121b14f40 100644 --- a/tap-agent/src/lib.rs +++ b/tap-agent/src/lib.rs @@ -20,3 +20,4 @@ pub mod config; pub mod database; pub mod metrics; pub mod tap; +pub mod tracker; diff --git a/tap-agent/src/tracker.rs b/tap-agent/src/tracker.rs new file mode 100644 index 000000000..d795118eb --- /dev/null +++ b/tap-agent/src/tracker.rs @@ -0,0 +1,28 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +pub use extra_data::{DefaultFromExtra, DurationInfo, NoExtraData}; +use generic_tracker::GenericTracker; +pub use sender_fee_stats::SenderFeeStats; + +mod extra_data; +mod generic_tracker; +mod global_tracker; +mod sender_fee_stats; +#[cfg(test)] +mod tracker_tests; + +pub use generic_tracker::GlobalFeeTracker; + +use crate::agent::unaggregated_receipts::UnaggregatedReceipts; + +pub type SimpleFeeTracker = GenericTracker; +pub type SenderFeeTracker = + GenericTracker; + +pub trait AllocationStats { + fn update(&mut self, v: U); + fn is_allowed_to_trigger_rav_request(&self) -> bool; + fn get_stats(&self) -> U; + fn get_valid_fee(&mut self) -> u128; +} diff --git a/tap-agent/src/tracker/extra_data.rs b/tap-agent/src/tracker/extra_data.rs new file mode 100644 index 000000000..b86573318 --- /dev/null +++ b/tap-agent/src/tracker/extra_data.rs @@ -0,0 +1,25 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +pub trait DefaultFromExtra { + fn default_from_extra(extra: &E) -> Self; +} + +#[derive(Debug, Clone)] +pub struct DurationInfo { + pub(super) buffer_duration: Duration, +} + +#[derive(Debug, Clone, Default)] +pub struct NoExtraData; + +impl DefaultFromExtra for T +where + T: Default, +{ + fn default_from_extra(_: &NoExtraData) -> Self { + Default::default() + } +} diff --git a/tap-agent/src/tracker/generic_tracker.rs b/tap-agent/src/tracker/generic_tracker.rs new file mode 100644 index 000000000..99dfa69cf --- /dev/null +++ b/tap-agent/src/tracker/generic_tracker.rs @@ -0,0 +1,241 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use crate::agent::unaggregated_receipts::UnaggregatedReceipts; + +use super::{ + global_tracker::GlobalTracker, AllocationStats, DefaultFromExtra, DurationInfo, SenderFeeStats, +}; +use alloy::primitives::Address; +use std::{ + collections::{HashMap, HashSet}, + marker::PhantomData, + ops::{AddAssign, SubAssign}, + time::Duration, +}; + +#[derive(Debug, Clone, Copy, Default, PartialEq)] +pub struct GlobalFeeTracker { + pub(super) requesting: u128, + pub(super) total_fee: u128, +} + +impl PartialOrd for GlobalFeeTracker { + fn partial_cmp(&self, other: &Self) -> Option { + self.get_total_fee().partial_cmp(&other.get_total_fee()) + } +} + +impl SubAssign for GlobalFeeTracker { + fn sub_assign(&mut self, rhs: UnaggregatedReceipts) { + self.total_fee -= rhs.value; + } +} + +impl AddAssign for GlobalFeeTracker { + fn add_assign(&mut self, rhs: UnaggregatedReceipts) { + self.total_fee += rhs.value; + } +} + +#[derive(Debug, Clone, Default)] +pub struct GenericTracker +where + G: GlobalTracker, + F: AllocationStats + DefaultFromExtra, +{ + pub(super) global: G, + pub(super) id_to_fee: HashMap, + pub(super) extra_data: E, + + _update: PhantomData, +} + +impl GenericTracker +where + G: GlobalTracker, + U: Copy, + F: AllocationStats + DefaultFromExtra, +{ + /// Updates and overwrite the fee counter into the specific + /// value provided. + /// + /// IMPORTANT: This function does not affect the buffer window fee + pub fn update(&mut self, id: Address, value: U) { + // insert or update, if update remove old fee from total + let fee = self + .id_to_fee + .entry(id) + .or_insert(F::default_from_extra(&self.extra_data)); + self.global -= fee.get_stats(); + fee.update(value); + self.global += value; + } + + pub fn remove(&mut self, id: Address) { + if let Some(old_fee) = self.id_to_fee.remove(&id) { + self.global -= old_fee.get_stats(); + } + } + + pub fn get_heaviest_allocation_id(&mut self) -> Option
{ + // just loop over and get the biggest fee + self.id_to_fee + .iter_mut() + .filter(|(_, fee)| fee.is_allowed_to_trigger_rav_request()) + .fold(None, |acc: Option<(&Address, u128)>, (addr, value)| { + if let Some((_, max_fee)) = acc { + if value.get_valid_fee() > max_fee { + Some((addr, value.get_valid_fee())) + } else { + acc + } + } else { + Some((addr, value.get_valid_fee())) + } + }) + .filter(|(_, fee)| *fee > 0) + .map(|(&id, _)| id) + } + + pub fn get_list_of_allocation_ids(&self) -> HashSet
{ + self.id_to_fee.keys().cloned().collect() + } + + pub fn get_total_fee(&self) -> u128 { + self.global.get_total_fee() + } +} + +impl GenericTracker { + pub fn new(buffer_duration: Duration) -> Self { + Self { + extra_data: DurationInfo { buffer_duration }, + global: Default::default(), + id_to_fee: Default::default(), + _update: Default::default(), + } + } + + /// Adds into the total_fee entry and buffer window totals + /// + /// It's important to notice that `value` cannot be less than + /// zero, so the only way to make this counter lower is by using + /// `update` function + pub fn add(&mut self, id: Address, value: u128, timestamp_ns: u64) { + let contains_buffer = self.contains_buffer(); + let entry = self + .id_to_fee + .entry(id) + .or_insert(SenderFeeStats::default_from_extra(&self.extra_data)); + + // fee + self.global.total_fee += value; + entry.total_fee += value; + + // counter for allocation + entry.count += 1; + + if contains_buffer { + entry.buffer_info.new_entry(value, timestamp_ns); + } + } + + fn contains_buffer(&self) -> bool { + self.extra_data.buffer_duration > Duration::ZERO + } + + pub fn get_ravable_total_fee(&mut self) -> u128 { + self.get_total_fee() - self.get_buffered_fee().min(self.global.total_fee) + } + + fn get_buffered_fee(&mut self) -> u128 { + self.id_to_fee + .values_mut() + .fold(0u128, |acc, expiring| acc + expiring.buffer_info.get_sum()) + } + + pub fn get_count_outside_buffer_for_allocation(&mut self, allocation_id: &Address) -> u64 { + self.id_to_fee + .get_mut(allocation_id) + .map(|alloc| alloc.ravable_count()) + .unwrap_or_default() + } + + pub fn start_rav_request(&mut self, allocation_id: Address) { + let entry = self + .id_to_fee + .entry(allocation_id) + .or_insert(SenderFeeStats::default_from_extra(&self.extra_data)); + entry.requesting = true; + self.global.requesting += entry.total_fee; + } + + /// Should be called before `update` + pub fn finish_rav_request(&mut self, allocation_id: Address) { + let entry = self + .id_to_fee + .entry(allocation_id) + .or_insert(SenderFeeStats::default_from_extra(&self.extra_data)); + entry.requesting = false; + self.global.requesting -= entry.total_fee; + } + + pub fn ok_rav_request(&mut self, allocation_id: Address) { + let entry = self + .id_to_fee + .entry(allocation_id) + .or_insert(SenderFeeStats::default_from_extra(&self.extra_data)); + entry.backoff_info.ok(); + } + + pub fn failed_rav_backoff(&mut self, allocation_id: Address) { + let entry = self + .id_to_fee + .entry(allocation_id) + .or_insert(SenderFeeStats::default_from_extra(&self.extra_data)); + entry.backoff_info.fail(); + } +} + +impl GenericTracker +where + G: GlobalTracker, +{ + pub fn block_allocation_id(&mut self, address: Address) { + self.id_to_fee.entry(address).and_modify(|v| { + v.blocked = true; + }); + } + + pub fn unblock_allocation_id(&mut self, address: Address) { + self.id_to_fee.entry(address).and_modify(|v| { + v.blocked = false; + }); + } + + pub fn can_trigger_rav(&self, allocation_id: Address) -> bool { + self.id_to_fee + .get(&allocation_id) + .map(|alloc| alloc.is_allowed_to_trigger_rav_request()) + .unwrap_or_default() + } +} + +impl AllocationStats for u128 { + fn update(&mut self, v: u128) { + *self = v; + } + + fn is_allowed_to_trigger_rav_request(&self) -> bool { + *self > 0 + } + + fn get_stats(&self) -> u128 { + *self + } + + fn get_valid_fee(&mut self) -> u128 { + self.get_stats() + } +} diff --git a/tap-agent/src/tracker/global_tracker.rs b/tap-agent/src/tracker/global_tracker.rs new file mode 100644 index 000000000..33ae9ccd1 --- /dev/null +++ b/tap-agent/src/tracker/global_tracker.rs @@ -0,0 +1,24 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::ops::{AddAssign, SubAssign}; + +use crate::agent::unaggregated_receipts::UnaggregatedReceipts; + +use super::GlobalFeeTracker; + +pub trait GlobalTracker: SubAssign + AddAssign + Sized { + fn get_total_fee(&self) -> u128; +} + +impl GlobalTracker for u128 { + fn get_total_fee(&self) -> u128 { + *self + } +} + +impl GlobalTracker for GlobalFeeTracker { + fn get_total_fee(&self) -> u128 { + self.total_fee - self.requesting + } +} diff --git a/tap-agent/src/tracker/sender_fee_stats.rs b/tap-agent/src/tracker/sender_fee_stats.rs new file mode 100644 index 000000000..9893e6782 --- /dev/null +++ b/tap-agent/src/tracker/sender_fee_stats.rs @@ -0,0 +1,156 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::VecDeque, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use crate::agent::unaggregated_receipts::UnaggregatedReceipts; + +use super::{AllocationStats, DefaultFromExtra, DurationInfo}; + +#[derive(Debug, Clone, Default)] +pub struct SenderFeeStats { + pub(super) total_fee: u128, + pub(super) count: u64, + // there are some allocations that we don't want it to be + // heaviest allocation, because they are already marked for finalization, + // and thus requesting RAVs on their own in their `post_stop` routine. + pub(super) blocked: bool, + pub(super) requesting: bool, + + // Buffer info + pub(super) buffer_info: BufferInfo, + + // Backoff info + pub(super) backoff_info: BackoffInfo, +} + +impl SenderFeeStats { + pub(super) fn ravable_count(&mut self) -> u64 { + let allocation_counter = self.count; + let counter_in_buffer = self.buffer_info.get_count(); + allocation_counter - counter_in_buffer + } +} + +#[derive(Debug, Clone, Default)] +pub struct BufferInfo { + pub entries: VecDeque<(SystemTime, u128)>, + pub fee_in_buffer: u128, + pub duration: Duration, +} + +impl BufferInfo { + pub(super) fn new_entry(&mut self, value: u128, timestamp_ns: u64) { + let duration_since_epoch = Duration::from_nanos(timestamp_ns); + // Create a SystemTime from the UNIX_EPOCH plus the duration + let system_time = UNIX_EPOCH + duration_since_epoch; + let system_time = system_time + .checked_add(self.duration) + .expect("Should be within bounds"); + self.entries.push_back((system_time, value)); + self.fee_in_buffer += value; + } + + pub(super) fn get_sum(&mut self) -> u128 { + self.cleanup(); + self.fee_in_buffer + } + + pub(super) fn get_count(&mut self) -> u64 { + self.cleanup(); + self.entries.len() as u64 + } + + // O(Receipts expired) + fn cleanup(&mut self) -> (u128, u64) { + let now = SystemTime::now(); + + let mut total_value_expired = 0; + let mut total_count_expired = 0; + while let Some(&(timestamp, value)) = self.entries.front() { + if now >= timestamp { + self.entries.pop_front(); + total_value_expired += value; + total_count_expired += 1; + } else { + break; + } + } + self.fee_in_buffer -= total_value_expired; + (total_value_expired, total_count_expired) + } +} + +#[derive(Debug, Clone)] +pub struct BackoffInfo { + failed_ravs_count: u32, + failed_rav_backoff_time: Instant, +} + +impl BackoffInfo { + pub fn ok(&mut self) { + self.failed_ravs_count = 0; + } + + pub fn fail(&mut self) { + // backoff = max(100ms * 2 ^ retries, 60s) + self.failed_rav_backoff_time = Instant::now() + + (Duration::from_millis(100) * 2u32.pow(self.failed_ravs_count)) + .min(Duration::from_secs(60)); + self.failed_ravs_count += 1; + } + + pub fn in_backoff(&self) -> bool { + let now = Instant::now(); + now < self.failed_rav_backoff_time + } +} + +impl DefaultFromExtra for SenderFeeStats { + fn default_from_extra(extra: &DurationInfo) -> Self { + SenderFeeStats { + buffer_info: BufferInfo { + duration: extra.buffer_duration, + ..Default::default() + }, + ..Default::default() + } + } +} + +impl Default for BackoffInfo { + fn default() -> Self { + Self { + failed_ravs_count: 0, + failed_rav_backoff_time: Instant::now(), + } + } +} + +impl AllocationStats for SenderFeeStats { + fn update(&mut self, v: UnaggregatedReceipts) { + self.total_fee = v.value; + self.count = v.counter; + } + + fn is_allowed_to_trigger_rav_request(&self) -> bool { + !self.backoff_info.in_backoff() && !self.blocked && !self.requesting + } + + fn get_stats(&self) -> UnaggregatedReceipts { + UnaggregatedReceipts { + value: self.total_fee, + counter: self.count, + + // TODO remove use of UnaggregatedReceipts + last_id: 0, + } + } + + fn get_valid_fee(&mut self) -> u128 { + self.total_fee - self.buffer_info.get_sum().min(self.total_fee) + } +} diff --git a/tap-agent/src/tracker/tracker_tests.rs b/tap-agent/src/tracker/tracker_tests.rs new file mode 100644 index 000000000..ee9411c49 --- /dev/null +++ b/tap-agent/src/tracker/tracker_tests.rs @@ -0,0 +1,357 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{agent::unaggregated_receipts::UnaggregatedReceipts, tracker::SenderFeeTracker}; + +use super::SimpleFeeTracker; +use alloy::primitives::address; +use std::{ + thread::sleep, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +#[test] +fn test_allocation_id_tracker() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); + + let mut tracker = SimpleFeeTracker::default(); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee(), 0); + + tracker.update(allocation_id_0, 10); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.update(allocation_id_2, 20); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 30); + + tracker.update(allocation_id_1, 30); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 60); + + tracker.update(allocation_id_2, 10); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 50); + + tracker.update(allocation_id_2, 40); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 80); + + tracker.update(allocation_id_1, 0); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 50); + + tracker.update(allocation_id_2, 0); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.update(allocation_id_0, 0); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee(), 0); +} + +impl From for UnaggregatedReceipts { + fn from(value: u128) -> Self { + UnaggregatedReceipts { + value, + last_id: 0, + counter: 0, + } + } +} + +#[test] +fn test_blocking_allocations() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); + + const BUFFER_WINDOW: Duration = Duration::from_millis(0); + let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee(), 0); + + tracker.update(allocation_id_0, 10.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.block_allocation_id(allocation_id_0); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.unblock_allocation_id(allocation_id_0); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + + tracker.update(allocation_id_2, 20.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 30); + + tracker.block_allocation_id(allocation_id_2); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 30); + + tracker.unblock_allocation_id(allocation_id_2); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + + tracker.update(allocation_id_1, 30.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 60); + + tracker.update(allocation_id_2, 10.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 50); + + tracker.update(allocation_id_2, 40.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 80); + + tracker.update(allocation_id_1, 0.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 50); + + tracker.update(allocation_id_2, 0.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.update(allocation_id_0, 0.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee(), 0); +} + +fn get_current_timestamp_u64_ns() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() as u64 +} + +#[test] +fn test_buffer_tracker_window() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); + + const BUFFER_WINDOW: Duration = Duration::from_millis(20); + let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_ravable_total_fee(), 0); + assert_eq!(tracker.get_total_fee(), 0); + + tracker.add(allocation_id_0, 10, get_current_timestamp_u64_ns()); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_ravable_total_fee(), 0); + assert_eq!(tracker.get_total_fee(), 10); + + sleep(BUFFER_WINDOW); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_ravable_total_fee(), 10); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.add(allocation_id_2, 20, get_current_timestamp_u64_ns()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_ravable_total_fee(), 10); + assert_eq!(tracker.get_total_fee(), 30); + + sleep(BUFFER_WINDOW); + + tracker.block_allocation_id(allocation_id_2); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_ravable_total_fee(), 30); + assert_eq!(tracker.get_total_fee(), 30); + + tracker.unblock_allocation_id(allocation_id_2); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + + tracker.add(allocation_id_1, 30, get_current_timestamp_u64_ns()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_ravable_total_fee(), 30); + assert_eq!(tracker.get_total_fee(), 60); + + sleep(BUFFER_WINDOW); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_ravable_total_fee(), 60); + assert_eq!(tracker.get_total_fee(), 60); + + tracker.add(allocation_id_2, 20, get_current_timestamp_u64_ns()); + // we just removed, the buffer should have been removed as well + tracker.update(allocation_id_2, 0.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_ravable_total_fee(), 20); + assert_eq!(tracker.get_total_fee(), 40); + + sleep(BUFFER_WINDOW); + + tracker.add(allocation_id_2, 200, get_current_timestamp_u64_ns()); + tracker.update(allocation_id_2, 100.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_ravable_total_fee(), 0); + assert_eq!(tracker.get_total_fee(), 140); + + sleep(BUFFER_WINDOW); + + tracker.update(allocation_id_2, 0.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_ravable_total_fee(), 40); + assert_eq!(tracker.get_total_fee(), 40); + + tracker.update(allocation_id_1, 0.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_ravable_total_fee(), 10); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.update(allocation_id_0, 0.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_ravable_total_fee(), 0); + assert_eq!(tracker.get_total_fee(), 0); +} + +#[test] +fn test_filtered_backed_off_allocations() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + const BACK_SLEEP_DURATION: Duration = Duration::from_millis(201); + + const BUFFER_WINDOW: Duration = Duration::from_millis(0); + let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee(), 0); + + tracker.update(allocation_id_0, 10.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.update(allocation_id_1, 20.into()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 30); + + // Simulate failed rav and backoff + tracker.failed_rav_backoff(allocation_id_1); + + // Heaviest should be the first since its not blocked nor failed + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 30); + + sleep(BACK_SLEEP_DURATION); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 30); +} + +#[test] +fn test_ongoing_rav_requests() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); + + const BUFFER_WINDOW: Duration = Duration::from_millis(0); + let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); + + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_ravable_total_fee(), 0); + assert_eq!(tracker.get_total_fee(), 0); + + tracker.add(allocation_id_0, 10, get_current_timestamp_u64_ns()); + tracker.add(allocation_id_1, 20, get_current_timestamp_u64_ns()); + tracker.add(allocation_id_2, 30, get_current_timestamp_u64_ns()); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 60); + assert_eq!(tracker.get_ravable_total_fee(), 60); + + tracker.start_rav_request(allocation_id_2); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 30); + assert_eq!(tracker.get_ravable_total_fee(), 30); + + tracker.finish_rav_request(allocation_id_2); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 60); + assert_eq!(tracker.get_ravable_total_fee(), 60); +} + +#[test] +fn check_counter_and_fee_outside_buffer_unordered() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); + + const BUFFER_WINDOW: Duration = Duration::from_millis(20); + let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); + + assert_eq!(tracker.get_ravable_total_fee(), 0); + assert_eq!( + tracker.get_count_outside_buffer_for_allocation(&allocation_id_0), + 0 + ); + + tracker.add(allocation_id_0, 10, get_current_timestamp_u64_ns()); + assert_eq!(tracker.get_ravable_total_fee(), 0); + assert_eq!( + tracker.get_count_outside_buffer_for_allocation(&allocation_id_0), + 0 + ); + + sleep(BUFFER_WINDOW); + + assert_eq!(tracker.get_ravable_total_fee(), 10); + assert_eq!( + tracker.get_count_outside_buffer_for_allocation(&allocation_id_0), + 1 + ); + + tracker.add(allocation_id_2, 20, get_current_timestamp_u64_ns()); + assert_eq!( + tracker.get_count_outside_buffer_for_allocation(&allocation_id_2), + 0 + ); + assert_eq!(tracker.get_ravable_total_fee(), 10); + + sleep(BUFFER_WINDOW); + + tracker.block_allocation_id(allocation_id_2); + assert_eq!( + tracker.get_count_outside_buffer_for_allocation(&allocation_id_2), + 1 + ); + assert_eq!(tracker.get_ravable_total_fee(), 30); +} + +#[test] +fn check_get_count_updates_sum() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + + const BUFFER_WINDOW: Duration = Duration::from_millis(20); + let mut tracker = SenderFeeTracker::new(BUFFER_WINDOW); + + tracker.add(allocation_id_0, 10, get_current_timestamp_u64_ns()); + let expiring_sum = tracker + .id_to_fee + .get_mut(&allocation_id_0) + .expect("there should be something here"); + assert_eq!(expiring_sum.buffer_info.get_sum(), 10); + assert_eq!(expiring_sum.buffer_info.get_count(), 1); + + sleep(BUFFER_WINDOW); + + assert_eq!(expiring_sum.buffer_info.get_sum(), 0); + assert_eq!(expiring_sum.buffer_info.get_count(), 0); + + tracker.add(allocation_id_0, 10, get_current_timestamp_u64_ns()); + let expiring_sum = tracker + .id_to_fee + .get_mut(&allocation_id_0) + .expect("there should be something here"); + + assert_eq!(expiring_sum.buffer_info.get_count(), 1); + assert_eq!(expiring_sum.buffer_info.get_sum(), 10); + + sleep(BUFFER_WINDOW); + + assert_eq!(expiring_sum.buffer_info.get_count(), 0); + assert_eq!(expiring_sum.buffer_info.get_sum(), 0); +}