Skip to content

Commit f9b64f3

Browse files
authored
fix: add backoff if could not find heaviest allocation (#397)
1 parent b4c876d commit f9b64f3

File tree

4 files changed

+68
-62
lines changed

4 files changed

+68
-62
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
3030
use crate::adaptative_concurrency::AdaptiveLimiter;
3131
use crate::agent::sender_allocation::SenderAllocationMessage;
3232
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
33+
use crate::backoff::BackoffInfo;
3334
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};
3435
use crate::{
3536
config::{self},
@@ -161,6 +162,9 @@ pub struct State {
161162
config: &'static config::Config,
162163
pgpool: PgPool,
163164
sender_aggregator: jsonrpsee::http_client::HttpClient,
165+
166+
// Backoff info
167+
backoff_info: BackoffInfo,
164168
}
165169

166170
impl State {
@@ -211,6 +215,7 @@ impl State {
211215
.sender_fee_tracker
212216
.get_heaviest_allocation_id()
213217
.ok_or_else(|| {
218+
self.backoff_info.fail();
214219
anyhow::anyhow!(
215220
"Error while getting the heaviest allocation, \
216221
this is due one of the following reasons: \n
@@ -221,6 +226,7 @@ impl State {
221226
If this doesn't work, open an issue on our Github."
222227
)
223228
})?;
229+
self.backoff_info.ok();
224230
self.rav_request_for_allocation(allocation_id).await
225231
}
226232

@@ -546,6 +552,7 @@ impl Actor for SenderAccount {
546552
retry_interval,
547553
scheduled_rav_request: None,
548554
adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50),
555+
backoff_info: BackoffInfo::default(),
549556
};
550557

551558
for allocation_id in &allocation_ids {
@@ -650,39 +657,33 @@ impl Actor for SenderAccount {
650657

651658
let has_available_slots_for_requests = state.adaptive_limiter.has_limit();
652659
if has_available_slots_for_requests {
660+
let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee();
653661
let total_counter_for_allocation = state
654662
.sender_fee_tracker
655663
.get_count_outside_buffer_for_allocation(&allocation_id);
656664
let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id);
657665
let counter_greater_receipt_limit = total_counter_for_allocation
658666
>= state.config.tap.rav_request_receipt_limit
659667
&& can_trigger_rav;
660-
let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee();
661-
let total_fee_greater_trigger_value =
662-
total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value;
663-
let rav_result = match (
664-
counter_greater_receipt_limit,
665-
total_fee_greater_trigger_value,
666-
) {
667-
(true, _) => {
668-
tracing::debug!(
669-
total_counter_for_allocation,
670-
rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit,
671-
%allocation_id,
672-
"Total counter greater than the receipt limit per rav. Triggering RAV request"
673-
);
674-
675-
state.rav_request_for_allocation(allocation_id).await
676-
}
677-
(_, true) => {
678-
tracing::debug!(
679-
total_fee_outside_buffer,
680-
trigger_value = state.config.tap.rav_request_trigger_value,
681-
"Total fee greater than the trigger value. Triggering RAV request"
682-
);
683-
state.rav_request_for_heaviest_allocation().await
684-
}
685-
_ => Ok(()),
668+
let rav_result = if !state.backoff_info.in_backoff()
669+
&& total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value
670+
{
671+
tracing::debug!(
672+
total_fee_outside_buffer,
673+
trigger_value = state.config.tap.rav_request_trigger_value,
674+
"Total fee greater than the trigger value. Triggering RAV request"
675+
);
676+
state.rav_request_for_heaviest_allocation().await
677+
} else if counter_greater_receipt_limit {
678+
tracing::debug!(
679+
total_counter_for_allocation,
680+
rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit,
681+
%allocation_id,
682+
"Total counter greater than the receipt limit per rav. Triggering RAV request"
683+
);
684+
state.rav_request_for_allocation(allocation_id).await
685+
} else {
686+
Ok(())
686687
};
687688
// In case we fail, we want our actor to keep running
688689
if let Err(err) = rav_result {

tap-agent/src/backoff.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::time::{Duration, Instant};
5+
6+
#[derive(Debug, Clone)]
7+
pub struct BackoffInfo {
8+
failed_count: u32,
9+
failed_backoff_time: Instant,
10+
}
11+
12+
impl BackoffInfo {
13+
pub fn ok(&mut self) {
14+
self.failed_count = 0;
15+
}
16+
17+
pub fn fail(&mut self) {
18+
// backoff = max(100ms * 2 ^ retries, 60s)
19+
self.failed_backoff_time = Instant::now()
20+
+ (Duration::from_millis(100) * 2u32.pow(self.failed_count))
21+
.min(Duration::from_secs(60));
22+
self.failed_count += 1;
23+
}
24+
25+
pub fn in_backoff(&self) -> bool {
26+
let now = Instant::now();
27+
now < self.failed_backoff_time
28+
}
29+
}
30+
31+
impl Default for BackoffInfo {
32+
fn default() -> Self {
33+
Self {
34+
failed_count: 0,
35+
failed_backoff_time: Instant::now(),
36+
}
37+
}
38+
}

tap-agent/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ lazy_static! {
1717

1818
pub mod adaptative_concurrency;
1919
pub mod agent;
20+
pub mod backoff;
2021
pub mod config;
2122
pub mod database;
2223
pub mod metrics;

tap-agent/src/tracker/sender_fee_stats.rs

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
use std::{
55
collections::VecDeque,
6-
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
6+
time::{Duration, SystemTime, UNIX_EPOCH},
77
};
88

9-
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
9+
use crate::{agent::unaggregated_receipts::UnaggregatedReceipts, backoff::BackoffInfo};
1010

1111
use super::{AllocationStats, DefaultFromExtra, DurationInfo};
1212

@@ -84,31 +84,6 @@ impl BufferInfo {
8484
}
8585
}
8686

87-
#[derive(Debug, Clone)]
88-
pub struct BackoffInfo {
89-
failed_ravs_count: u32,
90-
failed_rav_backoff_time: Instant,
91-
}
92-
93-
impl BackoffInfo {
94-
pub fn ok(&mut self) {
95-
self.failed_ravs_count = 0;
96-
}
97-
98-
pub fn fail(&mut self) {
99-
// backoff = max(100ms * 2 ^ retries, 60s)
100-
self.failed_rav_backoff_time = Instant::now()
101-
+ (Duration::from_millis(100) * 2u32.pow(self.failed_ravs_count))
102-
.min(Duration::from_secs(60));
103-
self.failed_ravs_count += 1;
104-
}
105-
106-
pub fn in_backoff(&self) -> bool {
107-
let now = Instant::now();
108-
now < self.failed_rav_backoff_time
109-
}
110-
}
111-
11287
impl DefaultFromExtra<DurationInfo> for SenderFeeStats {
11388
fn default_from_extra(extra: &DurationInfo) -> Self {
11489
SenderFeeStats {
@@ -121,15 +96,6 @@ impl DefaultFromExtra<DurationInfo> for SenderFeeStats {
12196
}
12297
}
12398

124-
impl Default for BackoffInfo {
125-
fn default() -> Self {
126-
Self {
127-
failed_ravs_count: 0,
128-
failed_rav_backoff_time: Instant::now(),
129-
}
130-
}
131-
}
132-
13399
impl AllocationStats<UnaggregatedReceipts> for SenderFeeStats {
134100
fn update(&mut self, v: UnaggregatedReceipts) {
135101
self.total_fee = v.value;

0 commit comments

Comments
 (0)