diff --git a/tap-agent/src/adaptative_concurrency.rs b/tap-agent/src/adaptative_concurrency.rs new file mode 100644 index 000000000..7a07af49e --- /dev/null +++ b/tap-agent/src/adaptative_concurrency.rs @@ -0,0 +1,86 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::ops::Range; + +pub struct AdaptiveLimiter { + range: Range, + current_limit: usize, + in_flight: usize, +} + +impl AdaptiveLimiter { + pub fn new(initial_limit: usize, range: Range) -> Self { + Self { + range, + current_limit: initial_limit, + in_flight: 0, + } + } + + pub fn acquire(&mut self) -> bool { + self.has_limit() && { + self.in_flight += 1; + true + } + } + + pub fn has_limit(&self) -> bool { + self.in_flight < self.current_limit + } + + pub fn on_success(&mut self) { + self.in_flight -= 1; + if self.current_limit < self.range.end { + self.current_limit += 1; // Additive Increase + } + } + + pub fn on_failure(&mut self) { + // Multiplicative Decrease + self.in_flight -= 1; + self.current_limit = (self.current_limit / 2).max(self.range.start); + } +} + +#[cfg(test)] +mod tests { + use super::AdaptiveLimiter; + + #[test] + fn test_adaptative_concurrency() { + let mut limiter = AdaptiveLimiter::new(2, 1..10); + assert_eq!(limiter.current_limit, 2); + assert_eq!(limiter.in_flight, 0); + + assert!(limiter.acquire()); + assert!(limiter.acquire()); + assert!(!limiter.acquire()); + assert_eq!(limiter.in_flight, 2); + + limiter.on_success(); + assert_eq!(limiter.in_flight, 1); + assert_eq!(limiter.current_limit, 3); + limiter.on_success(); + assert_eq!(limiter.in_flight, 0); + assert_eq!(limiter.current_limit, 4); + + assert!(limiter.acquire()); + assert!(limiter.acquire()); + assert!(limiter.acquire()); + assert!(limiter.acquire()); + assert!(!limiter.acquire()); + assert_eq!(limiter.in_flight, 4); + assert_eq!(limiter.current_limit, 4); + + limiter.on_failure(); + assert_eq!(limiter.current_limit, 2); + assert_eq!(limiter.in_flight, 3); + limiter.on_success(); + assert_eq!(limiter.current_limit, 3); + assert_eq!(limiter.in_flight, 2); + + assert!(limiter.acquire()); + assert!(!limiter.acquire()); + } +} diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 690ef2817..73f1bff96 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -27,6 +27,7 @@ use tap_core::rav::SignedRAV; use tracing::{error, Level}; use super::sender_allocation::{SenderAllocation, SenderAllocationArgs}; +use crate::adaptative_concurrency::AdaptiveLimiter; use crate::agent::sender_allocation::SenderAllocationMessage; use crate::agent::unaggregated_receipts::UnaggregatedReceipts; use crate::tracker::{SenderFeeTracker, SimpleFeeTracker}; @@ -77,6 +78,8 @@ lazy_static! { .unwrap(); } +const INITIAL_RAV_REQUEST_CONCURRENT: usize = 1; + type RavMap = HashMap; type Balance = U256; @@ -146,6 +149,9 @@ pub struct State { sender_balance: U256, retry_interval: Duration, + // concurrent rav request + adaptive_limiter: AdaptiveLimiter, + //Eventuals escrow_accounts: Eventual, @@ -233,6 +239,7 @@ impl State { "Error while sending and waiting message for actor {allocation_id}. Error: {e}" ) })?; + self.adaptive_limiter.acquire(); self.sender_fee_tracker.start_rav_request(allocation_id); Ok(()) @@ -247,6 +254,7 @@ impl State { match rav_result { Ok((fees, rav)) => { self.sender_fee_tracker.ok_rav_request(allocation_id); + self.adaptive_limiter.on_success(); let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate); self.update_rav(allocation_id, rav_value); @@ -257,6 +265,7 @@ impl State { Err(err) => { // TODO we should update the total value too self.sender_fee_tracker.failed_rav_backoff(allocation_id); + self.adaptive_limiter.on_failure(); error!( "Error while requesting RAV for sender {} and allocation {}: {}", self.sender, allocation_id, err @@ -536,6 +545,7 @@ impl Actor for SenderAccount { sender_balance, retry_interval, scheduled_rav_request: None, + adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50), }; for allocation_id in &allocation_ids { @@ -637,46 +647,50 @@ impl Actor for SenderAccount { if should_deny { state.add_to_denylist().await; } - let total_counter_for_allocation = state - .sender_fee_tracker - .get_count_outside_buffer_for_allocation(&allocation_id); - let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id); - let counter_greater_receipt_limit = total_counter_for_allocation - >= state.config.tap.rav_request_receipt_limit - && can_trigger_rav; - let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee(); - let total_fee_greater_trigger_value = - total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value; - let rav_result = match ( - counter_greater_receipt_limit, - total_fee_greater_trigger_value, - ) { - (true, _) => { - tracing::debug!( - total_counter_for_allocation, - rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, - %allocation_id, - "Total counter greater than the receipt limit per rav. Triggering RAV request" - ); - state.rav_request_for_allocation(allocation_id).await - } - (_, true) => { - tracing::debug!( - total_fee_outside_buffer, - trigger_value = state.config.tap.rav_request_trigger_value, - "Total fee greater than the trigger value. Triggering RAV request" + let has_available_slots_for_requests = state.adaptive_limiter.has_limit(); + if has_available_slots_for_requests { + let total_counter_for_allocation = state + .sender_fee_tracker + .get_count_outside_buffer_for_allocation(&allocation_id); + let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id); + let counter_greater_receipt_limit = total_counter_for_allocation + >= state.config.tap.rav_request_receipt_limit + && can_trigger_rav; + let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee(); + let total_fee_greater_trigger_value = + total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value; + let rav_result = match ( + counter_greater_receipt_limit, + total_fee_greater_trigger_value, + ) { + (true, _) => { + tracing::debug!( + total_counter_for_allocation, + rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, + %allocation_id, + "Total counter greater than the receipt limit per rav. Triggering RAV request" + ); + + state.rav_request_for_allocation(allocation_id).await + } + (_, true) => { + tracing::debug!( + total_fee_outside_buffer, + trigger_value = state.config.tap.rav_request_trigger_value, + "Total fee greater than the trigger value. Triggering RAV request" + ); + state.rav_request_for_heaviest_allocation().await + } + _ => Ok(()), + }; + // In case we fail, we want our actor to keep running + if let Err(err) = rav_result { + tracing::error!( + error = %err, + "There was an error while requesting a RAV." ); - state.rav_request_for_heaviest_allocation().await } - _ => Ok(()), - }; - // In case we fail, we want our actor to keep running - if let Err(err) = rav_result { - tracing::error!( - error = %err, - "There was an error while requesting a RAV." - ); } match (state.denied, state.deny_condition_reached()) { diff --git a/tap-agent/src/lib.rs b/tap-agent/src/lib.rs index 121b14f40..e90671f97 100644 --- a/tap-agent/src/lib.rs +++ b/tap-agent/src/lib.rs @@ -15,6 +15,7 @@ lazy_static! { ); } +pub mod adaptative_concurrency; pub mod agent; pub mod config; pub mod database;