Skip to content

Commit a0ce2c7

Browse files
Finish implementing cancel
Note that this implements a recently-changed behavior in the compute platform. Previously, a `PendingTransaction::drop` would *block* on whichever request got the `GoGet` obligation; more recently (and in Viceroy), the `PendingTransaction::drop` does not block on any other transaction.
1 parent aca1cde commit a0ce2c7

File tree

4 files changed

+81
-30
lines changed

4 files changed

+81
-30
lines changed

lib/src/cache.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
body::Body,
99
wiggle_abi::types::{BodyHandle, CacheOverrideTag},
1010
};
11+
1112
use http::{HeaderMap, HeaderValue};
1213

1314
mod store;
@@ -112,6 +113,15 @@ pub struct CacheEntry {
112113
}
113114

114115
impl CacheEntry {
116+
/// Return a stub entry to hold in CacheBusy.
117+
pub fn stub(&self) -> CacheEntry {
118+
Self {
119+
key: self.key.clone(),
120+
found: None,
121+
go_get: None,
122+
}
123+
}
124+
115125
/// Returns the key used to generate this CacheEntry.
116126
pub fn key(&self) -> &CacheKey {
117127
&self.key

lib/src/component/cache.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,17 @@ impl api::Host for ComponentCtx {
301301
&mut self,
302302
handle: api::BusyHandle,
303303
) -> Result<api::Handle, types::Error> {
304-
let busy_handle: CacheBusyHandle = handle.into();
305-
let handle: CacheHandle = busy_handle.into();
306-
let _ = self.session.cache_entry_mut(handle).await?;
307-
Ok(handle.into())
304+
let handle = handle.into();
305+
// Swap out for a distinct handle, so we don't hit a repeated `close`+`close_busy`:
306+
let entry = self.session.cache_entry_mut(handle).await?;
307+
let mut other_entry = entry.stub();
308+
std::mem::swap(entry, &mut other_entry);
309+
let task = PeekableTask::spawn(Box::pin(async move { Ok(other_entry) })).await;
310+
let h: CacheHandle = self
311+
.session
312+
.insert_cache_op(PendingCacheTask::new(task))
313+
.into();
314+
Ok(h.into())
308315
}
309316

310317
async fn transaction_insert(
@@ -377,18 +384,20 @@ impl api::Host for ComponentCtx {
377384
}
378385
}
379386

380-
async fn close_busy(&mut self, _handle: api::BusyHandle) -> Result<(), types::Error> {
381-
Err(Error::Unsupported {
382-
msg: "Cache API primitives not yet supported",
383-
}
384-
.into())
387+
async fn close_busy(&mut self, handle: api::BusyHandle) -> Result<(), types::Error> {
388+
// Don't wait for the transaction to complete; drop the future to cancel.
389+
let _ = self.session.take_cache_entry(handle.into())?;
390+
Ok(())
385391
}
386392

387-
async fn close(&mut self, _handle: api::Handle) -> Result<(), types::Error> {
388-
Err(Error::Unsupported {
389-
msg: "Cache API primitives not yet supported",
390-
}
391-
.into())
393+
async fn close(&mut self, handle: api::Handle) -> Result<(), types::Error> {
394+
let _ = self
395+
.session
396+
.take_cache_entry(handle.into())?
397+
.task()
398+
.recv()
399+
.await?;
400+
Ok(())
392401
}
393402

394403
async fn get_state(&mut self, handle: api::Handle) -> Result<api::LookupState, types::Error> {

lib/src/wiggle_abi/cache.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,16 @@ impl FastlyCache for Session {
225225
options_mask: types::CacheLookupOptionsMask,
226226
options: wiggle::GuestPtr<types::CacheLookupOptions>,
227227
) -> Result<types::CacheHandle, Error> {
228-
let busy = self
229-
.transaction_lookup_async(memory, cache_key, options_mask, options)
230-
.await?;
231-
self.cache_busy_handle_wait(memory, busy).await
228+
let headers = load_lookup_options(self, memory, options_mask, options)?;
229+
let key = load_cache_key(memory, cache_key)?;
230+
let cache = Arc::clone(self.cache());
231+
232+
let entry = cache.transaction_lookup(&key, &headers).await;
233+
234+
let task = PeekableTask::spawn(Box::pin(async move { Ok(entry) })).await;
235+
let task = PendingCacheTask::new(task);
236+
let handle = self.insert_cache_op(task);
237+
Ok(handle.into())
232238
}
233239

234240
async fn transaction_lookup_async(
@@ -257,8 +263,12 @@ impl FastlyCache for Session {
257263
handle: types::CacheBusyHandle,
258264
) -> Result<types::CacheHandle, Error> {
259265
let handle = handle.into();
260-
let _ = self.cache_entry_mut(handle).await?;
261-
Ok(handle)
266+
// Swap out for a distinct handle, so we don't hit a repeated `close`+`close_busy`:
267+
let entry = self.cache_entry_mut(handle).await?;
268+
let mut other_entry = entry.stub();
269+
std::mem::swap(entry, &mut other_entry);
270+
let task = PeekableTask::spawn(Box::pin(async move { Ok(other_entry) })).await;
271+
Ok(self.insert_cache_op(PendingCacheTask::new(task)).into())
262272
}
263273

264274
async fn transaction_insert(
@@ -325,18 +335,24 @@ impl FastlyCache for Session {
325335
memory: &mut wiggle::GuestMemory<'_>,
326336
handle: types::CacheHandle,
327337
) -> Result<(), Error> {
328-
Err(Error::NotAvailable("Cache API primitives"))
338+
let entry = self.cache_entry_mut(handle.into()).await?;
339+
if let Some(_) = entry.take_go_get() {
340+
Ok(())
341+
} else {
342+
Err(Error::CacheError(crate::cache::Error::CannotWrite).into())
343+
}
329344
}
330345

331346
async fn close_busy(
332347
&mut self,
333348
memory: &mut wiggle::GuestMemory<'_>,
334349
handle: types::CacheBusyHandle,
335350
) -> Result<(), Error> {
336-
Err(Error::NotAvailable("Cache API primitives"))
351+
// Don't wait for the transaction to complete; drop the future to cancel.
352+
let _ = self.take_cache_entry(handle.into())?;
353+
Ok(())
337354
}
338355

339-
/// Wait for the lookup to be complete, then discard the results.
340356
async fn close(
341357
&mut self,
342358
memory: &mut wiggle::GuestMemory<'_>,

test-fixtures/src/bin/cache_transactional.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use uuid::Uuid;
55

66
fn main() {
77
test_racing_transactions();
8-
test_implicit_cancel();
8+
test_implicit_cancel_of_fetch();
9+
test_implicit_cancel_of_pending();
910
test_explicit_cancel();
1011
}
1112

@@ -62,18 +63,33 @@ fn test_racing_transactions() {
6263
assert_eq!(&read, "hello");
6364
}
6465

65-
fn test_implicit_cancel() {
66+
fn test_implicit_cancel_of_fetch() {
6667
let key = new_key();
6768

6869
let busy1 = Transaction::lookup(key.clone()).execute_async().unwrap();
6970
let busy2 = Transaction::lookup(key.clone()).execute_async().unwrap();
70-
let (tx, pending) = ready_and_pending(busy1, busy2);
71+
let (t1, pending) = ready_and_pending(busy1, busy2);
7172

7273
// Cancel via dropping:
73-
assert!(tx.must_insert_or_update());
74-
std::mem::drop(tx);
75-
//let t2 = pending.wait().unwrap();
76-
//assert!(!t2.found().is_some());
74+
assert!(t1.must_insert_or_update());
75+
std::mem::drop(t1);
76+
let t2 = pending.wait().unwrap();
77+
assert!(t2.found().is_none());
78+
assert!(t2.must_insert_or_update());
79+
}
80+
81+
fn test_implicit_cancel_of_pending() {
82+
let key = new_key();
83+
84+
let busy1 = Transaction::lookup(key.clone()).execute_async().unwrap();
85+
let busy2 = Transaction::lookup(key.clone()).execute_async().unwrap();
86+
let (t1, pending) = ready_and_pending(busy1, busy2);
87+
88+
// Cancel the blocked request via dropping.
89+
// Fun fact, this was a bug in compute platform that we fixed when writing the Viceroy
90+
// implementation!
91+
std::mem::drop(pending);
92+
assert!(t1.must_insert_or_update());
7793
}
7894

7995
fn test_explicit_cancel() {

0 commit comments

Comments
 (0)