diff --git a/.sqlx/query-7a7fb81674a67e9b9dbf82a318d60dd78565765831dc4dd84777dfb751736ce6.json b/.sqlx/query-e803de8899de4d54bb7c40be0459fa53d0cd2798d219947cc6fc8b5b98362794.json similarity index 70% rename from .sqlx/query-7a7fb81674a67e9b9dbf82a318d60dd78565765831dc4dd84777dfb751736ce6.json rename to .sqlx/query-e803de8899de4d54bb7c40be0459fa53d0cd2798d219947cc6fc8b5b98362794.json index e6bca75ed..b53d4397c 100644 --- a/.sqlx/query-7a7fb81674a67e9b9dbf82a318d60dd78565765831dc4dd84777dfb751736ce6.json +++ b/.sqlx/query-e803de8899de4d54bb7c40be0459fa53d0cd2798d219947cc6fc8b5b98362794.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n MAX(id),\n SUM(value)\n FROM\n scalar_tap_receipts\n WHERE\n allocation_id = $1\n AND signer_address IN (SELECT unnest($2::text[]))\n AND timestamp_ns > $3\n ", + "query": "\n SELECT\n MAX(id),\n SUM(value)\n FROM\n scalar_tap_receipts\n WHERE\n allocation_id = $1\n AND id <= $2\n AND signer_address IN (SELECT unnest($3::text[]))\n AND timestamp_ns > $4\n ", "describe": { "columns": [ { @@ -17,6 +17,7 @@ "parameters": { "Left": [ "Bpchar", + "Int8", "TextArray", "Numeric" ] @@ -26,5 +27,5 @@ null ] }, - "hash": "7a7fb81674a67e9b9dbf82a318d60dd78565765831dc4dd84777dfb751736ce6" + "hash": "e803de8899de4d54bb7c40be0459fa53d0cd2798d219947cc6fc8b5b98362794" } diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 34601f200..f33663290 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -154,7 +154,7 @@ impl Actor for SenderAllocation { } // update unaggregated_fees - state.unaggregated_fees = state.calculate_unaggregated_fee().await?; + state.unaggregated_fees = state.initialize_unaggregated_receipts().await?; sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees( allocation_id, @@ -226,9 +226,17 @@ impl Actor for SenderAllocation { let NewReceiptNotification { id, value: fees, .. } = notification; - if id > unaggregated_fees.last_id { - unaggregated_fees.last_id = id; - unaggregated_fees.value = unaggregated_fees + if id <= unaggregated_fees.last_id { + // our world assumption is wrong + warn!( + last_id = %id, + "Received a receipt notification that was already calculated." + ); + return Ok(()); + } + unaggregated_fees.last_id = id; + unaggregated_fees.value = + unaggregated_fees .value .checked_add(fees) .unwrap_or_else(|| { @@ -241,14 +249,13 @@ impl Actor for SenderAllocation { ); u128::MAX }); - // it's fine to crash the actor, could not send a message to its parent - state - .sender_account_ref - .cast(SenderAccountMessage::UpdateReceiptFees( - state.allocation_id, - ReceiptFees::NewReceipt(fees), - ))?; - } + // it's fine to crash the actor, could not send a message to its parent + state + .sender_account_ref + .cast(SenderAccountMessage::UpdateReceiptFees( + state.allocation_id, + ReceiptFees::NewReceipt(fees), + ))?; } SenderAllocationMessage::TriggerRAVRequest => { let rav_result = if state.unaggregated_fees.value > 0 { @@ -336,9 +343,18 @@ impl SenderAllocationState { }) } + async fn initialize_unaggregated_receipts(&self) -> Result { + self.calculate_fee_until_last_id(i64::MAX).await + } + + async fn calculate_unaggregated_fee(&self) -> Result { + self.calculate_fee_until_last_id(self.unaggregated_fees.last_id as i64) + .await + } + /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager /// with the latest unaggregated fees from the database. - async fn calculate_unaggregated_fee(&self) -> Result { + async fn calculate_fee_until_last_id(&self, last_id: i64) -> Result { tracing::trace!("calculate_unaggregated_fee()"); self.tap_manager.remove_obsolete_receipts().await?; @@ -353,10 +369,12 @@ impl SenderAllocationState { scalar_tap_receipts WHERE allocation_id = $1 - AND signer_address IN (SELECT unnest($2::text[])) - AND timestamp_ns > $3 + AND id <= $2 + AND signer_address IN (SELECT unnest($3::text[])) + AND timestamp_ns > $4 "#, self.allocation_id.encode_hex(), + last_id, &signers, BigDecimal::from( self.latest_rav @@ -599,7 +617,7 @@ impl SenderAllocationState { } Ok(response.data) } - (Err(_), true, true) => Err(anyhow!( + (Err(tap_core::Error::NoValidReceiptsForRAVRequest), true, true) => Err(anyhow!( "It looks like there are no valid receipts for the RAV request.\ This may happen if your `rav_request_trigger_value` is too low \ and no receipts were found outside the `rav_request_timestamp_buffer_ms`.\ @@ -1346,7 +1364,7 @@ pub mod tests { } // calculate unaggregated fee - let total_unaggregated_fees = state.calculate_unaggregated_fee().await.unwrap(); + let total_unaggregated_fees = state.initialize_unaggregated_receipts().await.unwrap(); // Check that the unaggregated fees are correct. assert_eq!(total_unaggregated_fees.value, 45u128); @@ -1401,7 +1419,7 @@ pub mod tests { .unwrap(); } - let total_unaggregated_fees = state.calculate_unaggregated_fee().await.unwrap(); + let total_unaggregated_fees = state.initialize_unaggregated_receipts().await.unwrap(); // Check that the unaggregated fees are correct. assert_eq!(total_unaggregated_fees.value, 35u128);