@@ -5,6 +5,7 @@ use spin_world::v2::key_value;
55use spin_world:: wasi:: keyvalue as wasi_keyvalue;
66use std:: { collections:: HashSet , sync:: Arc } ;
77use tracing:: { instrument, Level } ;
8+ use super :: Cas ;
89
910const DEFAULT_STORE_TABLE_CAPACITY : u32 = 256 ;
1011
@@ -31,12 +32,20 @@ pub trait Store: Sync + Send {
3132 async fn delete ( & self , key : & str ) -> Result < ( ) , Error > ;
3233 async fn exists ( & self , key : & str ) -> Result < bool , Error > ;
3334 async fn get_keys ( & self ) -> Result < Vec < String > , Error > ;
35+ async fn get_many ( & self , keys : Vec < String > ) -> Result < Vec < Option < ( String , Vec < u8 > ) > > , Error > ;
36+ async fn set_many ( & self , key_values : Vec < ( String , Vec < u8 > ) > ) -> Result < ( ) , Error > ;
37+ async fn delete_many ( & self , keys : Vec < String > ) -> Result < ( ) , Error > ;
38+ async fn increment ( & self , key : String , delta : i64 ) -> Result < i64 , Error > ;
39+ async fn new_compare_and_swap ( & self , key : & str ) -> Result < Arc < dyn Cas > , Error > ;
3440}
3541
42+
43+
3644pub struct KeyValueDispatch {
3745 allowed_stores : HashSet < String > ,
3846 manager : Arc < dyn StoreManager > ,
3947 stores : Table < Arc < dyn Store > > ,
48+ compare_and_swaps : Table < Arc < dyn Cas > > ,
4049}
4150
4251impl KeyValueDispatch {
@@ -53,13 +62,20 @@ impl KeyValueDispatch {
5362 allowed_stores,
5463 manager,
5564 stores : Table :: new ( capacity) ,
65+ compare_and_swaps : Table :: new ( capacity) ,
5666 }
5767 }
5868
5969 pub fn get_store < T : ' static > ( & self , store : Resource < T > ) -> anyhow:: Result < & Arc < dyn Store > > {
6070 self . stores . get ( store. rep ( ) ) . context ( "invalid store" )
6171 }
6272
73+ pub fn get_cas < T : ' static > ( & self , cas : Resource < T > ) -> Result < & Arc < dyn Cas > > {
74+ self . compare_and_swaps
75+ . get ( cas. rep ( ) )
76+ . context ( "invalid compare and swap" )
77+ }
78+
6379 pub fn allowed_stores ( & self ) -> & HashSet < String > {
6480 & self . allowed_stores
6581 }
@@ -72,6 +88,17 @@ impl KeyValueDispatch {
7288 . get ( store. rep ( ) )
7389 . ok_or ( wasi_keyvalue:: store:: Error :: NoSuchStore )
7490 }
91+
92+ pub fn get_cas_wasi < T : ' static > (
93+ & self ,
94+ cas : Resource < T > ,
95+ ) -> Result < & Arc < dyn Cas > , wasi_keyvalue:: atomics:: Error > {
96+ self . compare_and_swaps
97+ . get ( cas. rep ( ) )
98+ . ok_or ( wasi_keyvalue:: atomics:: Error :: Other (
99+ "compare and swap not found" . to_string ( ) ,
100+ ) )
101+ }
75102}
76103
77104#[ async_trait]
@@ -231,11 +258,9 @@ impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
231258 cursor : Option < String > ,
232259 ) -> Result < wasi_keyvalue:: store:: KeyResponse , wasi_keyvalue:: store:: Error > {
233260 match cursor {
234- Some ( _) => {
235- Err ( wasi_keyvalue:: store:: Error :: Other (
236- "list_keys: cursor not supported" . to_owned ( ) ,
237- ) )
238- } ,
261+ Some ( _) => Err ( wasi_keyvalue:: store:: Error :: Other (
262+ "list_keys: cursor not supported" . to_owned ( ) ,
263+ ) ) ,
239264 None => {
240265 let store = self . get_store_wasi ( self_) ?;
241266 let keys = store. get_keys ( ) . await . map_err ( to_wasi_err) ?;
@@ -250,6 +275,104 @@ impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
250275 }
251276}
252277
278+ #[ async_trait]
279+ impl wasi_keyvalue:: batch:: Host for KeyValueDispatch {
280+ #[ instrument( name = "spin_key_value.get_many" , skip( self , bucket, keys) , err( level = Level :: INFO ) , fields( otel. kind = "client" ) ) ]
281+ async fn get_many (
282+ & mut self ,
283+ bucket : Resource < wasi_keyvalue:: batch:: Bucket > ,
284+ keys : Vec < String > ,
285+ ) -> std:: result:: Result < Vec < Option < ( String , Vec < u8 > ) > > , wasi_keyvalue:: store:: Error > {
286+ let store = self . get_store_wasi ( bucket) ?;
287+ store
288+ . get_many ( keys. iter ( ) . map ( |k| k. to_string ( ) ) . collect ( ) )
289+ . await
290+ . map_err ( to_wasi_err)
291+ }
292+
293+ #[ instrument( name = "spin_key_value.set_many" , skip( self , bucket, key_values) , err( level = Level :: INFO ) , fields( otel. kind = "client" ) ) ]
294+ async fn set_many (
295+ & mut self ,
296+ bucket : Resource < wasi_keyvalue:: batch:: Bucket > ,
297+ key_values : Vec < ( String , Vec < u8 > ) > ,
298+ ) -> std:: result:: Result < ( ) , wasi_keyvalue:: store:: Error > {
299+ let store = self . get_store_wasi ( bucket) ?;
300+ store. set_many ( key_values) . await . map_err ( to_wasi_err)
301+ }
302+
303+ #[ instrument( name = "spin_key_value.get_many" , skip( self , bucket, keys) , err( level = Level :: INFO ) , fields( otel. kind = "client" ) ) ]
304+ async fn delete_many (
305+ & mut self ,
306+ bucket : Resource < wasi_keyvalue:: batch:: Bucket > ,
307+ keys : Vec < String > ,
308+ ) -> std:: result:: Result < ( ) , wasi_keyvalue:: store:: Error > {
309+ let store = self . get_store_wasi ( bucket) ?;
310+ store
311+ . delete_many ( keys. iter ( ) . map ( |k| k. to_string ( ) ) . collect ( ) )
312+ . await
313+ . map_err ( to_wasi_err)
314+ }
315+ }
316+
317+ #[ async_trait]
318+ impl wasi_keyvalue:: atomics:: HostCas for KeyValueDispatch {
319+ async fn new (
320+ & mut self ,
321+ bucket : Resource < wasi_keyvalue:: atomics:: Bucket > ,
322+ key : String ,
323+ ) -> Result < Resource < wasi_keyvalue:: atomics:: Cas > , wasi_keyvalue:: store:: Error > {
324+ let store = self . get_store_wasi ( bucket) ?;
325+ let cas = store. new_compare_and_swap ( & key) . await . map_err ( to_wasi_err) ?;
326+ self . compare_and_swaps
327+ . push ( cas)
328+ . map_err ( |( ) | spin_world:: wasi:: keyvalue:: store:: Error :: Other ( "too many compare_and_swaps opened" . to_string ( ) ) )
329+ . map ( Resource :: new_own)
330+ }
331+
332+ async fn current (
333+ & mut self ,
334+ cas : Resource < wasi_keyvalue:: atomics:: Cas > ,
335+ ) -> Result < Option < Vec < u8 > > , wasi_keyvalue:: store:: Error > {
336+ let cas = self
337+ . get_cas ( cas)
338+ . map_err ( |e| wasi_keyvalue:: store:: Error :: Other ( e. to_string ( ) ) ) ?;
339+ cas. current ( ) . await . map_err ( to_wasi_err)
340+ }
341+
342+ async fn drop ( & mut self , rep : Resource < wasi_keyvalue:: atomics:: Cas > ) -> Result < ( ) > {
343+ self . compare_and_swaps . remove ( rep. rep ( ) ) ;
344+ Ok ( ( ) )
345+ }
346+ }
347+
348+ #[ async_trait]
349+ impl wasi_keyvalue:: atomics:: Host for KeyValueDispatch {
350+ #[ instrument( name = "spin_key_value.increment" , skip( self , bucket, key, delta) , err( level = Level :: INFO ) , fields( otel. kind = "client" ) ) ]
351+ async fn increment (
352+ & mut self ,
353+ bucket : Resource < wasi_keyvalue:: atomics:: Bucket > ,
354+ key : String ,
355+ delta : i64 ,
356+ ) -> Result < i64 , wasi_keyvalue:: store:: Error > {
357+ let store = self . get_store_wasi ( bucket) ?;
358+ store. increment ( key, delta) . await . map_err ( to_wasi_err)
359+ }
360+
361+ #[ instrument( name = "spin_key_value.swap" , skip( self , cas_res, value) , err( level = Level :: INFO ) , fields( otel. kind = "client" ) ) ]
362+ async fn swap (
363+ & mut self ,
364+ cas_res : Resource < wasi_keyvalue:: atomics:: Cas > ,
365+ value : Vec < u8 > ,
366+ ) -> Result < std:: result:: Result < ( ) , wasi_keyvalue:: atomics:: CasError > > {
367+ let cas = self
368+ . get_cas ( cas_res)
369+ . map_err ( |e| wasi_keyvalue:: atomics:: CasError :: StoreError ( wasi_keyvalue:: atomics:: Error :: Other ( e. to_string ( ) ) ) ) ?;
370+ Ok ( cas. swap ( value)
371+ . await
372+ . map_err ( |e| wasi_keyvalue:: atomics:: CasError :: StoreError ( wasi_keyvalue:: atomics:: Error :: Other ( e. to_string ( ) ) ) ) )
373+ }
374+ }
375+
253376pub fn log_error ( err : impl std:: fmt:: Debug ) -> Error {
254377 tracing:: warn!( "key-value error: {err:?}" ) ;
255378 Error :: Other ( format ! ( "{err:?}" ) )
0 commit comments