1 | | -use crate::{Cas, Error, Store, StoreManager, SwapError}; |
2 | | -use lru::LruCache; |
| 1 | +use crate::{Error, Store, StoreManager}; |
3 | 2 | use spin_core::async_trait; |
4 | | -use std::{ |
5 | | - collections::{HashMap, HashSet}, |
6 | | - future::Future, |
7 | | - num::NonZeroUsize, |
8 | | - sync::Arc, |
9 | | -}; |
10 | | -use tokio::{ |
11 | | - sync::Mutex as AsyncMutex, |
12 | | - task::{self, JoinHandle}, |
13 | | -}; |
14 | | -use tracing::Instrument; |
| 3 | +use std::{collections::HashMap, sync::Arc}; |
15 | 4 |
16 | 5 | /// A [`StoreManager`] which delegates to other `StoreManager`s based on the store label. |
17 | 6 | pub struct DelegatingStoreManager { |
@@ -45,334 +34,3 @@ impl StoreManager for DelegatingStoreManager { |
45 | 34 | None |
46 | 35 | } |
47 | 36 | } |
48 | | - |
49 | | -/// Wrap each `Store` produced by the inner `StoreManager` in an asynchronous, |
50 | | -/// write-behind cache. |
51 | | -/// |
52 | | -/// This serves two purposes: |
53 | | -/// |
54 | | -/// - Improve performance with slow and/or distant stores |
55 | | -/// |
56 | | -/// - Provide a relaxed consistency guarantee vs. what a fully synchronous store |
57 | | -/// provides |
58 | | -/// |
59 | | -/// The latter is intended to prevent guests from coming to rely on the |
60 | | -/// synchronous consistency model of an existing implementation which may later |
61 | | -/// be replaced with one providing a more relaxed, asynchronous (i.e. |
62 | | -/// "eventual") consistency model. See also <https://www.hyrumslaw.com/> and |
63 | | -/// <https://xkcd.com/1172/>. |
64 | | -/// |
65 | | -/// This implementation provides a "read-your-writes", asynchronous consistency |
66 | | -/// model such that values are immediately available for reading as soon as they |
67 | | -/// are written as long as the read(s) hit the same cache as the write(s). |
68 | | -/// Reads and writes through separate caches (e.g. separate guest instances or |
69 | | -/// separately-opened references to the same store within a single instance) are |
70 | | -/// _not_ guaranteed to be consistent; not only is cross-cache consistency |
71 | | -/// subject to scheduling and/or networking delays, a given tuple is never |
72 | | -/// refreshed from the backing store once added to a cache since this |
73 | | -/// implementation is intended for use only by short-lived guest instances. |
74 | | -/// |
75 | | -/// Note that, because writes are asynchronous and return immediately, |
76 | | -/// durability is _not_ guaranteed. I/O errors may occur asynchronously after |
77 | | -/// the write operation has returned control to the guest, which may result in |
78 | | -/// the write being lost without the guest knowing. In the future, a separate |
79 | | -/// `write-durable` function could be added to key-value.wit to provide either |
80 | | -/// synchronous or asynchronous feedback on durability for guests which need it. |
81 | | -pub struct CachingStoreManager<T> { |
82 | | - capacity: NonZeroUsize, |
83 | | - inner: T, |
84 | | -} |
85 | | - |
86 | | -const DEFAULT_CACHE_SIZE: usize = 256; |
87 | | - |
88 | | -impl<T> CachingStoreManager<T> { |
89 | | - pub fn new(inner: T) -> Self { |
90 | | - Self::new_with_capacity(NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap(), inner) |
91 | | - } |
92 | | - |
93 | | - pub fn new_with_capacity(capacity: NonZeroUsize, inner: T) -> Self { |
94 | | - Self { capacity, inner } |
95 | | - } |
96 | | -} |
97 | | - |
98 | | -#[async_trait] |
99 | | -impl<T: StoreManager> StoreManager for CachingStoreManager<T> { |
100 | | - async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> { |
101 | | - Ok(Arc::new(CachingStore { |
102 | | - inner: self.inner.get(name).await?, |
103 | | - state: Arc::new(AsyncMutex::new(CachingStoreState { |
104 | | - cache: LruCache::new(self.capacity), |
105 | | - previous_task: None, |
106 | | - })), |
107 | | - })) |
108 | | - } |
109 | | - |
110 | | - fn is_defined(&self, store_name: &str) -> bool { |
111 | | - self.inner.is_defined(store_name) |
112 | | - } |
113 | | - |
114 | | - fn summary(&self, store_name: &str) -> Option<String> { |
115 | | - self.inner.summary(store_name) |
116 | | - } |
117 | | -} |
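
For context, here is a minimal sketch (not part of this diff) of how the caching layer being removed was meant to be layered over another manager, illustrating the read-your-writes model described in the doc comment above. The `inner_manager` parameter and the `"default"` store label are illustrative assumptions; only `CachingStoreManager`, `StoreManager`, `Store`, and `Error` come from the code shown here.

```rust
// Illustrative sketch only: wrap any StoreManager in the write-behind cache
// and observe read-your-writes behavior within a single cache.
async fn demo(inner_manager: impl StoreManager) -> Result<(), Error> {
    let manager = CachingStoreManager::new(inner_manager);
    // Assumes the inner manager defines a store with this label.
    let store = manager.get("default").await?;

    // `set` returns as soon as the cache is updated; the backing store is
    // written by a background task.
    store.set("greeting", b"hello").await?;

    // A read through the same cache sees the value immediately, even if the
    // background write has not yet reached the backing store.
    assert_eq!(store.get("greeting").await?, Some(b"hello".to_vec()));

    // A separately opened store (i.e. a different cache) has no such guarantee.
    Ok(())
}
```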
118 | | - |
119 | | -struct CachingStoreState { |
120 | | - cache: LruCache<String, Option<Vec<u8>>>, |
121 | | - previous_task: Option<JoinHandle<Result<(), Error>>>, |
122 | | -} |
123 | | - |
124 | | -impl CachingStoreState { |
125 | | - /// Wrap the specified task in an outer task which waits for `self.previous_task` before proceeding, and spawn |
126 | | - /// the result. This ensures that write order is preserved. |
127 | | - fn spawn(&mut self, task: impl Future<Output = Result<(), Error>> + Send + 'static) { |
128 | | - let previous_task = self.previous_task.take(); |
129 | | - let task = async move { |
130 | | - if let Some(previous_task) = previous_task { |
131 | | - previous_task |
132 | | - .await |
133 | | - .map_err(|e| Error::Other(format!("{e:?}")))?? |
134 | | - } |
135 | | - |
136 | | - task.await |
137 | | - }; |
138 | | - self.previous_task = Some(task::spawn(task.in_current_span())) |
139 | | - } |
140 | | - |
141 | | - async fn flush(&mut self) -> Result<(), Error> { |
142 | | - if let Some(previous_task) = self.previous_task.take() { |
143 | | - previous_task |
144 | | - .await |
145 | | - .map_err(|e| Error::Other(format!("{e:?}")))?? |
146 | | - } |
147 | | - |
148 | | - Ok(()) |
149 | | - } |
150 | | -} |
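
The write-ordering guarantee comes from chaining each new background task onto the `JoinHandle` of the previous one: a task only starts its own work after the prior task has finished, and awaiting the newest handle (as `flush` does) transitively waits for the whole chain. Below is a standalone sketch of that pattern using tokio; the `WriteQueue` name is illustrative and not part of Spin.

```rust
use std::future::Future;
use tokio::task::JoinHandle;

// Minimal version of the chaining pattern used by `CachingStoreState::spawn`.
struct WriteQueue {
    previous: Option<JoinHandle<()>>,
}

impl WriteQueue {
    fn spawn(&mut self, work: impl Future<Output = ()> + Send + 'static) {
        let previous = self.previous.take();
        self.previous = Some(tokio::spawn(async move {
            if let Some(previous) = previous {
                // Wait for the previously spawned write before starting this one.
                let _ = previous.await;
            }
            work.await;
        }));
    }

    async fn flush(&mut self) {
        // Awaiting the newest handle waits for the entire chain.
        if let Some(previous) = self.previous.take() {
            let _ = previous.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let mut queue = WriteQueue { previous: None };
    queue.spawn(async { println!("write 1") });
    queue.spawn(async { println!("write 2") }); // always runs after "write 1"
    queue.flush().await;
}
```

The real implementation additionally propagates errors (the double `?` unwraps the `JoinError` and then the inner `Result`) and preserves the tracing span via `in_current_span()`.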
151 | | - |
152 | | -struct CachingStore { |
153 | | - inner: Arc<dyn Store>, |
154 | | - state: Arc<AsyncMutex<CachingStoreState>>, |
155 | | -} |
156 | | - |
157 | | -#[async_trait] |
158 | | -impl Store for CachingStore { |
159 | | - async fn after_open(&self) -> Result<(), Error> { |
160 | | - self.inner.after_open().await |
161 | | - } |
162 | | - |
163 | | - async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> { |
164 | | - // Retrieve the specified value from the cache, lazily populating the cache as necessary. |
165 | | - |
166 | | - let mut state = self.state.lock().await; |
167 | | - |
168 | | - if let Some(value) = state.cache.get(key).cloned() { |
169 | | - return Ok(value); |
170 | | - } |
171 | | - |
172 | | - // Flush any outstanding writes prior to reading from store. This is necessary because we need to |
173 | | - // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU |
174 | | - // cache prior to their corresponding writes reaching the backing store. |
175 | | - state.flush().await?; |
176 | | - |
177 | | - let value = self.inner.get(key).await?; |
178 | | - |
179 | | - state.cache.put(key.to_owned(), value.clone()); |
180 | | - |
181 | | - Ok(value) |
182 | | - } |
183 | | - |
184 | | - async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> { |
185 | | - // Update the cache and spawn a task to update the backing store asynchronously. |
186 | | - |
187 | | - let mut state = self.state.lock().await; |
188 | | - |
189 | | - state.cache.put(key.to_owned(), Some(value.to_owned())); |
190 | | - |
191 | | - let inner = self.inner.clone(); |
192 | | - let key = key.to_owned(); |
193 | | - let value = value.to_owned(); |
194 | | - state.spawn(async move { inner.set(&key, &value).await }); |
195 | | - |
196 | | - Ok(()) |
197 | | - } |
198 | | - |
199 | | - async fn delete(&self, key: &str) -> Result<(), Error> { |
200 | | - // Update the cache and spawn a task to update the backing store asynchronously. |
201 | | - |
202 | | - let mut state = self.state.lock().await; |
203 | | - |
204 | | - state.cache.put(key.to_owned(), None); |
205 | | - |
206 | | - let inner = self.inner.clone(); |
207 | | - let key = key.to_owned(); |
208 | | - state.spawn(async move { inner.delete(&key).await }); |
209 | | - |
210 | | - Ok(()) |
211 | | - } |
212 | | - |
213 | | - async fn exists(&self, key: &str) -> Result<bool, Error> { |
214 | | - Ok(self.get(key).await?.is_some()) |
215 | | - } |
216 | | - |
217 | | - async fn get_keys(&self) -> Result<Vec<String>, Error> { |
218 | | - // Get the keys from the backing store, remove any which are `None` in the cache, and add any which are |
219 | | - // `Some` in the cache, returning the result. |
220 | | - // |
221 | | - // Note that we don't bother caching the result, since we expect this function won't be called more than |
222 | | - // once for a given store in normal usage, and maintaining consistency would be complicated. |
223 | | - |
224 | | - let mut state = self.state.lock().await; |
225 | | - |
226 | | - // Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior |
227 | | - // to their corresponding writes reaching the backing store. |
228 | | - state.flush().await?; |
229 | | - |
230 | | - Ok(self |
231 | | - .inner |
232 | | - .get_keys() |
233 | | - .await? |
234 | | - .into_iter() |
235 | | - .filter(|k| { |
236 | | - state |
237 | | - .cache |
238 | | - .peek(k) |
239 | | - .map(|v| v.as_ref().is_some()) |
240 | | - .unwrap_or(true) |
241 | | - }) |
242 | | - .chain( |
243 | | - state |
244 | | - .cache |
245 | | - .iter() |
246 | | - .filter_map(|(k, v)| v.as_ref().map(|_| k.to_owned())), |
247 | | - ) |
248 | | - .collect::<HashSet<_>>() |
249 | | - .into_iter() |
250 | | - .collect()) |
251 | | - } |
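
To make the overlay semantics of `get_keys` concrete, here is a toy, self-contained example (the data is made up; it reuses the same `lru` crate): the backing store holds keys "a" and "b", the cache records a delete of "a" and a write of "c", and the merged view is therefore {"b", "c"}. Note that `peek` is used rather than `get` so that scanning keys does not disturb the LRU recency order.

```rust
use lru::LruCache;
use std::{collections::HashSet, num::NonZeroUsize};

fn main() {
    let mut cache: LruCache<String, Option<Vec<u8>>> =
        LruCache::new(NonZeroUsize::new(16).unwrap());
    cache.put("a".into(), None);          // deleted in the cache
    cache.put("c".into(), Some(vec![1])); // written in the cache

    let backing = vec!["a".to_string(), "b".to_string()];
    let keys: HashSet<String> = backing
        .into_iter()
        // Drop backing-store keys that the cache has marked as deleted.
        .filter(|k| cache.peek(k).map(|v| v.is_some()).unwrap_or(true))
        // Add keys that only exist as pending writes in the cache.
        .chain(cache.iter().filter_map(|(k, v)| v.as_ref().map(|_| k.clone())))
        .collect();

    assert_eq!(keys, HashSet::from(["b".to_string(), "c".to_string()]));
}
```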
252 | | - |
253 | | - async fn get_many( |
254 | | - &self, |
255 | | - keys: Vec<String>, |
256 | | - ) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> { |
257 | | - let mut state = self.state.lock().await; |
258 | | - |
259 | | - // Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior |
260 | | - // to their corresponding writes reaching the backing store. |
261 | | - state.flush().await?; |
262 | | - |
263 | | - let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new(); |
264 | | - let mut not_found: Vec<String> = Vec::new(); |
265 | | - for key in keys { |
266 | | - match state.cache.get(key.as_str()) { |
267 | | - Some(Some(value)) => found.push((key, Some(value.clone()))), |
268 | | - _ => not_found.push(key), |
269 | | - } |
270 | | - } |
271 | | - |
272 | | - if !not_found.is_empty() { |
273 | | - let keys_and_values = self.inner.get_many(not_found).await?; |
274 | | - for (key, value) in keys_and_values { |
275 | | - found.push((key.clone(), value.clone())); |
276 | | - state.cache.put(key, value); |
277 | | - } |
278 | | - } |
279 | | - |
280 | | - Ok(found) |
281 | | - } |
282 | | - |
283 | | - async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> { |
284 | | - let mut state = self.state.lock().await; |
285 | | - |
286 | | - for (key, value) in key_values.clone() { |
287 | | - state.cache.put(key, Some(value)); |
288 | | - } |
289 | | - |
290 | | - self.inner.set_many(key_values).await |
291 | | - } |
292 | | - |
293 | | - async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> { |
294 | | - let mut state = self.state.lock().await; |
295 | | - |
296 | | - for key in keys.clone() { |
297 | | - state.cache.put(key, None); |
298 | | - } |
299 | | - |
300 | | - self.inner.delete_many(keys).await |
301 | | - } |
302 | | - |
303 | | - async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> { |
304 | | - let mut state = self.state.lock().await; |
305 | | - let counter = self.inner.increment(key.clone(), delta).await?; |
306 | | - state |
307 | | - .cache |
308 | | - .put(key, Some(i64::to_le_bytes(counter).to_vec())); |
309 | | - Ok(counter) |
310 | | - } |
311 | | - |
312 | | - async fn new_compare_and_swap( |
313 | | - &self, |
314 | | - bucket_rep: u32, |
315 | | - key: &str, |
316 | | - ) -> anyhow::Result<Arc<dyn Cas>, Error> { |
317 | | - let inner = self.inner.new_compare_and_swap(bucket_rep, key).await?; |
318 | | - Ok(Arc::new(CompareAndSwap { |
319 | | - bucket_rep, |
320 | | - state: self.state.clone(), |
321 | | - key: key.to_string(), |
322 | | - inner_cas: inner, |
323 | | - })) |
324 | | - } |
325 | | -} |
326 | | - |
327 | | -struct CompareAndSwap { |
328 | | - bucket_rep: u32, |
329 | | - key: String, |
330 | | - state: Arc<AsyncMutex<CachingStoreState>>, |
331 | | - inner_cas: Arc<dyn Cas>, |
332 | | -} |
333 | | - |
334 | | -#[async_trait] |
335 | | -impl Cas for CompareAndSwap { |
336 | | - async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error> { |
337 | | - let mut state = self.state.lock().await; |
338 | | - state.flush().await?; |
339 | | - let res = self.inner_cas.current().await; |
340 | | - match res.clone() { |
341 | | - Ok(value) => { |
342 | | - state.cache.put(self.key.clone(), value.clone()); |
343 | | - state.flush().await?; |
344 | | - Ok(value) |
345 | | - } |
346 | | - Err(err) => Err(err), |
347 | | - }?; |
348 | | - res |
349 | | - } |
350 | | - |
351 | | - async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError> { |
352 | | - let mut state = self.state.lock().await; |
353 | | - state |
354 | | - .flush() |
355 | | - .await |
356 | | - .map_err(|_e| SwapError::Other("failed flushing".to_string()))?; |
357 | | - let res = self.inner_cas.swap(value.clone()).await; |
358 | | - match res { |
359 | | - Ok(()) => { |
360 | | - state.cache.put(self.key.clone(), Some(value)); |
361 | | - state |
362 | | - .flush() |
363 | | - .await |
364 | | - .map_err(|_e| SwapError::Other("failed flushing".to_string()))?; |
365 | | - Ok(()) |
366 | | - } |
367 | | - Err(err) => Err(err), |
368 | | - } |
369 | | - } |
370 | | - |
371 | | - async fn bucket_rep(&self) -> u32 { |
372 | | - self.bucket_rep |
373 | | - } |
374 | | - |
375 | | - async fn key(&self) -> String { |
376 | | - self.key.clone() |
377 | | - } |
378 | | -} |