Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use sender_accounts_manager::SenderAccountsManager;
pub mod sender_account;
pub mod sender_accounts_manager;
pub mod sender_allocation;
pub mod sender_fee_tracker;
pub mod unaggregated_receipts;

pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
Expand Down
184 changes: 87 additions & 97 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use tracing::{error, Level};

use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
use crate::agent::sender_allocation::SenderAllocationMessage;
use crate::agent::sender_fee_tracker::SenderFeeTracker;
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};
use crate::{
config::{self},
tap::escrow_adapter::EscrowAdapter,
Expand Down Expand Up @@ -81,7 +81,7 @@ type Balance = U256;

#[derive(Debug)]
pub enum ReceiptFees {
NewReceipt(u128),
NewReceipt(u128, u64),
UpdateValue(UnaggregatedReceipts),
RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>),
Retry,
Expand Down Expand Up @@ -131,8 +131,8 @@ pub struct SenderAccountArgs {
pub struct State {
prefix: Option<String>,
sender_fee_tracker: SenderFeeTracker,
rav_tracker: SenderFeeTracker,
invalid_receipts_tracker: SenderFeeTracker,
rav_tracker: SimpleFeeTracker,
invalid_receipts_tracker: SimpleFeeTracker,
allocation_ids: HashSet<Address>,
_indexer_allocations_handle: PipeHandle,
_escrow_account_monitor: PipeHandle,
Expand Down Expand Up @@ -237,6 +237,53 @@ impl State {
Ok(())
}

fn finalize_rav_request(
&mut self,
allocation_id: Address,
rav_result: anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>,
) {
self.sender_fee_tracker.finish_rav_request(allocation_id);
match rav_result {
Ok((fees, rav)) => {
self.sender_fee_tracker.ok_rav_request(allocation_id);

let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
self.update_rav(allocation_id, rav_value);

// update sender fee tracker
self.update_sender_fee(allocation_id, fees);
}
Err(err) => {
// TODO we should update the total value too
self.sender_fee_tracker.failed_rav_backoff(allocation_id);
error!(
"Error while requesting RAV for sender {} and allocation {}: {}",
self.sender, allocation_id, err
);
}
};
}

fn update_rav(&mut self, allocation_id: Address, rav_value: u128) {
self.rav_tracker.update(allocation_id, rav_value);
PENDING_RAV
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
.set(rav_value as f64);
}

fn update_sender_fee(
&mut self,
allocation_id: Address,
unaggregated_fees: UnaggregatedReceipts,
) {
self.sender_fee_tracker
.update(allocation_id, unaggregated_fees);

UNAGGREGATED_FEES
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
.set(unaggregated_fees.value as f64);
}

fn deny_condition_reached(&self) -> bool {
let pending_ravs = self.rav_tracker.get_total_fee();
let unaggregated_fees = self.sender_fee_tracker.get_total_fee();
Expand Down Expand Up @@ -469,8 +516,8 @@ impl Actor for SenderAccount {
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
config.tap.rav_request_timestamp_buffer_ms,
)),
rav_tracker: SenderFeeTracker::default(),
invalid_receipts_tracker: SenderFeeTracker::default(),
rav_tracker: SimpleFeeTracker::default(),
invalid_receipts_tracker: SimpleFeeTracker::default(),
allocation_ids: allocation_ids.clone(),
_indexer_allocations_handle,
_escrow_account_monitor,
Expand Down Expand Up @@ -518,16 +565,7 @@ impl Actor for SenderAccount {

match message {
SenderAccountMessage::UpdateRav(rav) => {
state
.rav_tracker
.update(rav.message.allocationId, rav.message.valueAggregate, 0);

PENDING_RAV
.with_label_values(&[
&state.sender.to_string(),
&rav.message.allocationId.to_string(),
])
.set(rav.message.valueAggregate as f64);
state.update_rav(rav.message.allocationId, rav.message.valueAggregate);

let should_deny = !state.denied && state.deny_condition_reached();
if should_deny {
Expand All @@ -541,7 +579,7 @@ impl Actor for SenderAccount {

state
.invalid_receipts_tracker
.update(allocation_id, unaggregated_fees.value, 0);
.update(allocation_id, unaggregated_fees.value);

// invalid receipts can't go down
let should_deny = !state.denied && state.deny_condition_reached();
Expand All @@ -556,7 +594,7 @@ impl Actor for SenderAccount {
}

match receipt_fees {
ReceiptFees::NewReceipt(value) => {
ReceiptFees::NewReceipt(value, timestamp_ns) => {
// If state is denied and received new receipt, sender was removed manually from DB
if state.denied {
tracing::warn!(
Expand All @@ -569,8 +607,11 @@ impl Actor for SenderAccount {
);
SenderAccount::deny_sender(&state.pgpool, state.sender).await;
}
state.sender_fee_tracker.add(allocation_id, value);

// add new value
state
.sender_fee_tracker
.add(allocation_id, value, timestamp_ns);
UNAGGREGATED_FEES
.with_label_values(&[
&state.sender.to_string(),
Expand All @@ -579,58 +620,10 @@ impl Actor for SenderAccount {
.add(value as f64);
}
ReceiptFees::RavRequestResponse(rav_result) => {
state.sender_fee_tracker.finish_rav_request(allocation_id);
match rav_result {
Ok((fees, rav)) => {
state.rav_tracker.ok_rav_request(allocation_id);

let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
// update rav tracker
state.rav_tracker.update(allocation_id, rav_value, 0);
PENDING_RAV
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.set(rav_value as f64);

// update sender fee tracker
state.sender_fee_tracker.update(
allocation_id,
fees.value,
fees.counter,
);
UNAGGREGATED_FEES
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.set(fees.value as f64);
}
Err(err) => {
state.rav_tracker.failed_rav_backoff(allocation_id);
error!(
"Error while requesting RAV for sender {} and allocation {}: {}",
state.sender,
allocation_id,
err
);
}
};
state.finalize_rav_request(allocation_id, rav_result);
}
ReceiptFees::UpdateValue(unaggregated_fees) => {
state.sender_fee_tracker.update(
allocation_id,
unaggregated_fees.value,
unaggregated_fees.counter,
);

UNAGGREGATED_FEES
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.set(unaggregated_fees.value as f64);
state.update_sender_fee(allocation_id, unaggregated_fees);
}
ReceiptFees::Retry => {}
}
Expand All @@ -644,14 +637,12 @@ impl Actor for SenderAccount {
}
let total_counter_for_allocation = state
.sender_fee_tracker
.get_total_counter_outside_buffer_for_allocation(&allocation_id);
.get_count_outside_buffer_for_allocation(&allocation_id);
let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id);
let counter_greater_receipt_limit = total_counter_for_allocation
>= state.config.tap.rav_request_receipt_limit
&& !state
.sender_fee_tracker
.check_allocation_has_rav_request_running(allocation_id);
let total_fee_outside_buffer =
state.sender_fee_tracker.get_total_fee_outside_buffer();
&& can_trigger_rav;
let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee();
let total_fee_greater_trigger_value =
total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value;
let rav_result = match (
Expand Down Expand Up @@ -773,7 +764,7 @@ impl Actor for SenderAccount {
for allocation_id in tracked_allocation_ids.difference(&active_allocation_ids) {
// if it's being tracked and we didn't receive any update from the non_final_last_ravs
// remove from the tracker
state.rav_tracker.update(*allocation_id, 0, 0);
state.rav_tracker.remove(*allocation_id);

let _ = PENDING_RAV.remove_label_values(&[
&state.sender.to_string(),
Expand All @@ -782,10 +773,7 @@ impl Actor for SenderAccount {
}

for (allocation_id, value) in non_final_last_ravs {
state.rav_tracker.update(allocation_id, value, 0);
PENDING_RAV
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
.set(value as f64);
state.update_rav(allocation_id, value);
}
// now that balance and rav tracker is updated, check
match (state.denied, state.deny_condition_reached()) {
Expand Down Expand Up @@ -849,15 +837,8 @@ impl Actor for SenderAccount {
return Ok(());
};

// clean up hashset
state
.sender_fee_tracker
.unblock_allocation_id(allocation_id);
// update the receipt fees by reseting to 0
myself.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
ReceiptFees::UpdateValue(UnaggregatedReceipts::default()),
))?;
// remove from sender_fee_tracker
state.sender_fee_tracker.remove(allocation_id);

// rav tracker is not updated because it's still not redeemed
}
Expand Down Expand Up @@ -937,7 +918,7 @@ pub mod tests {
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use wiremock::matchers::{body_string_contains, method};
use wiremock::{Mock, MockServer, ResponseTemplate};

Expand All @@ -951,7 +932,9 @@ pub mod tests {
(Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => {
l0 == r0
&& match (l1, r1) {
(ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l,
(ReceiptFees::NewReceipt(l1, l2), ReceiptFees::NewReceipt(r1, r2)) => {
r1 == l1 && r2 == l2
}
(ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l,
(
ReceiptFees::RavRequestResponse(l),
Expand Down Expand Up @@ -1298,6 +1281,13 @@ pub mod tests {
)
}

fn get_current_timestamp_u64_ns() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64
}

#[sqlx::test(migrations = "../migrations")]
async fn test_update_receipt_fees_no_rav(pgpool: PgPool) {
let (sender_account, handle, prefix, _) = create_sender_account(
Expand All @@ -1323,7 +1313,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1),
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1, get_current_timestamp_u64_ns()),
))
.unwrap();

Expand Down Expand Up @@ -1366,7 +1356,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewReceipt(TRIGGER_VALUE),
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
))
.unwrap();

Expand Down Expand Up @@ -1426,7 +1416,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewReceipt(1),
ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()),
))
.unwrap();

Expand All @@ -1439,7 +1429,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewReceipt(1),
ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()),
))
.unwrap();

Expand Down Expand Up @@ -1567,7 +1557,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewReceipt(TRIGGER_VALUE),
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
Expand Down Expand Up @@ -1961,7 +1951,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewReceipt(TRIGGER_VALUE),
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down
Loading
Loading