Skip to content

Commit 5942373

Browse files
authored
feat(quotas): Implement a limit threshold for the cache (#5437)
1 parent ff9ad8b commit 5942373

File tree

6 files changed

+202
-44
lines changed

6 files changed

+202
-44
lines changed

relay-config/src/config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,13 @@ pub struct Processing {
12041204
///
12051205
/// By default quota caching is disabled.
12061206
pub quota_cache_ratio: Option<f32>,
1207+
/// Relative amount of the total quota limit to which quota caching is applied.
1208+
///
1209+
/// If exceeded, the rate limiter will no longer cache the quota and sync with Redis on every call instead.
1210+
/// Lowering this value reduces the probability of incorrectly over-accepting.
1211+
///
1212+
/// Must be between `0.0` and `1.0`. By default, there is no limit configured.
1213+
pub quota_cache_max: Option<f32>,
12071214
/// Configuration for attachment uploads.
12081215
#[serde(default)]
12091216
pub upload: UploadServiceConfig,
@@ -1226,6 +1233,7 @@ impl Default for Processing {
12261233
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
12271234
max_rate_limit: default_max_rate_limit(),
12281235
quota_cache_ratio: None,
1236+
quota_cache_max: None,
12291237
upload: UploadServiceConfig::default(),
12301238
}
12311239
}
@@ -2618,6 +2626,11 @@ impl Config {
26182626
self.values.processing.quota_cache_ratio
26192627
}
26202628

2629+
/// Maximum ratio of a quota's limit up to which the in-memory quota cache is applied.
2630+
pub fn quota_cache_max(&self) -> Option<f32> {
2631+
self.values.processing.quota_cache_max
2632+
}
2633+
26212634
/// Cache vacuum interval for the cardinality limiter in memory cache.
26222635
///
26232636
/// The cache will scan for expired values based on this interval.

relay-quotas/src/cache.rs

Lines changed: 166 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::num::NonZeroUsize;
1+
use std::num::NonZeroU64;
22
use std::sync::atomic::{AtomicU64, Ordering};
33
use std::time::Duration;
44

@@ -7,12 +7,14 @@ use relay_common::time::UnixTimestamp;
77
/// A ratio is converted to a divisor to perform integer arithmetic, instead of floating point.
88
///
99
/// This is done with the configured precision here.
10-
const RATIO_PRECISION: usize = 10;
10+
const RATIO_PRECISION: u64 = 10;
1111

1212
/// A quota to be checked with the [`OpportunisticQuotaCache`].
1313
#[derive(Debug, Clone, Copy)]
1414
pub struct Quota<T> {
1515
/// The quota limit.
16+
///
17+
/// A negative limit is treated as infinite/unlimited.
1618
pub limit: i64,
1719
/// The quota window size in seconds.
1820
pub window: u64,
@@ -40,7 +42,12 @@ where
4042
/// For example: Setting this to `10 * RATIO_PRECISION` means, if there is 100 quota remaining,
4143
/// the cache will opportunistically accept the next 10 items, if there is a quota of 90 remaining,
4244
/// the cache will accept the next 9 items.
43-
max_over_spend_divisor: NonZeroUsize,
45+
max_over_spend_divisor: NonZeroU64,
46+
47+
/// Divisor derived from the maximum ratio of a quota's limit up to which the cache stays active, `None` means no maximum.
48+
///
49+
/// See also: [`Self::with_max`].
50+
limit_max_divisor: Option<NonZeroU64>,
4451

4552
/// Minimum interval between vacuum of the cache.
4653
vacuum_interval: Duration,
@@ -58,14 +65,15 @@ where
5865
/// until it requires synchronization.
5966
///
6067
/// The configured ratio must be in range `[0, 1]`.
61-
pub fn new(max_over_spend_ratio: f32) -> Self {
62-
let max_over_spend_divisor = 1.0f32 / max_over_spend_ratio * RATIO_PRECISION as f32;
68+
pub fn new(max_over_spend: f32) -> Self {
69+
let max_over_spend_divisor = 1.0f32 / max_over_spend * RATIO_PRECISION as f32;
6370
let max_over_spend_divisor =
64-
NonZeroUsize::new(max_over_spend_divisor as usize).unwrap_or(NonZeroUsize::MIN);
71+
NonZeroU64::new(max_over_spend_divisor as u64).unwrap_or(NonZeroU64::MIN);
6572

6673
Self {
6774
cache: Default::default(),
6875
max_over_spend_divisor,
76+
limit_max_divisor: None,
6977
vacuum_interval: Duration::from_secs(30),
7078
// Initialize to 0, this means a vacuum run immediately, but it is going to be fast
7179
// (empty cache) and it requires us to be time/environment independent, time is purely
@@ -74,6 +82,32 @@ where
7482
}
7583
}
7684

85+
/// Relative amount of the total quota limit to which caching is applied.
86+
///
87+
/// If exceeded, the cache will no longer cache values for the quota.
88+
/// Lowering this value reduces the probability of incorrectly over-accepting.
89+
///
90+
/// For example: a quota with limit `100` and a configured limit threshold of `0.7` (70%)
91+
/// will no longer be considered for caching once 70% (70) of the quota is consumed.
92+
///
93+
/// By default, no maximum is configured and the entire quota is cached.
94+
pub fn with_max(mut self, max: Option<f32>) -> Self {
95+
self.limit_max_divisor = max.map(|v| {
96+
// Inverting the threshold here simplifies the checking code, but also retains more
97+
// precision for the integer division, since we can expect this value to be large.
98+
//
99+
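// Direct divisor for a threshold of 0.95: 1.0 / 0.95 * 10 = 10 (fraction truncated)
100+
// Inverted divisor for the same threshold: 1.0 / (1.0 - 0.95) * 10 = 200
101+
//
102+
// Direct, with a limit of 100: 100 * 10 / 10 = 100 (the threshold is lost)
103+
// Inverted, with a limit of 100: 100 - (100 * 10 / 200) = 95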
104+
let max_div = 1.0f32 / (1.0f32 - v.clamp(0.0, 1.0)) * RATIO_PRECISION as f32;
105+
NonZeroU64::new(max_div as u64).unwrap_or(NonZeroU64::MAX)
106+
});
107+
108+
self
109+
}
110+
77111
/// Checks a quota with quantity against the cache.
78112
///
79113
/// The cache may return [`Action::Accept`] indicating the quantity should be accepted.
@@ -82,9 +116,14 @@ where
82116
///
83117
/// Whenever the cache returns [`Action::Check`], the cache requires a call to [`Self::update_quota`],
84118
/// with a synchronized 'consumed' amount.
85-
pub fn check_quota(&self, quota: Quota<T>, quantity: usize) -> Action {
119+
pub fn check_quota(&self, quota: Quota<T>, quantity: u64) -> Action {
86120
let cache = self.cache.pin();
87121

122+
let Ok(limit) = u64::try_from(quota.limit) else {
123+
// Negative limits are infinite.
124+
return Action::Accept;
125+
};
126+
88127
// We can potentially short circuit here with a simple read, the cases:
89128
// 1. `NeedsSync`
90129
// 2. Active with `consumed >= limit`
@@ -110,23 +149,28 @@ where
110149

111150
let total_local_use = local_use + quantity;
112151

152+
let threshold = match self.limit_max_divisor.map(NonZeroU64::get) {
153+
Some(div) => limit * RATIO_PRECISION / div,
154+
None => 0,
155+
};
156+
113157
// Can short circuit here already if consumed is already above or equal to the limit.
114158
//
115159
// We could also propagate this out to the caller as a definitive negative in the
116160
// future. This does require some additional consideration how this would interact with
117161
// refunds, which can reduce the consumed.
118-
if consumed >= quota.limit {
162+
if consumed >= limit.saturating_sub(threshold) {
119163
return CachedQuota::new_needs_sync(total_local_use);
120164
}
121165

122-
let remaining = usize::try_from(quota.limit - consumed).unwrap_or(usize::MAX);
166+
let remaining = limit.saturating_sub(consumed);
123167
let max_allowed_spend = remaining
124168
// Normalize the remaining quota with the window size, to apply the ratio/divisor to the
125169
// per second rate.
126170
//
127171
// This means we get a consistent behaviour for short (10s) quotas (e.g. abuse) as well
128172
// as long (1h) quotas (e.g. spike protection) with a more predictable error.
129-
/ usize::try_from(quota.window).unwrap_or(usize::MAX).max(1)
173+
/ quota.window.max(1)
130174
// Apply ratio precision, which is already pre-multiplied into `max_over_spend_divisor`.
131175
* RATIO_PRECISION
132176
// Apply the actual ratio with the pre-computed divisor.
@@ -156,6 +200,12 @@ where
156200
pub fn update_quota(&self, quota: Quota<T>, consumed: i64) {
157201
let cache = self.cache.pin();
158202

203+
// Consumed quota can be negative due to refunds. We treat negative consumption as if the
204+
// quota were simply unused (i.e. as `0`).
205+
//
206+
// This only makes the cache stricter and less likely to over accept.
207+
let consumed = u64::try_from(consumed).unwrap_or(0);
208+
159209
cache.update_or_insert(
160210
quota.key,
161211
|q| match q {
@@ -235,7 +285,7 @@ pub enum Action {
235285
/// Accept the quota request.
236286
Accept,
237287
/// Synchronize the quota with the returned quantity.
238-
Check(usize),
288+
Check(u64),
239289
}
240290

241291
/// State of a cached quota.
@@ -246,19 +296,19 @@ enum CachedQuota {
246296
NeedsSync,
247297
/// Like [`Self::NeedsSync`], but also carries a total quantity which needs to be synchronized
248298
/// with the store.
249-
NeedsSyncWithQuantity(NonZeroUsize),
299+
NeedsSyncWithQuantity(NonZeroU64),
250300
/// The cache is active and can still make decisions without a synchronization.
251301
Active {
252-
consumed: i64,
253-
local_use: usize,
302+
consumed: u64,
303+
local_use: u64,
254304
expiry: UnixTimestamp,
255305
},
256306
}
257307

258308
impl CachedQuota {
259309
/// Creates [`Self::NeedsSync`] for a quantity of `0`, [`Self::NeedsSyncWithQuantity`] otherwise.
260-
pub fn new_needs_sync(quantity: usize) -> Self {
261-
NonZeroUsize::new(quantity)
310+
pub fn new_needs_sync(quantity: u64) -> Self {
311+
NonZeroU64::new(quantity)
262312
.map(Self::NeedsSyncWithQuantity)
263313
.unwrap_or(Self::NeedsSync)
264314
}
@@ -279,6 +329,16 @@ impl CachedQuota {
279329
mod tests {
280330
use super::*;
281331

332+
/// Returns a simple quota with a limit and a window of 1 second.
333+
fn simple_quota(limit: i64) -> Quota<&'static str> {
334+
Quota {
335+
limit,
336+
window: 1,
337+
key: "simple_quota_key",
338+
expiry: UnixTimestamp::from_secs(300),
339+
}
340+
}
341+
282342
#[test]
283343
fn test_opp_quota() {
284344
let cache = OpportunisticQuotaCache::new(0.1);
@@ -331,11 +391,24 @@ mod tests {
331391
assert_eq!(cache.check_quota(q2, 1), Action::Check(1));
332392

333393
// Negative state can exist due to refunds.
334-
cache.update_quota(q1, -100);
335-
// We now have `200` remaining quota -> 20 (= 10%).
336-
assert_eq!(cache.check_quota(q1, 20), Action::Accept);
394+
cache.update_quota(q1, -123);
395+
// The cache treats negative consumption like `0`: `100` remaining quota -> 10 (= 10%).
396+
assert_eq!(cache.check_quota(q1, 10), Action::Accept);
337397
// Too much, check the entire local usage.
338-
assert_eq!(cache.check_quota(q1, 1), Action::Check(21));
398+
assert_eq!(cache.check_quota(q1, 1), Action::Check(11));
399+
}
400+
401+
#[test]
402+
fn test_opp_quota_100_percent() {
403+
let cache = OpportunisticQuotaCache::new(1.0);
404+
405+
let q1 = simple_quota(100);
406+
407+
cache.update_quota(q1, 0);
408+
for _ in 0..100 {
409+
assert_eq!(cache.check_quota(q1, 1), Action::Accept,);
410+
}
411+
assert_eq!(cache.check_quota(q1, 1), Action::Check(101));
339412
}
340413

341414
#[test]
@@ -389,6 +462,78 @@ mod tests {
389462
assert_eq!(cache.check_quota(q1, 1), Action::Check(1));
390463
}
391464

465+
#[test]
466+
fn test_opp_quota_limit_threshold() {
467+
let cache = OpportunisticQuotaCache::new(0.1).with_max(Some(0.7));
468+
469+
let q1 = simple_quota(100);
470+
471+
// First access always needs synchronization.
472+
assert_eq!(cache.check_quota(q1, 1), Action::Check(1));
473+
474+
// 50 remaining -> 5 (10%), consumption still under limit threshold (70).
475+
cache.update_quota(q1, 50);
476+
// Nothing special here.
477+
assert_eq!(cache.check_quota(q1, 5), Action::Accept);
478+
assert_eq!(cache.check_quota(q1, 1), Action::Check(6));
479+
480+
// 31 remaining -> 3 (10%), consumption is still under the limit threshold (70),
481+
// but the maximum cached consumption would be *above* the threshold. This is currently
482+
// explicitly not considered (this behaviour may change in the future).
483+
cache.update_quota(q1, 69);
484+
assert_eq!(cache.check_quota(q1, 3), Action::Accept);
485+
assert_eq!(cache.check_quota(q1, 1), Action::Check(4));
486+
487+
// 30 remaining -> 3 (10%), *but* threshold (70%) is now reached.
488+
cache.update_quota(q1, 70);
489+
assert_eq!(cache.check_quota(q1, 1), Action::Check(1));
490+
// Sanity check, that exhausting the limit fully, still works.
491+
cache.update_quota(q1, 100);
492+
assert_eq!(cache.check_quota(q1, 1), Action::Check(1));
493+
494+
// Resetting consumption to a lower value (refunds) should still work.
495+
cache.update_quota(q1, 50);
496+
assert_eq!(cache.check_quota(q1, 5), Action::Accept);
497+
assert_eq!(cache.check_quota(q1, 1), Action::Check(6));
498+
}
499+
500+
#[test]
501+
fn test_opp_quota_limit_threshold_very_large() {
502+
let cache = OpportunisticQuotaCache::new(0.1).with_max(Some(420.0));
503+
504+
let q1 = simple_quota(100);
505+
506+
cache.update_quota(q1, 90);
507+
assert_eq!(cache.check_quota(q1, 1), Action::Accept);
508+
assert_eq!(cache.check_quota(q1, 1), Action::Check(2));
509+
}
510+
511+
#[test]
512+
fn test_opp_quota_limit_threshold_very_small() {
513+
let cache = OpportunisticQuotaCache::new(0.1).with_max(Some(-1.0));
514+
515+
let q1 = simple_quota(100);
516+
517+
// A negative or `0` limit threshold essentially disables the cache.
518+
cache.update_quota(q1, 0);
519+
assert_eq!(cache.check_quota(q1, 1), Action::Check(1));
520+
}
521+
522+
/// Negative limits should be considered as infinite.
523+
#[test]
524+
fn test_opp_quota_negative_limit() {
525+
let cache = OpportunisticQuotaCache::new(0.1);
526+
527+
let q1 = Quota {
528+
limit: -1,
529+
window: 10,
530+
key: "k1",
531+
expiry: UnixTimestamp::from_secs(300),
532+
};
533+
534+
assert_eq!(cache.check_quota(q1, 99999), Action::Accept);
535+
}
536+
392537
/// The test asserts the cache behaves correctly if the limit of a quota changes.
393538
#[test]
394539
fn test_opp_quota_limit_change() {
@@ -422,12 +567,7 @@ mod tests {
422567
fn test_opp_quota_zero() {
423568
let cache = OpportunisticQuotaCache::new(0.0);
424569

425-
let q1 = Quota {
426-
limit: 100,
427-
window: 1,
428-
key: "k1",
429-
expiry: UnixTimestamp::from_secs(300),
430-
};
570+
let q1 = simple_quota(100);
431571

432572
// Not synchronized -> always check.
433573
assert_eq!(cache.check_quota(q1, 1), Action::Check(1));

0 commit comments

Comments
 (0)