Skip to content

Commit 3911922

Browse files
Preslav LeConvex, Inc.
authored andcommitted
Implement Drop for CacheOp::Go (#20711)
Fix a bug where if a future that execute a request gets dropped, the entry might stay in waiting state in the cache indefintelly. Implement WaitingEntryGuard to make the code error and cancellation safe. GitOrigin-RevId: 78b37224a4e4c7313f1585f87919ff819c4158db
1 parent eb10a09 commit 3911922

File tree

1 file changed

+59
-28
lines changed
  • crates/application/src/cache

1 file changed

+59
-28
lines changed

crates/application/src/cache/mod.rs

Lines changed: 59 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,17 @@ impl<RT: Runtime> CacheManager<RT> {
291291
None => continue 'top,
292292
};
293293

294+
// Create a waiting entry in order to guarantee the waiting entry always
295+
// get cleaned up if the current future returns an error or gets dropped.
296+
let waiting_entry_id = match op {
297+
CacheOp::Go {
298+
waiting_entry_id, ..
299+
} => waiting_entry_id,
300+
_ => None,
301+
};
302+
let mut waiting_entry_guard =
303+
WaitingEntryGuard::new(waiting_entry_id, &key, self.cache.clone());
304+
294305
// Step 2: Perform our cache operation, potentially running the UDF.
295306
let is_cache_hit = match op {
296307
// Serving from cache.
@@ -301,46 +312,30 @@ impl<RT: Runtime> CacheManager<RT> {
301312
// We are executing ourselves.
302313
CacheOp::Go { .. } => false,
303314
};
304-
let waiting_entry_id = match op {
305-
CacheOp::Go {
306-
waiting_entry_id, ..
307-
} => waiting_entry_id,
308-
_ => None,
315+
let (result, table_stats) = match self
316+
.perform_cache_op(&key, op, usage_tracker.clone())
317+
.await?
318+
{
319+
Some(r) => r,
320+
None => continue 'top,
309321
};
310-
let (result, table_stats) =
311-
match self.perform_cache_op(&key, op, usage_tracker.clone()).await {
312-
Ok(Some(r)) => r,
313-
Ok(None) => continue 'top,
314-
Err(err) => {
315-
if let Some(waiting_entry_id) = waiting_entry_id {
316-
self.cache.remove_waiting(&key, waiting_entry_id);
317-
}
318-
return Err(err);
319-
},
320-
};
321322

322323
// Step 3: Validate that the cache result we got is good enough. Is our desired
323324
// timestamp in its validity interval? If it looked at system time, is it not
324325
// too old?
325-
let cache_result = match self.validate_cache_result(&key, ts, result).await {
326-
Ok(Some(r)) => r,
327-
Ok(None) => continue 'top,
328-
Err(err) => {
329-
if let Some(waiting_entry_id) = waiting_entry_id {
330-
self.cache.remove_waiting(&key, waiting_entry_id);
331-
}
332-
return Err(err);
333-
},
326+
let cache_result = match self.validate_cache_result(&key, ts, result).await? {
327+
Some(r) => r,
328+
None => continue 'top,
334329
};
335330

336331
// Step 4: Rewrite the value into the cache. This method will discard the new
337332
// value if the UDF failed or if a newer (i.e. higher `original_ts`)
338333
// value is in the cache.
339334
if cache_result.outcome.result.is_ok() {
340335
// We do not cache JSErrors
341-
self.cache.put_ready(key.clone(), cache_result.clone());
342-
} else if let Some(waiting_entry_id) = waiting_entry_id {
343-
self.cache.remove_waiting(&key, waiting_entry_id);
336+
waiting_entry_guard.complete(cache_result.clone());
337+
} else {
338+
drop(waiting_entry_guard);
344339
}
345340

346341
// Step 5: Log some stuff and return.
@@ -557,6 +552,42 @@ impl<RT: Runtime> CacheManager<RT> {
557552
}
558553
}
559554

555+
// A wrapper struct that makes sure that the waiting entry always gets removed
556+
// when the performing operation is dropped, even if the caller future gets
557+
// canceled.
558+
struct WaitingEntryGuard<'a, RT: Runtime> {
559+
entry_id: Option<u64>,
560+
key: &'a CacheKey,
561+
cache: Cache<RT>,
562+
}
563+
564+
impl<'a, RT: Runtime> WaitingEntryGuard<'a, RT> {
565+
fn new(entry_id: Option<u64>, key: &'a CacheKey, cache: Cache<RT>) -> Self {
566+
Self {
567+
entry_id,
568+
key,
569+
cache,
570+
}
571+
}
572+
573+
// Marks the waiting entry as removed, so we don't have to remove it on Drop
574+
fn complete(&mut self, result: CacheResult) {
575+
self.cache.put_ready(self.key.clone(), result);
576+
// We just performed put_ready so there is no need to perform remove_waiting
577+
// on Drop.
578+
self.entry_id.take();
579+
}
580+
}
581+
582+
impl<'a, RT: Runtime> Drop for WaitingEntryGuard<'a, RT> {
583+
fn drop(&mut self) {
584+
// Remove the cache entry from the cache if still present.
585+
if let Some(entry_id) = self.entry_id {
586+
self.cache.remove_waiting(self.key, entry_id)
587+
}
588+
}
589+
}
590+
560591
struct Inner<RT: Runtime> {
561592
cache: LruCache<CacheKey, CacheEntry<RT>>,
562593
size: usize,

0 commit comments

Comments
 (0)