Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 63 additions & 33 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type Balance = U256;

#[derive(Debug, Eq, PartialEq)]
pub enum ReceiptFees {
NewValue(UnaggregatedReceipts),
NewReceipt(u128),
UpdateValue(UnaggregatedReceipts),
Retry,
}

Expand Down Expand Up @@ -198,8 +199,13 @@ impl State {
async fn rav_requester_single(&mut self) -> Result<()> {
let Some(allocation_id) = self.sender_fee_tracker.get_heaviest_allocation_id() else {
anyhow::bail!(
"Error while getting the heaviest allocation because \
no unblocked allocation has enough unaggregated fees tracked"
"Error while getting the heaviest allocation, \
this is due one of the following reasons: \n
1. allocations have too much fees under their buffer\n
2. allocations are blocked to be redeemed due to ongoing last rav. \n
If you keep seeing this message try to increase your `amount_willing_to_lose` \
and restart your `tap-agent`\n
If this doesn't work, open an issue on our Github."
);
};
let sender_allocation_id = self.format_sender_allocation(&allocation_id);
Expand Down Expand Up @@ -478,7 +484,9 @@ impl Actor for SenderAccount {
.set(config.tap.rav_request_trigger_value as f64);

let state = State {
sender_fee_tracker: SenderFeeTracker::default(),
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
config.tap.rav_request_timestamp_buffer_ms,
)),
rav_tracker: SenderFeeTracker::default(),
invalid_receipts_tracker: SenderFeeTracker::default(),
allocation_ids: allocation_ids.clone(),
Expand Down Expand Up @@ -564,14 +572,30 @@ impl Actor for SenderAccount {
scheduled_rav_request.abort();
}

if let ReceiptFees::NewValue(unaggregated_fees) = receipt_fees {
state
.sender_fee_tracker
.update(allocation_id, unaggregated_fees.value);
match receipt_fees {
ReceiptFees::NewReceipt(value) => {
state.sender_fee_tracker.add(allocation_id, value);

UNAGGREGATED_FEES
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
.set(unaggregated_fees.value as f64);
UNAGGREGATED_FEES
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.add(value as f64);
}
ReceiptFees::UpdateValue(unaggregated_fees) => {
state
.sender_fee_tracker
.update(allocation_id, unaggregated_fees.value);

UNAGGREGATED_FEES
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.set(unaggregated_fees.value as f64);
}
ReceiptFees::Retry => {}
}

// Eagerly deny the sender (if needed), before the RAV request. To be sure not to
Expand All @@ -582,7 +606,7 @@ impl Actor for SenderAccount {
state.add_to_denylist().await;
}

if state.sender_fee_tracker.get_total_fee()
if state.sender_fee_tracker.get_total_fee_outside_buffer()
>= state.config.tap.rav_request_trigger_value
{
tracing::debug!(
Expand Down Expand Up @@ -769,7 +793,7 @@ impl Actor for SenderAccount {
// update the receipt fees by reseting to 0
myself.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
ReceiptFees::NewValue(UnaggregatedReceipts::default()),
ReceiptFees::UpdateValue(UnaggregatedReceipts::default()),
))?;

// rav tracker is not updated because it's still not redeemed
Expand Down Expand Up @@ -884,6 +908,7 @@ pub mod tests {
const DUMMY_URL: &str = "http://localhost:1234";
const TRIGGER_VALUE: u128 = 500;
const ESCROW_VALUE: u128 = 1000;
const BUFFER_MS: u64 = 100;

async fn create_sender_account(
pgpool: PgPool,
Expand All @@ -904,7 +929,7 @@ pub mod tests {
},
tap: config::Tap {
rav_request_trigger_value,
rav_request_timestamp_buffer_ms: 1,
rav_request_timestamp_buffer_ms: BUFFER_MS,
rav_request_timeout_secs: 5,
max_unnaggregated_fees_per_sender,
..Default::default()
Expand Down Expand Up @@ -1191,14 +1216,11 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE - 1,
last_id: 10,
}),
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1),
))
.unwrap();

tokio::time::sleep(Duration::from_millis(10)).await;
tokio::time::sleep(Duration::from_millis(BUFFER_MS)).await;

assert_eq!(
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
Expand Down Expand Up @@ -1230,10 +1252,24 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 10,
}),
ReceiptFees::NewReceipt(TRIGGER_VALUE),
))
.unwrap();

