11// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22// SPDX-License-Identifier: Apache-2.0
33
4+ //! # Adaptative concurrency
5+ //! This module provide [AdaptiveLimiter] as a tool to allow concurrency.
6+ //! It's implemented with a Additive increase, Multiplicative decrease
7+ //! ([AIMD](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease))
8+ //! strategy.
9+ //!
10+ //!
11+ //!
12+ //! This allows us to have a big number of rav requests running
13+ //! concurrently, but in case of any of them fails, we limit
14+ //! the following requests until the aggregator recovers.
15+ //!
16+ //! ## Behaviour
17+ //! On every request, the caller acquires a slot by calling [AdaptiveLimiter::acquire()],
18+ //! this will increment the number of in_flight connections.
19+ //!
20+ //! If we receive a successful response, we increment our limit to be able to process
21+ //! one more request concurrently.
22+ //!
23+ //! If we receive a failed response, we decrement our limit by half so quickly
24+ //! relieve the pressure in the system.
25+
426use std:: ops:: Range ;
527
28+ /// Simple struct that keeps track of concurrent requests
29+ ///
30+ /// More information on [crate::adaptative_concurrency]
631pub struct AdaptiveLimiter {
732 range : Range < usize > ,
833 current_limit : usize ,
934 in_flight : usize ,
1035}
1136
1237impl AdaptiveLimiter {
38+ /// Creates an instance of [AdaptiveLimiter] with an `initial_limit`
39+ /// and a `range` that contains the minimum and maximum of concurrent
40+ /// requests
1341 pub fn new ( initial_limit : usize , range : Range < usize > ) -> Self {
1442 Self {
1543 range,
@@ -18,24 +46,33 @@ impl AdaptiveLimiter {
1846 }
1947 }
2048
49+ /// Acquires a slot in our limiter, returning `bool`
50+ /// representing if we had limit available or not
2151 pub fn acquire ( & mut self ) -> bool {
2252 self . has_limit ( ) && {
2353 self . in_flight += 1 ;
2454 true
2555 }
2656 }
2757
58+ /// Returns if there're slots available
2859 pub fn has_limit ( & self ) -> bool {
2960 self . in_flight < self . current_limit
3061 }
3162
63+ /// Callback function that removes in_flight counter
64+ /// and if the current limit is lower than the provided
65+ /// limit, increase the current limit by 1.
3266 pub fn on_success ( & mut self ) {
3367 self . in_flight -= 1 ;
3468 if self . current_limit < self . range . end {
3569 self . current_limit += 1 ; // Additive Increase
3670 }
3771 }
3872
73+ /// Callback function that removes in_flight counter
74+ /// and decreasing the current limit by half, with
75+ /// minimum value to configured value.
3976 pub fn on_failure ( & mut self ) {
4077 // Multiplicative Decrease
4178 self . in_flight -= 1 ;
0 commit comments