Skip to content

Commit 1bde9b4

Browse files
carlosvdrgusinacio
andauthored
feat: Move backoff to to the tracker to remove errors (#377)
* feat: Move backoff to to the tracker to remove errors Co-authored-by: Gustavo Inacio <[email protected]>
1 parent b9b8b82 commit 1bde9b4

File tree

3 files changed

+103
-40
lines changed

3 files changed

+103
-40
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,24 @@ impl State {
216216
);
217217
};
218218
// we call and wait for the response so we don't process anymore update
219-
let Ok((fees, rav)) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
219+
let Ok(rav_result) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
220220
anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
221221
};
222+
let (fees, rav) = match rav_result {
223+
Ok(ok_value) => {
224+
self.rav_tracker.ok_rav_request(allocation_id);
225+
ok_value
226+
}
227+
Err(err) => {
228+
self.rav_tracker.failed_rav_backoff(allocation_id);
229+
anyhow::bail!(
230+
"Error while requesting RAV for sender {} and allocation {}: {}",
231+
self.sender,
232+
allocation_id,
233+
err
234+
);
235+
}
236+
};
222237

223238
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
224239
// update rav tracker
@@ -1058,7 +1073,6 @@ pub mod tests {
10581073
next_unaggregated_fees_value: Arc<Mutex<u128>>,
10591074
receipts: Arc<Mutex<Vec<NewReceiptNotification>>>,
10601075
}
1061-
10621076
impl MockSenderAllocation {
10631077
pub fn new_with_triggered_rav_request() -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
10641078
let triggered_rav_request = Arc::new(AtomicU32::new(0));
@@ -1145,13 +1159,13 @@ pub mod tests {
11451159
4,
11461160
*self.next_rav_value.lock().unwrap(),
11471161
);
1148-
reply.send((
1162+
reply.send(Ok((
11491163
UnaggregatedReceipts {
11501164
value: *self.next_unaggregated_fees_value.lock().unwrap(),
11511165
last_id: 0,
11521166
},
11531167
Some(signed_rav),
1154-
))?;
1168+
)))?;
11551169
}
11561170
SenderAllocationMessage::NewReceipt(receipt) => {
11571171
self.receipts.lock().unwrap().push(receipt);

tap-agent/src/agent/sender_allocation.rs

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,6 @@ pub struct SenderAllocationState {
105105
domain_separator: Eip712Domain,
106106
sender_account_ref: ActorRef<SenderAccountMessage>,
107107

108-
failed_ravs_count: u32,
109-
failed_rav_backoff: Instant,
110-
111108
http_client: jsonrpsee::http_client::HttpClient,
112109
}
113110

@@ -127,7 +124,7 @@ pub struct SenderAllocationArgs {
127124
#[derive(Debug)]
128125
pub enum SenderAllocationMessage {
129126
NewReceipt(NewReceiptNotification),
130-
TriggerRAVRequest(RpcReplyPort<(UnaggregatedReceipts, Option<SignedRAV>)>),
127+
TriggerRAVRequest(RpcReplyPort<anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>>),
131128
#[cfg(test)]
132129
GetUnaggregatedReceipts(RpcReplyPort<UnaggregatedReceipts>),
133130
}
@@ -255,21 +252,17 @@ impl Actor for SenderAllocation {
255252
}
256253
// we use a blocking call here to ensure that only one RAV request is running at a time.
257254
SenderAllocationMessage::TriggerRAVRequest(reply) => {
258-
if state.unaggregated_fees.value > 0 {
259-
// auto backoff retry, on error ignore
260-
if Instant::now() > state.failed_rav_backoff {
261-
if let Err(err) = state.request_rav().await {
262-
error!(error = %err, "Error while requesting rav.");
263-
}
264-
} else {
265-
error!(
266-
"Can't trigger rav request until {:?} (backoff)",
267-
state.failed_rav_backoff
268-
);
269-
}
270-
}
255+
let rav_result = if state.unaggregated_fees.value > 0 {
256+
state
257+
.request_rav()
258+
.await
259+
.map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone()))
260+
} else {
261+
Err(anyhow!("Unaggregated fee equals zero"))
262+
};
263+
271264
if !reply.is_closed() {
272-
let _ = reply.send((state.unaggregated_fees.clone(), state.latest_rav.clone()));
265+
let _ = reply.send(rav_result);
273266
}
274267
}
275268
#[cfg(test)]
@@ -340,8 +333,6 @@ impl SenderAllocationState {
340333
sender_account_ref: sender_account_ref.clone(),
341334
unaggregated_fees: UnaggregatedReceipts::default(),
342335
invalid_receipts_fees: UnaggregatedReceipts::default(),
343-
failed_rav_backoff: Instant::now(),
344-
failed_ravs_count: 0,
345336
latest_rav,
346337
http_client,
347338
})
@@ -454,25 +445,15 @@ impl SenderAllocationState {
454445
RAVS_CREATED
455446
.with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()])
456447
.inc();
457-
self.failed_ravs_count = 0;
458448
Ok(())
459449
}
460450
Err(e) => {
461-
error!(
462-
"Error while requesting RAV for sender {} and allocation {}: {}",
463-
self.sender, self.allocation_id, e
464-
);
465451
if let RavError::AllReceiptsInvalid = e {
466452
self.unaggregated_fees = self.calculate_unaggregated_fee().await?;
467453
}
468454
RAVS_FAILED
469455
.with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()])
470456
.inc();
471-
// backoff = max(100ms * 2 ^ retries, 60s)
472-
self.failed_rav_backoff = Instant::now()
473-
+ (Duration::from_millis(100) * 2u32.pow(self.failed_ravs_count))
474-
.max(Duration::from_secs(60));
475-
self.failed_ravs_count += 1;
476457
Err(e.into())
477458
}
478459
}
@@ -1181,6 +1162,7 @@ pub mod tests {
11811162
sender_allocation,
11821163
SenderAllocationMessage::TriggerRAVRequest
11831164
)
1165+
.unwrap()
11841166
.unwrap();
11851167

