diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 390af48df..f030cedd0 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -217,9 +217,24 @@ impl State { ); }; // we call and wait for the response so we don't process anymore update - let Ok((fees, rav)) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else { + let Ok(rav_result) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else { anyhow::bail!("Error while sending and waiting message for actor {allocation_id}"); }; + let (fees, rav) = match rav_result { + Ok(ok_value) => { + self.rav_tracker.ok_rav_request(allocation_id); + ok_value + } + Err(err) => { + self.rav_tracker.failed_rav_backoff(allocation_id); + anyhow::bail!( + "Error while requesting RAV for sender {} and allocation {}: {}", + self.sender, + allocation_id, + err + ); + } + }; let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate); // update rav tracker @@ -1082,7 +1097,6 @@ pub mod tests { next_unaggregated_fees_value: Arc>, receipts: Arc>>, } - impl MockSenderAllocation { pub fn new_with_triggered_rav_request() -> (Self, Arc, Arc>) { let triggered_rav_request = Arc::new(AtomicU32::new(0)); @@ -1169,13 +1183,13 @@ pub mod tests { 4, *self.next_rav_value.lock().unwrap(), ); - reply.send(( + reply.send(Ok(( UnaggregatedReceipts { value: *self.next_unaggregated_fees_value.lock().unwrap(), last_id: 0, }, Some(signed_rav), - ))?; + )))?; } SenderAllocationMessage::NewReceipt(receipt) => { self.receipts.lock().unwrap().push(receipt); diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index f661608a4..c1df14060 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -105,9 +105,6 @@ pub struct SenderAllocationState { domain_separator: Eip712Domain, sender_account_ref: ActorRef, - failed_ravs_count: u32, - failed_rav_backoff: Instant, - http_client: jsonrpsee::http_client::HttpClient, } @@ -127,7 +124,7 @@ pub struct SenderAllocationArgs { #[derive(Debug)] pub enum SenderAllocationMessage { NewReceipt(NewReceiptNotification), - TriggerRAVRequest(RpcReplyPort<(UnaggregatedReceipts, Option)>), + TriggerRAVRequest(RpcReplyPort)>>), #[cfg(test)] GetUnaggregatedReceipts(RpcReplyPort), } @@ -255,21 +252,17 @@ impl Actor for SenderAllocation { } // we use a blocking call here to ensure that only one RAV request is running at a time. SenderAllocationMessage::TriggerRAVRequest(reply) => { - if state.unaggregated_fees.value > 0 { - // auto backoff retry, on error ignore - if Instant::now() > state.failed_rav_backoff { - if let Err(err) = state.request_rav().await { - error!(error = %err, "Error while requesting rav."); - } - } else { - error!( - "Can't trigger rav request until {:?} (backoff)", - state.failed_rav_backoff - ); - } - } + let rav_result = if state.unaggregated_fees.value > 0 { + state + .request_rav() + .await + .map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone())) + } else { + Err(anyhow!("Unaggregated fee equals zero")) + }; + if !reply.is_closed() { - let _ = reply.send((state.unaggregated_fees.clone(), state.latest_rav.clone())); + let _ = reply.send(rav_result); } } #[cfg(test)] @@ -340,8 +333,6 @@ impl SenderAllocationState { sender_account_ref: sender_account_ref.clone(), unaggregated_fees: UnaggregatedReceipts::default(), invalid_receipts_fees: UnaggregatedReceipts::default(), - failed_rav_backoff: Instant::now(), - failed_ravs_count: 0, latest_rav, http_client, }) @@ -454,25 +445,15 @@ impl SenderAllocationState { RAVS_CREATED .with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()]) .inc(); - self.failed_ravs_count = 0; Ok(()) } Err(e) => { - error!( - "Error while requesting RAV for sender {} and allocation {}: {}", - self.sender, self.allocation_id, e - ); if let RavError::AllReceiptsInvalid = e { self.unaggregated_fees = self.calculate_unaggregated_fee().await?; } RAVS_FAILED .with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()]) .inc(); - // backoff = max(100ms * 2 ^ retries, 60s) - self.failed_rav_backoff = Instant::now() - + (Duration::from_millis(100) * 2u32.pow(self.failed_ravs_count)) - .max(Duration::from_secs(60)); - self.failed_ravs_count += 1; Err(e.into()) } } @@ -1181,6 +1162,7 @@ pub mod tests { sender_allocation, SenderAllocationMessage::TriggerRAVRequest ) + .unwrap() .unwrap(); // Check that the unaggregated fees are correct. @@ -1494,19 +1476,20 @@ pub mod tests { let sender_allocation = create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None).await; - // Trigger a RAV request manually and wait for updated fees. - // this should fail because there's no receipt with valid timestamp - let (total_unaggregated_fees, _rav) = call!( + let rav_response = call!( sender_allocation, SenderAllocationMessage::TriggerRAVRequest ) .unwrap(); + // If it is an error then rav request failed + assert!(rav_response.is_err()); // expect the actor to keep running assert_eq!(sender_allocation.get_status(), ActorStatus::Running); // Check that the unaggregated fees return the same value - assert_eq!(total_unaggregated_fees.value, 45u128); + // TODO: Maybe this can no longer be checked? + //assert_eq!(total_unaggregated_fees.value, 45u128); } #[sqlx::test(migrations = "../migrations")] @@ -1567,12 +1550,13 @@ pub mod tests { .await; // Trigger a RAV request manually and wait for updated fees. // this should fail because there's no receipt with valid timestamp - let (total_unaggregated_fees, _rav) = call!( + let rav_response = call!( sender_allocation, SenderAllocationMessage::TriggerRAVRequest ) .unwrap(); - assert_eq!(total_unaggregated_fees.value, 0); + // If it is an error then rav request failed + assert!(rav_response.is_err()); let invalid_receipts = sqlx::query!( r#" diff --git a/tap-agent/src/agent/sender_fee_tracker.rs b/tap-agent/src/agent/sender_fee_tracker.rs index 721404b28..1a57b38a9 100644 --- a/tap-agent/src/agent/sender_fee_tracker.rs +++ b/tap-agent/src/agent/sender_fee_tracker.rs @@ -40,6 +40,22 @@ pub struct SenderFeeTracker { // heaviest allocation, because they are already marked for finalization, // and thus requesting RAVs on their own in their `post_stop` routine. blocked_addresses: HashSet
, + failed_ravs: HashMap, +} + +#[derive(Debug, Clone)] +pub struct FailedRavInfo { + failed_ravs_count: u32, + failed_rav_backoff_time: Instant, +} + +impl Default for FailedRavInfo { + fn default() -> Self { + Self { + failed_ravs_count: 0, + failed_rav_backoff_time: Instant::now(), + } + } } impl SenderFeeTracker { @@ -100,9 +116,16 @@ impl SenderFeeTracker { pub fn get_heaviest_allocation_id(&mut self) -> Option
{ // just loop over and get the biggest fee + let now = Instant::now(); self.id_to_fee .iter() .filter(|(addr, _)| !self.blocked_addresses.contains(*addr)) + .filter(|(addr, _)| { + self.failed_ravs + .get(*addr) + .map(|failed_rav| now > failed_rav.failed_rav_backoff_time) + .unwrap_or(true) + }) // map to the value minus fees in buffer .map(|(addr, fee)| { ( @@ -148,6 +171,17 @@ impl SenderFeeTracker { acc + expiring.get_sum(&self.buffer_window_duration) }) } + pub fn failed_rav_backoff(&mut self, allocation_id: Address) { + // backoff = max(100ms * 2 ^ retries, 60s) + let failed_rav = self.failed_ravs.entry(allocation_id).or_default(); + failed_rav.failed_rav_backoff_time = Instant::now() + + (Duration::from_millis(100) * 2u32.pow(failed_rav.failed_ravs_count)) + .min(Duration::from_secs(60)); + failed_rav.failed_ravs_count += 1; + } + pub fn ok_rav_request(&mut self, allocation_id: Address) { + self.failed_ravs.remove(&allocation_id); + } } #[cfg(test)] @@ -288,4 +322,35 @@ mod tests { assert_eq!(tracker.get_total_fee_outside_buffer(), 0); assert_eq!(tracker.get_total_fee(), 0); } + + #[test] + fn test_filtered_backed_off_allocations() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + const BACK_SLEEP_DURATION: Duration = Duration::from_millis(201); + + let mut tracker = SenderFeeTracker::default(); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee(), 0); + + tracker.update(allocation_id_0, 10); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.update(allocation_id_1, 20); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 30); + + // Simulate failed rav and backoff + tracker.failed_rav_backoff(allocation_id_1); + + // Heaviest should be the first since its not blocked nor failed + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 30); + + sleep(BACK_SLEEP_DURATION); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 30); + } }