Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 36 additions & 18 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Actor for SenderAllocation {
}

// update unaggregated_fees
state.unaggregated_fees = state.calculate_unaggregated_fee().await?;
state.unaggregated_fees = state.initialize_unaggregated_receipts().await?;

sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
Expand Down Expand Up @@ -226,9 +226,17 @@ impl Actor for SenderAllocation {
let NewReceiptNotification {
id, value: fees, ..
} = notification;
if id > unaggregated_fees.last_id {
unaggregated_fees.last_id = id;
unaggregated_fees.value = unaggregated_fees
if id <= unaggregated_fees.last_id {
// our world assumption is wrong
warn!(
last_id = %id,
"Received a receipt notification that was already calculated."
);
return Ok(());
}
unaggregated_fees.last_id = id;
unaggregated_fees.value =
unaggregated_fees
.value
.checked_add(fees)
.unwrap_or_else(|| {
Expand All @@ -241,14 +249,13 @@ impl Actor for SenderAllocation {
);
u128::MAX
});
// it's fine to crash the actor, could not send a message to its parent
state
.sender_account_ref
.cast(SenderAccountMessage::UpdateReceiptFees(
state.allocation_id,
ReceiptFees::NewReceipt(fees),
))?;
}
// it's fine to crash the actor, could not send a message to its parent
state
.sender_account_ref
.cast(SenderAccountMessage::UpdateReceiptFees(
state.allocation_id,
ReceiptFees::NewReceipt(fees),
))?;
}
SenderAllocationMessage::TriggerRAVRequest => {
let rav_result = if state.unaggregated_fees.value > 0 {
Expand Down Expand Up @@ -336,9 +343,18 @@ impl SenderAllocationState {
})
}

async fn initialize_unaggregated_receipts(&self) -> Result<UnaggregatedReceipts> {
self.calculate_fee_until_last_id(i64::MAX).await
}

async fn calculate_unaggregated_fee(&self) -> Result<UnaggregatedReceipts> {
self.calculate_fee_until_last_id(self.unaggregated_fees.last_id as i64)
.await
}

/// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager
/// with the latest unaggregated fees from the database.
async fn calculate_unaggregated_fee(&self) -> Result<UnaggregatedReceipts> {
async fn calculate_fee_until_last_id(&self, last_id: i64) -> Result<UnaggregatedReceipts> {
tracing::trace!("calculate_unaggregated_fee()");
self.tap_manager.remove_obsolete_receipts().await?;

Expand All @@ -353,10 +369,12 @@ impl SenderAllocationState {
scalar_tap_receipts
WHERE
allocation_id = $1
AND signer_address IN (SELECT unnest($2::text[]))
AND timestamp_ns > $3
AND id <= $2
AND signer_address IN (SELECT unnest($3::text[]))
AND timestamp_ns > $4
"#,
self.allocation_id.encode_hex(),
last_id,
&signers,
BigDecimal::from(
self.latest_rav
Expand Down Expand Up @@ -599,7 +617,7 @@ impl SenderAllocationState {
}
Ok(response.data)
}
(Err(_), true, true) => Err(anyhow!(
(Err(tap_core::Error::NoValidReceiptsForRAVRequest), true, true) => Err(anyhow!(
"It looks like there are no valid receipts for the RAV request.\
This may happen if your `rav_request_trigger_value` is too low \
and no receipts were found outside the `rav_request_timestamp_buffer_ms`.\
Expand Down Expand Up @@ -1346,7 +1364,7 @@ pub mod tests {
}

// calculate unaggregated fee
let total_unaggregated_fees = state.calculate_unaggregated_fee().await.unwrap();
let total_unaggregated_fees = state.initialize_unaggregated_receipts().await.unwrap();

// Check that the unaggregated fees are correct.
assert_eq!(total_unaggregated_fees.value, 45u128);
Expand Down Expand Up @@ -1401,7 +1419,7 @@ pub mod tests {
.unwrap();
}

let total_unaggregated_fees = state.calculate_unaggregated_fee().await.unwrap();
let total_unaggregated_fees = state.initialize_unaggregated_receipts().await.unwrap();

// Check that the unaggregated fees are correct.
assert_eq!(total_unaggregated_fees.value, 35u128);
Expand Down
Loading