Skip to content
22 changes: 18 additions & 4 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,24 @@ impl State {
);
};
// we call and wait for the response so we don't process any more updates
let Ok((fees, rav)) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
let Ok(rav_result) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
};
let (fees, rav) = match rav_result {
Ok(ok_value) => {
self.rav_tracker.ok_rav_request(allocation_id);
ok_value
}
Err(err) => {
self.rav_tracker.failed_rav_backoff(allocation_id);
anyhow::bail!(
"Error while requesting RAV for sender {} and allocation {}: {}",
self.sender,
allocation_id,
err
);
}
};

let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
// update rav tracker
Expand Down Expand Up @@ -1082,7 +1097,6 @@ pub mod tests {
next_unaggregated_fees_value: Arc<Mutex<u128>>,
receipts: Arc<Mutex<Vec<NewReceiptNotification>>>,
}

impl MockSenderAllocation {
pub fn new_with_triggered_rav_request() -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
let triggered_rav_request = Arc::new(AtomicU32::new(0));
Expand Down Expand Up @@ -1169,13 +1183,13 @@ pub mod tests {
4,
*self.next_rav_value.lock().unwrap(),
);
reply.send((
reply.send(Ok((
UnaggregatedReceipts {
value: *self.next_unaggregated_fees_value.lock().unwrap(),
last_id: 0,
},
Some(signed_rav),
))?;
)))?;
}
SenderAllocationMessage::NewReceipt(receipt) => {
self.receipts.lock().unwrap().push(receipt);
Expand Down
56 changes: 20 additions & 36 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ pub struct SenderAllocationState {
domain_separator: Eip712Domain,
sender_account_ref: ActorRef<SenderAccountMessage>,

failed_ravs_count: u32,
failed_rav_backoff: Instant,

http_client: jsonrpsee::http_client::HttpClient,
}

Expand All @@ -127,7 +124,7 @@ pub struct SenderAllocationArgs {
#[derive(Debug)]
pub enum SenderAllocationMessage {
NewReceipt(NewReceiptNotification),
TriggerRAVRequest(RpcReplyPort<(UnaggregatedReceipts, Option<SignedRAV>)>),
TriggerRAVRequest(RpcReplyPort<anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>>),
#[cfg(test)]
GetUnaggregatedReceipts(RpcReplyPort<UnaggregatedReceipts>),
}
Expand Down Expand Up @@ -255,21 +252,17 @@ impl Actor for SenderAllocation {
}
// we use a blocking call here to ensure that only one RAV request is running at a time.
SenderAllocationMessage::TriggerRAVRequest(reply) => {
if state.unaggregated_fees.value > 0 {
// auto backoff retry, on error ignore
if Instant::now() > state.failed_rav_backoff {
if let Err(err) = state.request_rav().await {
error!(error = %err, "Error while requesting rav.");
}
} else {
error!(
"Can't trigger rav request until {:?} (backoff)",
state.failed_rav_backoff
);
}
}
let rav_result = if state.unaggregated_fees.value > 0 {
state
.request_rav()
.await
.map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone()))
} else {
Err(anyhow!("Unaggregated fee equals zero"))
};

if !reply.is_closed() {
let _ = reply.send((state.unaggregated_fees.clone(), state.latest_rav.clone()));
let _ = reply.send(rav_result);
}
}
#[cfg(test)]
Expand Down Expand Up @@ -340,8 +333,6 @@ impl SenderAllocationState {
sender_account_ref: sender_account_ref.clone(),
unaggregated_fees: UnaggregatedReceipts::default(),
invalid_receipts_fees: UnaggregatedReceipts::default(),
failed_rav_backoff: Instant::now(),
failed_ravs_count: 0,
latest_rav,
http_client,
})
Expand Down Expand Up @@ -454,25 +445,15 @@ impl SenderAllocationState {
RAVS_CREATED
.with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()])
.inc();
self.failed_ravs_count = 0;
Ok(())
}
Err(e) => {
error!(
"Error while requesting RAV for sender {} and allocation {}: {}",
self.sender, self.allocation_id, e
);
if let RavError::AllReceiptsInvalid = e {
self.unaggregated_fees = self.calculate_unaggregated_fee().await?;
}
RAVS_FAILED
.with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()])
.inc();
// backoff = max(100ms * 2 ^ retries, 60s)
self.failed_rav_backoff = Instant::now()
+ (Duration::from_millis(100) * 2u32.pow(self.failed_ravs_count))
.max(Duration::from_secs(60));
self.failed_ravs_count += 1;
Err(e.into())
}
}
Expand Down Expand Up @@ -1181,6 +1162,7 @@ pub mod tests {
sender_allocation,
SenderAllocationMessage::TriggerRAVRequest
)
.unwrap()
.unwrap();