11861168
// Check that the unaggregated fees are correct.
@@ -1494,19 +1476,20 @@ pub mod tests {
14941476
let sender_allocation =
14951477
create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None).await;
14961478

1497-
// Trigger a RAV request manually and wait for updated fees.
1498-
// this should fail because there's no receipt with valid timestamp
1499-
let (total_unaggregated_fees, _rav) = call!(
1479+
let rav_response = call!(
15001480
sender_allocation,
15011481
SenderAllocationMessage::TriggerRAVRequest
15021482
)
15031483
.unwrap();
1484+
// If it is an error then rav request failed
1485+
assert!(rav_response.is_err());
15041486

15051487
// expect the actor to keep running
15061488
assert_eq!(sender_allocation.get_status(), ActorStatus::Running);
15071489

15081490
// Check that the unaggregated fees return the same value
1509-
assert_eq!(total_unaggregated_fees.value, 45u128);
1491+
// TODO: Maybe this can no longer be checked?
1492+
//assert_eq!(total_unaggregated_fees.value, 45u128);
15101493
}
15111494

15121495
#[sqlx::test(migrations = "../migrations")]
@@ -1567,12 +1550,13 @@ pub mod tests {
15671550
.await;
15681551
// Trigger a RAV request manually and wait for updated fees.
15691552
// this should fail because there's no receipt with valid timestamp
1570-
let (total_unaggregated_fees, _rav) = call!(
1553+
let rav_response = call!(
15711554
sender_allocation,
15721555
SenderAllocationMessage::TriggerRAVRequest
15731556
)
15741557
.unwrap();
1575-
assert_eq!(total_unaggregated_fees.value, 0);
1558+
// If it is an error then rav request failed
1559+
assert!(rav_response.is_err());
15761560

15771561
let invalid_receipts = sqlx::query!(
15781562
r#"

tap-agent/src/agent/sender_fee_tracker.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,22 @@ pub struct SenderFeeTracker {
4040
// heaviest allocation, because they are already marked for finalization,
4141
// and thus requesting RAVs on their own in their `post_stop` routine.
4242
blocked_addresses: HashSet<Address>,
43+
failed_ravs: HashMap<Address, FailedRavInfo>,
44+
}
45+
46+
#[derive(Debug, Clone)]
47+
pub struct FailedRavInfo {
48+
failed_ravs_count: u32,
49+
failed_rav_backoff_time: Instant,
50+
}
51+
52+
impl Default for FailedRavInfo {
53+
fn default() -> Self {
54+
Self {
55+
failed_ravs_count: 0,
56+
failed_rav_backoff_time: Instant::now(),
57+
}
58+
}
4359
}
4460

4561
impl SenderFeeTracker {
@@ -100,9 +116,16 @@ impl SenderFeeTracker {
100116

101117
pub fn get_heaviest_allocation_id(&mut self) -> Option<Address> {
102118
// just loop over and get the biggest fee
119+
let now = Instant::now();
103120
self.id_to_fee
104121
.iter()
105122
.filter(|(addr, _)| !self.blocked_addresses.contains(*addr))
123+
.filter(|(addr, _)| {
124+
self.failed_ravs
125+
.get(*addr)
126+
.map(|failed_rav| now > failed_rav.failed_rav_backoff_time)
127+
.unwrap_or(true)
128+
})
106129
// map to the value minus fees in buffer
107130
.map(|(addr, fee)| {
108131
(
@@ -148,6 +171,17 @@ impl SenderFeeTracker {
148171
acc + expiring.get_sum(&self.buffer_window_duration)
149172
})
150173
}
174+
pub fn failed_rav_backoff(&mut self, allocation_id: Address) {
175+
// backoff = max(100ms * 2 ^ retries, 60s)
176+
let failed_rav = self.failed_ravs.entry(allocation_id).or_default();
177+
failed_rav.failed_rav_backoff_time = Instant::now()
178+
+ (Duration::from_millis(100) * 2u32.pow(failed_rav.failed_ravs_count))
179+
.min(Duration::from_secs(60));
180+
failed_rav.failed_ravs_count += 1;
181+
}
182+
pub fn ok_rav_request(&mut self, allocation_id: Address) {
183+
self.failed_ravs.remove(&allocation_id);
184+
}
151185
}
152186

153187
#[cfg(test)]
@@ -288,4 +322,35 @@ mod tests {
288322
assert_eq!(tracker.get_total_fee_outside_buffer(), 0);
289323
assert_eq!(tracker.get_total_fee(), 0);
290324
}
325+
326+
#[test]
327+
fn test_filtered_backed_off_allocations() {
328+
let allocation_id_0 = address!("abababababababababababababababababababab");
329+
let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc");
330+
const BACK_SLEEP_DURATION: Duration = Duration::from_millis(201);
331+
332+
let mut tracker = SenderFeeTracker::default();
333+
assert_eq!(tracker.get_heaviest_allocation_id(), None);
334+
assert_eq!(tracker.get_total_fee(), 0);
335+
336+
tracker.update(allocation_id_0, 10);
337+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
338+
assert_eq!(tracker.get_total_fee(), 10);
339+
340+
tracker.update(allocation_id_1, 20);
341+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
342+
assert_eq!(tracker.get_total_fee(), 30);
343+
344+
// Simulate failed rav and backoff
345+
tracker.failed_rav_backoff(allocation_id_1);
346+
347+
// Heaviest should be the first since its not blocked nor failed
348+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
349+
assert_eq!(tracker.get_total_fee(), 30);
350+
351+
sleep(BACK_SLEEP_DURATION);
352+
353+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
354+
assert_eq!(tracker.get_total_fee(), 30);
355+
}
291356
}

0 commit comments

Comments
 (0)