Skip to content

Commit 66507e1

Browse files
authored
feat: Client-side rate limiting (#564)
1 parent 6eb145f commit 66507e1

File tree

12 files changed

+391
-46
lines changed

12 files changed

+391
-46
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import struct Foundation.TimeInterval
9+
import struct Foundation.Date
10+
import func Foundation.pow
11+
12+
actor ClientSideRateLimiter {
13+
14+
// these are constants defined in Retry Behavior 2.0
15+
let minFillRate: Double = 0.5
16+
let minCapacity: Double = 1.0
17+
let smooth: Double = 0.8
18+
let beta = 0.7
19+
let scaleConstant = 0.4
20+
21+
// these are state variables explicitly declared in Retry Behavior 2.0
22+
var fillRate: Double = 0.0
23+
var maxCapacity: Double = 0.0
24+
var currentCapacity: Double = 0.0
25+
var lastTimestamp: TimeInterval? = 0.0
26+
var enabled = false
27+
var measuredTXRate: Double = 0.0
28+
var lastTXRateBucket: Double
29+
var requestCount: Int = 0
30+
var lastMaxRate: Double = 0.0
31+
var lastThrottleTime: TimeInterval
32+
33+
// not explicitly included as state in Retry Behavior 2.0, but it said to cache the
34+
// value when lastMaxRate changes
35+
var timeWindow: Double = 0.0
36+
37+
// Returns the current time when called.
38+
// Exposed so time may be mocked for testing.
39+
var clock: () -> TimeInterval
40+
41+
/// Creates a new client-side rate limiter.
42+
///
43+
/// Parameters are for use during testing. To create this type for actual use, call `.init()`.
44+
/// - Parameters:
45+
/// - lastMaxRate: The last max rate to set. For testing use only.
46+
/// - lastThrottleTime: The last throttle time to set. For testing use only.
47+
/// - clock: An anonymous closure that provides the current time in the form of a timestamp. Defaults to actual time. For testing use only.
48+
init(
49+
lastMaxRate: Double = 0.0,
50+
lastThrottleTime: TimeInterval? = nil,
51+
clock: @escaping () -> TimeInterval = { Date().timeIntervalSinceReferenceDate }
52+
) {
53+
self.lastMaxRate = lastMaxRate
54+
self.lastThrottleTime = lastThrottleTime ?? clock()
55+
self.lastTXRateBucket = Self.floor(clock())
56+
self.clock = clock
57+
}
58+
59+
// The following functions are built exactly as described in Retry Behavior 2.0.
60+
61+
func tokenBucketAcquire(amount: Double) -> TimeInterval? {
62+
if !enabled { return nil }
63+
tokenBucketRefill()
64+
if amount <= currentCapacity {
65+
currentCapacity -= amount
66+
return nil
67+
} else {
68+
let delay = (amount - currentCapacity) / fillRate
69+
currentCapacity -= amount
70+
return delay
71+
}
72+
}
73+
74+
private func tokenBucketRefill() {
75+
let timestamp = clock()
76+
guard let lastTimestamp = lastTimestamp else {
77+
self.lastTimestamp = timestamp
78+
return
79+
}
80+
let fillAmount = (timestamp - lastTimestamp) * fillRate
81+
currentCapacity = min(maxCapacity, currentCapacity + fillAmount)
82+
self.lastTimestamp = timestamp
83+
}
84+
85+
private func tokenBucketUpdateRate(newRPS: Double) {
86+
tokenBucketRefill()
87+
fillRate = max(newRPS, minFillRate)
88+
maxCapacity = max(newRPS, minCapacity)
89+
currentCapacity = min(currentCapacity, maxCapacity)
90+
}
91+
92+
private func tokenBucketEnable() {
93+
enabled = true
94+
}
95+
96+
private func updateMeasuredRate() {
97+
let t = clock()
98+
let timeBucket = Self.floor(t * 2.0) / 2.0
99+
requestCount += 1
100+
if timeBucket > lastTXRateBucket {
101+
let currentRate = Double(requestCount) / (timeBucket - lastTXRateBucket)
102+
measuredTXRate = (currentRate * smooth) + (measuredTXRate * (1.0 - smooth))
103+
requestCount = 0
104+
lastTXRateBucket = timeBucket
105+
}
106+
}
107+
108+
// Exposed internally for use while testing.
109+
func updateClientSendingRate(isThrottling: Bool) {
110+
updateMeasuredRate()
111+
let calculatedRate: Double
112+
if isThrottling {
113+
let rateToUse = enabled ? min(measuredTXRate, fillRate) : measuredTXRate
114+
lastMaxRate = rateToUse
115+
calculateTimeWindow()
116+
lastThrottleTime = clock()
117+
calculatedRate = cubicThrottle(rateToUse: rateToUse)
118+
tokenBucketEnable()
119+
} else {
120+
calculateTimeWindow()
121+
calculatedRate = cubicSuccess(timestamp: clock())
122+
}
123+
let newRate = min(calculatedRate, 2.0 * measuredTXRate)
124+
tokenBucketUpdateRate(newRPS: newRate)
125+
}
126+
127+
// Exposed internally for use while testing.
128+
func calculateTimeWindow() {
129+
timeWindow = pow(lastMaxRate * (1.0 - beta) / scaleConstant, 1.0 / 3.0)
130+
}
131+
132+
// Exposed internally for use while testing.
133+
func cubicSuccess(timestamp: TimeInterval) -> Double {
134+
let dt = timestamp - lastThrottleTime
135+
return scaleConstant * pow(dt - timeWindow, 3.0) + lastMaxRate
136+
}
137+
138+
// Exposed internally for use while testing.
139+
func cubicThrottle(rateToUse: Double) -> Double {
140+
return rateToUse * beta
141+
}
142+
143+
private static func floor(_ time: TimeInterval) -> TimeInterval {
144+
time.rounded(.down)
145+
}
146+
147+
// The following functions are not described in Retry Behavior 2.0 but are
148+
// used to set test conditions.
149+
150+
func setLastMaxRate(_ newValue: Double) { lastMaxRate = newValue }
151+
152+
func setLastThrottleTime(_ newValue: Double) { lastThrottleTime = newValue }
153+
154+
func setClock(_ newClock: @escaping () -> TimeInterval) { clock = newClock }
155+
}

Sources/ClientRuntime/Retries/DefaultRetryStrategy/DefaultRetryStrategy.swift

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// SPDX-License-Identifier: Apache-2.0
66
//
77

8-
import Foundation
8+
import struct Foundation.TimeInterval
99

1010
public struct DefaultRetryStrategy: RetryStrategy {
1111
public typealias Token = DefaultRetryToken
@@ -17,27 +17,26 @@ public struct DefaultRetryStrategy: RetryStrategy {
1717
/// Used to inject a mock during unit tests that simulates sleeping.
1818
/// The default `sleeper` function actually sleeps asynchronously.
1919
var sleeper: (TimeInterval) async throws -> Void = { delay in
20+
guard delay > 0.0 else { return }
2021
try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000.0))
2122
}
2223

2324
public init(options: RetryStrategyOptions) {
2425
self.options = options
25-
self.quotaRepository = RetryQuotaRepository(
26-
availableCapacity: options.availableCapacity,
27-
maxCapacity: options.maxCapacity
28-
)
26+
self.quotaRepository = RetryQuotaRepository(options: options)
2927
}
3028

3129
public func acquireInitialRetryToken(tokenScope: String) async throws -> DefaultRetryToken {
3230
let quota = await quotaRepository.quota(partitionID: tokenScope)
31+
let rateLimitDelay = await quota.getRateLimitDelay()
32+
try await sleeper(rateLimitDelay)
3333
return DefaultRetryToken(quota: quota)
3434
}
3535

3636
public func refreshRetryTokenForRetry(tokenToRenew: DefaultRetryToken, errorInfo: RetryErrorInfo) async throws {
37-
let delay = errorInfo.retryAfterHint ??
37+
let backoffDelay = errorInfo.retryAfterHint ??
3838
options.backoffStrategy.computeNextBackoffDelay(attempt: tokenToRenew.retryCount)
3939
tokenToRenew.retryCount += 1
40-
tokenToRenew.delay = delay
4140
if tokenToRenew.retryCount > options.maxRetriesBase {
4241
throw RetryError.maxAttemptsReached
4342
}
@@ -46,7 +45,10 @@ public struct DefaultRetryStrategy: RetryStrategy {
4645
} else {
4746
throw RetryError.insufficientQuota
4847
}
49-
try await sleeper(tokenToRenew.delay ?? 0.0)
48+
let isThrottling = errorInfo.errorType == .throttling
49+
await tokenToRenew.quota.updateClientSendingRate(isThrottling: isThrottling)
50+
let rateLimitDelay = await tokenToRenew.quota.getRateLimitDelay()
51+
try await sleeper(backoffDelay + rateLimitDelay)
5052
}
5153

5254
public func recordSuccess(token: DefaultRetryToken) async {

Sources/ClientRuntime/Retries/DefaultRetryStrategy/DefaultRetryToken.swift

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,12 @@ import struct Foundation.TimeInterval
1111
///
1212
/// The retry token contains all the state relevant to one request that is needed to manage retry
1313
/// until the request succeeds or fails after zero or more retries.
14-
public class DefaultRetryToken: RetryToken {
14+
public final class DefaultRetryToken: RetryToken {
1515

1616
/// The number of retry attempts that have been made using this token.
1717
/// Defaults to zero at the initial attempt, goes up by one for each subsequent attempt.
1818
public internal(set) var retryCount: Int = 0
1919

20-
/// The delay, in seconds, to the next retry.
21-
public internal(set) var delay: TimeInterval?
22-
2320
/// The amount of quota capacity amount held by this token, if any.
2421
///
2522
/// Tokens have nil capacity amount when created. Quota value is set to a prescribed value when attempting a retry.
@@ -28,6 +25,11 @@ public class DefaultRetryToken: RetryToken {
2825
/// The quota for this token. More than one token (i.e. for requests against the same endpoint) may share a quota.
2926
let quota: RetryQuota
3027

28+
/// Creates a new retry token.
29+
///
30+
/// The quota for this token may be shared with other tokens if other requests are made against an endpoint with
31+
/// the same partition ID.
32+
/// - Parameter quota: The retry quota associated with this token's request.
3133
init(quota: RetryQuota) {
3234
self.quota = quota
3335
}

Sources/ClientRuntime/Retries/DefaultRetryStrategy/RetryQuota.swift

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
// SPDX-License-Identifier: Apache-2.0
66
//
77

8+
import struct Foundation.TimeInterval
9+
810
/// Keeps the retry quota count for one partition ID.
911
///
1012
/// Is shared across all requests with the same partition ID; typically this also correlates to one network connection.
@@ -29,14 +31,36 @@ final actor RetryQuota {
2931
/// The number of tokens this quota currently holds.
3032
var availableCapacity: Int
3133

34+
/// The rate limiter to be used, if any.
35+
private var rateLimiter: ClientSideRateLimiter?
36+
3237
/// Sets the current capacity in this quota. To be used for testing only.
3338
func setAvailableCapacity(_ availableCapacity: Int) { self.availableCapacity = availableCapacity }
3439

3540
/// Creates a new quota, optionally with reduced available capacity (used for testing.)
3641
/// `maxCapacity` cannot be set less than available.
37-
init(availableCapacity: Int, maxCapacity: Int) {
42+
/// - Parameters:
43+
/// - availableCapacity: The number of tokens in this quota at creation.
44+
/// - maxCapacity: <#maxCapacity description#>
45+
/// - rateLimitingMode: <#rateLimitingMode description#>
46+
init(
47+
availableCapacity: Int,
48+
maxCapacity: Int,
49+
rateLimitingMode: RetryStrategyOptions.RateLimitingMode = .standard
50+
) {
3851
self.availableCapacity = availableCapacity
3952
self.maxCapacity = max(maxCapacity, availableCapacity)
53+
self.rateLimiter = rateLimitingMode == .adaptive ? ClientSideRateLimiter() : nil
54+
}
55+
56+
/// Creates a new quota with settings from the passed options.
57+
/// - Parameter options: The retry strategy options from which to configure this retry quota
58+
convenience init(options: RetryStrategyOptions) {
59+
self.init(
60+
availableCapacity: options.availableCapacity,
61+
maxCapacity: options.maxCapacity,
62+
rateLimitingMode: options.rateLimitingMode
63+
)
4064
}
4165

4266
/// Deducts the proper number of tokens from available & returns them.
@@ -59,4 +83,12 @@ final actor RetryQuota {
5983
availableCapacity += capacityAmount ?? Self.noRetryIncrement
6084
availableCapacity = min(availableCapacity, maxCapacity)
6185
}
86+
87+
func getRateLimitDelay() async -> TimeInterval {
88+
await rateLimiter?.tokenBucketAcquire(amount: 1.0) ?? 0.0
89+
}
90+
91+
func updateClientSendingRate(isThrottling: Bool) async {
92+
await rateLimiter?.updateClientSendingRate(isThrottling: isThrottling)
93+
}
6294
}

Sources/ClientRuntime/Retries/DefaultRetryStrategy/RetryQuotaRepository.swift

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,11 @@
77

88
/// Holds multiple quotas, keyed by partition IDs.
99
actor RetryQuotaRepository {
10-
let maxCapacity: Int
11-
let availableCapacity: Int
10+
let options: RetryStrategyOptions
1211
private var quotas = [String: RetryQuota]()
1312

14-
init(availableCapacity: Int, maxCapacity: Int) {
15-
self.availableCapacity = availableCapacity
16-
self.maxCapacity = maxCapacity
13+
init(options: RetryStrategyOptions) {
14+
self.options = options
1715
}
1816

1917
/// Returns the quota for the given partition ID.
@@ -26,7 +24,7 @@ actor RetryQuotaRepository {
2624
if let quota = quotas[partitionID] {
2725
return quota
2826
} else {
29-
let newQuota = RetryQuota(availableCapacity: availableCapacity, maxCapacity: maxCapacity)
27+
let newQuota = RetryQuota(options: options)
3028
quotas[partitionID] = newQuota
3129
return newQuota
3230
}

Sources/ClientRuntime/Retries/RetryStrategyOptions.swift

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,26 @@ public struct RetryStrategyOptions {
1313
/// This is more of a hint since a custom retry strategy could be aware of certain operational contexts ("partition fail over")
1414
public let maxRetriesBase: Int
1515

16+
/// Sets the mode used for rate limiting requests in response to throttling.
17+
public enum RateLimitingMode {
18+
19+
/// Requests may be sent immediately, and are not delayed for rate limiting when throttling is detected.
20+
///
21+
/// This is default retry behavior.
22+
case standard
23+
24+
/// Initial and retry requests may be delayed by an additional amount when throttling is detected.
25+
///
26+
/// This is sometimes called "adaptive" or "client-side rate limiting" mode, and is available opt-in.
27+
case adaptive
28+
}
29+
30+
/// The mode to be used for rate-limiting requests.
31+
///
32+
/// In `standard` mode, requests are only delayed according to the backoff strategy in use. In `adaptive` mode, requests are
33+
/// delayed when the server indicates that requests are being throttled.
34+
public let rateLimitingMode: RateLimitingMode
35+
1636
/// Sets the initial available capacity for this retry strategy's quotas.
1737
///
1838
/// Used only during testing, production uses the default values.
@@ -33,11 +53,13 @@ public struct RetryStrategyOptions {
3353
backoffStrategy: RetryBackoffStrategy = ExponentialBackoffStrategy(),
3454
maxRetriesBase: Int = 2,
3555
availableCapacity: Int = 500,
36-
maxCapacity: Int = 500
56+
maxCapacity: Int = 500,
57+
rateLimitingMode: RateLimitingMode = .standard
3758
) {
3859
self.backoffStrategy = backoffStrategy
3960
self.maxRetriesBase = maxRetriesBase
4061
self.availableCapacity = availableCapacity
4162
self.maxCapacity = maxCapacity
63+
self.rateLimitingMode = rateLimitingMode
4264
}
4365
}

Sources/ClientRuntime/Retries/RetryToken.swift

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,4 @@ public protocol RetryToken: AnyObject {
1414

1515
/// The number of retries (i.e. NOT including the initial attempt) that this token has made.
1616
var retryCount: Int { get }
17-
18-
/// The delay for this request (TODO: not used, maybe get rid of this?)
19-
var delay: TimeInterval? { get }
2017
}

Tests/ClientRuntimeTests/Retry/DefaultRetryErrorInfoProviderTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import Foundation
99
import XCTest
1010
@testable import ClientRuntime
1111

12-
class DefaultRetryErrorInfoProviderTests: XCTestCase {
12+
final class DefaultRetryErrorInfoProviderTests: XCTestCase {
1313

1414
// MARK: - Modeled errors
1515

0 commit comments

Comments
 (0)