// Check that the unaggregated fees are correct.
Expand Down Expand Up @@ -1494,19 +1476,20 @@ pub mod tests {
let sender_allocation =
create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None).await;

// Trigger a RAV request manually and wait for updated fees.
// this should fail because there's no receipt with valid timestamp
let (total_unaggregated_fees, _rav) = call!(
let rav_response = call!(
sender_allocation,
SenderAllocationMessage::TriggerRAVRequest
)
.unwrap();
// If it is an error then rav request failed
assert!(rav_response.is_err());

// expect the actor to keep running
assert_eq!(sender_allocation.get_status(), ActorStatus::Running);

// Check that the unaggregated fees return the same value
assert_eq!(total_unaggregated_fees.value, 45u128);
// TODO: Maybe this can no longer be checked?
//assert_eq!(total_unaggregated_fees.value, 45u128);
}

#[sqlx::test(migrations = "../migrations")]
Expand Down Expand Up @@ -1567,12 +1550,13 @@ pub mod tests {
.await;
// Trigger a RAV request manually and wait for updated fees.
// this should fail because there's no receipt with valid timestamp
let (total_unaggregated_fees, _rav) = call!(
let rav_response = call!(
sender_allocation,
SenderAllocationMessage::TriggerRAVRequest
)
.unwrap();
assert_eq!(total_unaggregated_fees.value, 0);
// If it is an error then rav request failed
assert!(rav_response.is_err());

let invalid_receipts = sqlx::query!(
r#"
Expand Down
65 changes: 65 additions & 0 deletions tap-agent/src/agent/sender_fee_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ pub struct SenderFeeTracker {
// heaviest allocation, because they are already marked for finalization,
// and thus requesting RAVs on their own in their `post_stop` routine.
blocked_addresses: HashSet<Address>,
failed_ravs: HashMap<Address, FailedRavInfo>,
}

/// Failure bookkeeping kept per allocation while its RAV requests are failing.
#[derive(Debug, Clone)]
pub struct FailedRavInfo {
    /// Number of failures recorded so far; used as the exponent when the
    /// next backoff delay is computed.
    failed_ravs_count: u32,
    /// Instant that has to be in the past before the allocation is
    /// considered again when picking the heaviest allocation.
    failed_rav_backoff_time: Instant,
}

impl Default for FailedRavInfo {
    /// Builds an entry with zero recorded failures and a backoff deadline
    /// set to the moment of construction.
    fn default() -> Self {
        let failed_rav_backoff_time = Instant::now();
        FailedRavInfo {
            failed_rav_backoff_time,
            failed_ravs_count: 0,
        }
    }
}

impl SenderFeeTracker {
Expand Down Expand Up @@ -100,9 +116,16 @@ impl SenderFeeTracker {

pub fn get_heaviest_allocation_id(&mut self) -> Option<Address> {
// just loop over and get the biggest fee
let now = Instant::now();
self.id_to_fee
.iter()
.filter(|(addr, _)| !self.blocked_addresses.contains(*addr))
.filter(|(addr, _)| {
self.failed_ravs
.get(*addr)
.map(|failed_rav| now > failed_rav.failed_rav_backoff_time)
.unwrap_or(true)
})
// map to the value minus fees in buffer
.map(|(addr, fee)| {
(
Expand Down Expand Up @@ -148,6 +171,17 @@ impl SenderFeeTracker {
acc + expiring.get_sum(&self.buffer_window_duration)
})
}
/// Records a failed RAV request for `allocation_id` and moves its backoff
/// deadline forward.
///
/// The delay starts at 100ms, doubles with every recorded failure and is
/// capped at 60s: `backoff = min(100ms * 2 ^ retries, 60s)`.
pub fn failed_rav_backoff(&mut self, allocation_id: Address) {
    let failed_rav = self.failed_ravs.entry(allocation_id).or_default();
    // Saturate instead of overflowing: once the retry counter reaches 32,
    // `2u32.pow(..)` would panic in debug builds and wrap to 0 in release
    // builds, which would silently collapse the backoff to zero.
    let multiplier = 2u32.saturating_pow(failed_rav.failed_ravs_count);
    let delay = Duration::from_millis(100)
        .saturating_mul(multiplier)
        .min(Duration::from_secs(60));
    failed_rav.failed_rav_backoff_time = Instant::now() + delay;
    // Keep the counter from wrapping back to a short delay on a very long outage.
    failed_rav.failed_ravs_count = failed_rav.failed_ravs_count.saturating_add(1);
}
/// Forgets any failure state recorded for `allocation_id`.
///
/// With its entry gone, the allocation is no longer held back by a backoff
/// deadline when the heaviest allocation is selected.
pub fn ok_rav_request(&mut self, allocation_id: Address) {
    // The previous entry, if there was one, is intentionally dropped.
    let _previous = self.failed_ravs.remove(&allocation_id);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -288,4 +322,35 @@ mod tests {
assert_eq!(tracker.get_total_fee_outside_buffer(), 0);
assert_eq!(tracker.get_total_fee(), 0);
}

/// An allocation in RAV backoff must be skipped when picking the heaviest
/// allocation, and must become eligible again once the backoff has elapsed.
#[test]
fn test_filtered_backed_off_allocations() {
    const BACK_SLEEP_DURATION: Duration = Duration::from_millis(201);
    let lighter_allocation = address!("abababababababababababababababababababab");
    let heavier_allocation = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc");

    // A fresh tracker knows no allocation and holds no fees.
    let mut tracker = SenderFeeTracker::default();
    assert_eq!(tracker.get_heaviest_allocation_id(), None);
    assert_eq!(tracker.get_total_fee(), 0);

    // With a single allocation, that one is the heaviest.
    tracker.update(lighter_allocation, 10);
    assert_eq!(
        tracker.get_heaviest_allocation_id(),
        Some(lighter_allocation)
    );
    assert_eq!(tracker.get_total_fee(), 10);

    // A second allocation with a bigger fee takes over.
    tracker.update(heavier_allocation, 20);
    assert_eq!(
        tracker.get_heaviest_allocation_id(),
        Some(heavier_allocation)
    );
    assert_eq!(tracker.get_total_fee(), 30);

    // Simulate a failed RAV request, which puts the heavier allocation in backoff.
    tracker.failed_rav_backoff(heavier_allocation);

    // The lighter allocation wins now since it's neither blocked nor backed off;
    // the total fee is unaffected by the backoff.
    assert_eq!(
        tracker.get_heaviest_allocation_id(),
        Some(lighter_allocation)
    );
    assert_eq!(tracker.get_total_fee(), 30);

    // Wait longer than the first backoff delay (100ms) so that it expires.
    sleep(BACK_SLEEP_DURATION);

    assert_eq!(
        tracker.get_heaviest_allocation_id(),
        Some(heavier_allocation)
    );
    assert_eq!(tracker.get_total_fee(), 30);
}
}
Loading