Skip to content

Commit 635e31f

Browse files
committed
feat: add adaptative concurrency to rav requests
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 739c8ea commit 635e31f

File tree

3 files changed

+98
-2
lines changed

3 files changed

+98
-2
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use std::ops::Range;
2+
3+
pub struct AdaptiveLimiter {
4+
range: Range<usize>,
5+
current_limit: usize,
6+
in_flight: usize,
7+
}
8+
9+
impl AdaptiveLimiter {
10+
pub fn new(initial_limit: usize, range: Range<usize>) -> Self {
11+
Self {
12+
range,
13+
current_limit: initial_limit,
14+
in_flight: 0,
15+
}
16+
}
17+
18+
pub fn acquire(&mut self) -> bool {
19+
self.has_limit() && {
20+
self.in_flight += 1;
21+
true
22+
}
23+
}
24+
25+
pub fn has_limit(&self) -> bool {
26+
self.in_flight < self.current_limit
27+
}
28+
29+
pub fn on_success(&mut self) {
30+
self.in_flight -= 1;
31+
if self.current_limit < self.range.end {
32+
self.current_limit += 1; // Additive Increase
33+
}
34+
}
35+
36+
pub fn on_failure(&mut self) {
37+
// Multiplicative Decrease
38+
self.in_flight -= 1;
39+
self.current_limit = (self.current_limit / 2).max(self.range.start);
40+
}
41+
}
42+
43+
#[cfg(test)]
44+
mod tests {
45+
use super::AdaptiveLimiter;
46+
47+
#[test]
48+
fn test_adaptative_concurrency() {
49+
let mut limiter = AdaptiveLimiter::new(2, 1..10);
50+
assert_eq!(limiter.current_limit, 2);
51+
assert_eq!(limiter.in_flight, 0);
52+
53+
assert!(limiter.acquire());
54+
assert!(limiter.acquire());
55+
assert!(!limiter.acquire());
56+
assert_eq!(limiter.in_flight, 2);
57+
58+
limiter.on_success();
59+
assert_eq!(limiter.in_flight, 1);
60+
assert_eq!(limiter.current_limit, 3);
61+
limiter.on_success();
62+
assert_eq!(limiter.in_flight, 0);
63+
assert_eq!(limiter.current_limit, 4);
64+
65+
assert!(limiter.acquire());
66+
assert!(limiter.acquire());
67+
assert!(limiter.acquire());
68+
assert!(limiter.acquire());
69+
assert!(!limiter.acquire());
70+
assert_eq!(limiter.in_flight, 4);
71+
assert_eq!(limiter.current_limit, 4);
72+
73+
limiter.on_failure();
74+
assert_eq!(limiter.current_limit, 2);
75+
assert_eq!(limiter.in_flight, 3);
76+
limiter.on_success();
77+
assert_eq!(limiter.current_limit, 3);
78+
assert_eq!(limiter.in_flight, 2);
79+
80+
assert!(limiter.acquire());
81+
assert!(!limiter.acquire());
82+
}
83+
}

tap-agent/src/agent/sender_account.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use tap_core::rav::SignedRAV;
2727
use tracing::{error, Level};
2828

2929
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
30+
use crate::adaptative_concurrency::AdaptiveLimiter;
3031
use crate::agent::sender_allocation::SenderAllocationMessage;
3132
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
3233
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};
@@ -77,6 +78,8 @@ lazy_static! {
7778
.unwrap();
7879
}
7980

81+
const INITIAL_RAV_REQUEST_CONCURRENT: usize = 1;
82+
8083
type RavMap = HashMap<Address, u128>;
8184
type Balance = U256;
8285

@@ -146,6 +149,9 @@ pub struct State {
146149
sender_balance: U256,
147150
retry_interval: Duration,
148151

152+
// concurrent rav request
153+
adaptive_limiter: AdaptiveLimiter,
154+
149155
//Eventuals
150156
escrow_accounts: Eventual<EscrowAccounts>,
151157

@@ -233,6 +239,7 @@ impl State {
233239
"Error while sending and waiting message for actor {allocation_id}. Error: {e}"
234240
)
235241
})?;
242+
self.adaptive_limiter.acquire();
236243
self.sender_fee_tracker.start_rav_request(allocation_id);
237244

238245
Ok(())
@@ -247,6 +254,7 @@ impl State {
247254
match rav_result {
248255
Ok((fees, rav)) => {
249256
self.sender_fee_tracker.ok_rav_request(allocation_id);
257+
self.adaptive_limiter.on_success();
250258

251259
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
252260
self.update_rav(allocation_id, rav_value);
@@ -257,6 +265,7 @@ impl State {
257265
Err(err) => {
258266
// TODO we should update the total value too
259267
self.sender_fee_tracker.failed_rav_backoff(allocation_id);
268+
self.adaptive_limiter.on_failure();
260269
error!(
261270
"Error while requesting RAV for sender {} and allocation {}: {}",
262271
self.sender, allocation_id, err
@@ -536,6 +545,7 @@ impl Actor for SenderAccount {
536545
sender_balance,
537546
retry_interval,
538547
scheduled_rav_request: None,
548+
adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50),
539549
};
540550

541551
for allocation_id in &allocation_ids {
@@ -647,11 +657,13 @@ impl Actor for SenderAccount {
647657
let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee();
648658
let total_fee_greater_trigger_value =
649659
total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value;
660+
let has_limit_available = state.adaptive_limiter.has_limit();
650661
let rav_result = match (
662+
has_limit_available,
651663
counter_greater_receipt_limit,
652664
total_fee_greater_trigger_value,
653665
) {
654-
(true, _) => {
666+
(true, true, _) => {
655667
tracing::debug!(
656668
total_counter_for_allocation,
657669
rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit,
@@ -661,7 +673,7 @@ impl Actor for SenderAccount {
661673

662674
state.rav_request_for_allocation(allocation_id).await
663675
}
664-
(_, true) => {
676+
(true, _, true) => {
665677
tracing::debug!(
666678
total_fee_outside_buffer,
667679
trigger_value = state.config.tap.rav_request_trigger_value,

tap-agent/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ lazy_static! {
1515
);
1616
}
1717

18+
pub mod adaptative_concurrency;
1819
pub mod agent;
1920
pub mod config;
2021
pub mod database;

0 commit comments

Comments
 (0)