Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 85 additions & 9 deletions tinyufo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,18 @@ impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> {
// the `extra_weight` is to essentially tell the cache to reserve that amount of weight for
// admission. It is used when calling `evict_to_limit` before admitting the asset itself.
fn evict_to_limit(&self, extra_weight: Weight, buckets: &Buckets<T>) -> Vec<KV<T>> {
let mut evicted = if self.total_weight_limit
< self.small_weight.load(SeqCst) + self.main_weight.load(SeqCst) + extra_weight as usize
{
let current_weight = || {
self.small_weight
.load(SeqCst)
.saturating_add(self.main_weight.load(SeqCst))
.saturating_add(extra_weight as usize)
};
let mut evicted = if self.total_weight_limit < current_weight() {
Vec::with_capacity(1)
} else {
vec![]
};
while self.total_weight_limit
< self.small_weight.load(SeqCst) + self.main_weight.load(SeqCst) + extra_weight as usize
while self.total_weight_limit < current_weight()
{
if let Some(evicted_item) = self.evict_one(buckets) {
evicted.push(evicted_item);
Expand Down Expand Up @@ -292,7 +295,7 @@ impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> {
let v = buckets
.get_map(&to_evict, |bucket| {
let weight = bucket.weight;
self.small_weight.fetch_sub(weight as usize, SeqCst);
self.small_weight.fetch_update(SeqCst, SeqCst, |v| Some(v.saturating_sub(weight as usize))).unwrap();

if bucket.uses.uses() > 1 {
// move to main
Expand Down Expand Up @@ -334,7 +337,7 @@ impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> {
} else {
// evict
let weight = bucket.weight;
self.main_weight.fetch_sub(weight as usize, SeqCst);
self.main_weight.fetch_update(SeqCst, SeqCst, |v| Some(v.saturating_sub(weight as usize))).unwrap();
let data = bucket.data.clone();
buckets.remove(&to_evict);
Some(KV {
Expand Down Expand Up @@ -437,9 +440,9 @@ impl<K: Hash, T: Clone + Send + Sync + 'static> TinyUfo<K, T> {

// Update weight based on queue location
if bucket.queue.is_main() {
self.queues.main_weight.fetch_sub(weight as usize, SeqCst);
self.queues.main_weight.fetch_update(SeqCst, SeqCst, |v| Some(v.saturating_sub(weight as usize))).unwrap();
} else {
self.queues.small_weight.fetch_sub(weight as usize, SeqCst);
self.queues.small_weight.fetch_update(SeqCst, SeqCst, |v| Some(v.saturating_sub(weight as usize))).unwrap();
}

data
Expand Down Expand Up @@ -787,4 +790,77 @@ mod tests {
cache.queues.small_weight.load(SeqCst) + cache.queues.main_weight.load(SeqCst);
assert!(total_weight <= 5); // Should not exceed limit
}

// Regression test for https://github.com/cloudflare/pingora/issues/768
// Verify that weight tracking uses saturating arithmetic to prevent
// underflow panics during concurrent put/evict operations.
#[test]
fn test_weight_saturating_sub_no_underflow() {
// Directly test that the weight atomics handle underflow gracefully
// by simulating the race condition: weight is subtracted before it's added
let cache = TinyUfo::<u64, u64>::new(10, 10);

// Set small_weight to 0 explicitly
cache.queues.small_weight.store(0, SeqCst);

// This simulates what happens during the race: an eviction subtracts
// weight that hasn't been fully added yet. With fetch_sub this would
// underflow (wrap to usize::MAX), with saturating_sub it stays at 0.
cache
.queues
.small_weight
.fetch_update(SeqCst, SeqCst, |v| Some(v.saturating_sub(5)))
.unwrap();

assert_eq!(cache.queues.small_weight.load(SeqCst), 0);

// Same for main_weight
cache.queues.main_weight.store(0, SeqCst);
cache
.queues
.main_weight
.fetch_update(SeqCst, SeqCst, |v| Some(v.saturating_sub(5)))
.unwrap();
assert_eq!(cache.queues.main_weight.load(SeqCst), 0);

// Verify evict_to_limit doesn't panic with saturating addition
// even if weights were somehow inconsistent
cache.queues.small_weight.store(usize::MAX - 1, SeqCst);
cache.queues.main_weight.store(2, SeqCst);
// This would overflow with regular addition: (usize::MAX - 1) + 2 + 0
// With saturating_add it should cap at usize::MAX and not panic
let evicted = cache.queues.evict_to_limit(0, &cache.buckets);
// Should not panic, eviction loop should terminate
drop(evicted);
}

// Regression test for https://github.com/cloudflare/pingora/issues/768
// Concurrent put/evict should not panic from weight underflow
#[test]
fn test_concurrent_put_no_overflow_panic() {
use std::sync::Arc;
use std::thread;

let cache = Arc::new(TinyUfo::<String, String>::new(5, 5));
let mut handles = vec![];

for id in 0..20 {
let cache = cache.clone();
handles.push(thread::spawn(move || {
let keys: Vec<String> = (0..5).map(|k| format!("{}_{}", id, k)).collect();
for iteration in 0..200 {
let value = format!("v_{}", iteration);
for key in &keys {
cache.put(key.clone(), value.clone(), 1);
cache.get(key);
cache.get(key);
}
}
}));
}

for handle in handles {
handle.join().expect("thread should not panic");
}
}
}