tokio::time::sleep(Duration::from_millis(20)).await;

assert_eq!(
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
0
);

// wait for it to be outside buffer
tokio::time::sleep(Duration::from_millis(BUFFER_MS)).await;

sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::Retry,
))
.unwrap();

Expand Down Expand Up @@ -1342,10 +1378,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 11,
}),
ReceiptFees::NewReceipt(TRIGGER_VALUE),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
Expand Down Expand Up @@ -1388,7 +1421,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
ReceiptFees::UpdateValue(UnaggregatedReceipts {
value: $value,
last_id: 11,
}),
Expand Down Expand Up @@ -1529,7 +1562,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
ReceiptFees::UpdateValue(UnaggregatedReceipts {
value: $value,
last_id: 11,
}),
Expand Down Expand Up @@ -1730,10 +1763,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 11,
}),
ReceiptFees::NewReceipt(TRIGGER_VALUE),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down
2 changes: 1 addition & 1 deletion tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ lazy_static! {
.unwrap();
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, PartialEq, Eq)]
pub struct NewReceiptNotification {
pub id: u64,
pub allocation_id: Address,
Expand Down
20 changes: 9 additions & 11 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl Actor for SenderAllocation {

sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
ReceiptFees::NewValue(state.unaggregated_fees.clone()),
ReceiptFees::UpdateValue(state.unaggregated_fees.clone()),
))?;

// update rav tracker for sender account
Expand Down Expand Up @@ -225,9 +225,10 @@ impl Actor for SenderAllocation {
);
let unaggregated_fees = &mut state.unaggregated_fees;
match message {
SenderAllocationMessage::NewReceipt(NewReceiptNotification {
id, value: fees, ..
}) => {
SenderAllocationMessage::NewReceipt(notification) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update mostly solves the problem but it's not 100% accurate. We have a system where we trigger a complete re-calculation of unnagregated receipts. When this happens, newer receipts that are in the queue would already be on the counter and that's why we check if id > unaggregated_fees.last_id so we don't have a double counter on that.

The problem is that current system for buffer needs them to be called via add instead of update to be added to the buffer counter. That means that information about what is in buffer is lost under higher loads.

For this to be updated accordingly, tracker would need to have an internal last_id meaning that the value is added to the buffer, but not to the total tracker.

This is not a big issue since the update is usually called in rav requests which reduces total amount, but this issue must be tracked in case problems happen.

let NewReceiptNotification {
id, value: fees, ..
} = notification;
if id > unaggregated_fees.last_id {
unaggregated_fees.last_id = id;
unaggregated_fees.value = unaggregated_fees
Expand All @@ -248,7 +249,7 @@ impl Actor for SenderAllocation {
.sender_account_ref
.cast(SenderAccountMessage::UpdateReceiptFees(
state.allocation_id,
ReceiptFees::NewValue(unaggregated_fees.clone()),
ReceiptFees::NewReceipt(fees),
))?;
}
}
Expand Down Expand Up @@ -974,7 +975,7 @@ pub mod tests {
// Should emit a message to the sender account with the unaggregated fees.
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
ReceiptFees::UpdateValue(UnaggregatedReceipts {
last_id: 10,
value: 55u128,
}),
Expand Down Expand Up @@ -1073,10 +1074,7 @@ pub mod tests {
// should emit update aggregate fees message to sender account
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
last_id: 1,
value: 20,
}),
ReceiptFees::NewReceipt(20u128),
);
let last_message_emitted = last_message_emitted.lock().unwrap();
assert_eq!(last_message_emitted.len(), 2);
Expand Down Expand Up @@ -1191,7 +1189,7 @@ pub mod tests {
last_message_emitted.lock().unwrap().last(),
Some(&SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts::default())
ReceiptFees::UpdateValue(UnaggregatedReceipts::default())
))
);
}
Expand Down
Loading
Loading