Skip to content

Commit 35b9952

Browse files
committed
fix: calculate unaggregate receipts up to last_id
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent f30b210 commit 35b9952

File tree

1 file changed

+35
-16
lines changed

1 file changed

+35
-16
lines changed

tap-agent/src/agent/sender_allocation.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use std::{
5+
i64,
56
sync::Arc,
67
time::{Duration, Instant},
78
};
@@ -154,7 +155,7 @@ impl Actor for SenderAllocation {
154155
}
155156

156157
// update unaggregated_fees
157-
state.unaggregated_fees = state.calculate_unaggregated_fee().await?;
158+
state.unaggregated_fees = state.initialize_unaggregated_receipts().await?;
158159

159160
sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
160161
allocation_id,
@@ -226,9 +227,17 @@ impl Actor for SenderAllocation {
226227
let NewReceiptNotification {
227228
id, value: fees, ..
228229
} = notification;
229-
if id > unaggregated_fees.last_id {
230-
unaggregated_fees.last_id = id;
231-
unaggregated_fees.value = unaggregated_fees
230+
if id <= unaggregated_fees.last_id {
231+
// our world assumption is wrong
232+
warn!(
233+
last_id = %id,
234+
"Received an receipt notification that was already calculated."
235+
);
236+
return Ok(());
237+
}
238+
unaggregated_fees.last_id = id;
239+
unaggregated_fees.value =
240+
unaggregated_fees
232241
.value
233242
.checked_add(fees)
234243
.unwrap_or_else(|| {
@@ -241,14 +250,13 @@ impl Actor for SenderAllocation {
241250
);
242251
u128::MAX
243252
});
244-
// it's fine to crash the actor, could not send a message to its parent
245-
state
246-
.sender_account_ref
247-
.cast(SenderAccountMessage::UpdateReceiptFees(
248-
state.allocation_id,
249-
ReceiptFees::NewReceipt(fees),
250-
))?;
251-
}
253+
// it's fine to crash the actor, could not send a message to its parent
254+
state
255+
.sender_account_ref
256+
.cast(SenderAccountMessage::UpdateReceiptFees(
257+
state.allocation_id,
258+
ReceiptFees::NewReceipt(fees),
259+
))?;
252260
}
253261
SenderAllocationMessage::TriggerRAVRequest => {
254262
let rav_result = if state.unaggregated_fees.value > 0 {
@@ -336,9 +344,18 @@ impl SenderAllocationState {
336344
})
337345
}
338346

347+
async fn initialize_unaggregated_receipts(&self) -> Result<UnaggregatedReceipts> {
348+
self.calculate_fee_until_last_id(i64::MAX).await
349+
}
350+
351+
async fn calculate_unaggregated_fee(&self) -> Result<UnaggregatedReceipts> {
352+
self.calculate_fee_until_last_id(self.unaggregated_fees.last_id as i64)
353+
.await
354+
}
355+
339356
/// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager
340357
/// with the latest unaggregated fees from the database.
341-
async fn calculate_unaggregated_fee(&self) -> Result<UnaggregatedReceipts> {
358+
async fn calculate_fee_until_last_id(&self, last_id: i64) -> Result<UnaggregatedReceipts> {
342359
tracing::trace!("calculate_unaggregated_fee()");
343360
self.tap_manager.remove_obsolete_receipts().await?;
344361

@@ -353,10 +370,12 @@ impl SenderAllocationState {
353370
scalar_tap_receipts
354371
WHERE
355372
allocation_id = $1
356-
AND signer_address IN (SELECT unnest($2::text[]))
357-
AND timestamp_ns > $3
373+
AND id <= $2
374+
AND signer_address IN (SELECT unnest($3::text[]))
375+
AND timestamp_ns > $4
358376
"#,
359377
self.allocation_id.encode_hex(),
378+
last_id,
360379
&signers,
361380
BigDecimal::from(
362381
self.latest_rav
@@ -599,7 +618,7 @@ impl SenderAllocationState {
599618
}
600619
Ok(response.data)
601620
}
602-
(Err(_), true, true) => Err(anyhow!(
621+
(Err(tap_core::Error::NoValidReceiptsForRAVRequest), true, true) => Err(anyhow!(
603622
"It looks like there are no valid receipts for the RAV request.\
604623
This may happen if your `rav_request_trigger_value` is too low \
605624
and no receipts were found outside the `rav_request_timestamp_buffer_ms`.\

0 commit comments

Comments
 (0)