1
+ use super :: Cas ;
1
2
use anyhow:: { Context , Result } ;
2
3
use spin_core:: { async_trait, wasmtime:: component:: Resource } ;
3
4
use spin_resource_table:: Table ;
4
5
use spin_world:: v2:: key_value;
5
6
use spin_world:: wasi:: keyvalue as wasi_keyvalue;
6
7
use std:: { collections:: HashSet , sync:: Arc } ;
7
8
use tracing:: { instrument, Level } ;
8
- use super :: Cas ;
9
9
10
10
const DEFAULT_STORE_TABLE_CAPACITY : u32 = 256 ;
11
11
@@ -32,15 +32,14 @@ pub trait Store: Sync + Send {
32
32
async fn delete ( & self , key : & str ) -> Result < ( ) , Error > ;
33
33
async fn exists ( & self , key : & str ) -> Result < bool , Error > ;
34
34
async fn get_keys ( & self ) -> Result < Vec < String > , Error > ;
35
- async fn get_many ( & self , keys : Vec < String > ) -> Result < Vec < Option < ( String , Vec < u8 > ) > > , Error > ;
35
+ async fn get_many ( & self , keys : Vec < String > ) -> Result < Vec < ( String , Option < Vec < u8 > > ) > , Error > ;
36
36
async fn set_many ( & self , key_values : Vec < ( String , Vec < u8 > ) > ) -> Result < ( ) , Error > ;
37
37
async fn delete_many ( & self , keys : Vec < String > ) -> Result < ( ) , Error > ;
38
38
async fn increment ( & self , key : String , delta : i64 ) -> Result < i64 , Error > ;
39
- async fn new_compare_and_swap ( & self , key : & str ) -> Result < Arc < dyn Cas > , Error > ;
39
+ async fn new_compare_and_swap ( & self , bucket_rep : u32 , key : & str )
40
+ -> Result < Arc < dyn Cas > , Error > ;
40
41
}
41
42
42
-
43
-
44
43
pub struct KeyValueDispatch {
45
44
allowed_stores : HashSet < String > ,
46
45
manager : Arc < dyn StoreManager > ,
@@ -282,7 +281,7 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
282
281
& mut self ,
283
282
bucket : Resource < wasi_keyvalue:: batch:: Bucket > ,
284
283
keys : Vec < String > ,
285
- ) -> std:: result:: Result < Vec < Option < ( String , Vec < u8 > ) > > , wasi_keyvalue:: store:: Error > {
284
+ ) -> std:: result:: Result < Vec < ( String , Option < Vec < u8 > > ) > , wasi_keyvalue:: store:: Error > {
286
285
let store = self . get_store_wasi ( bucket) ?;
287
286
store
288
287
. get_many ( keys. iter ( ) . map ( |k| k. to_string ( ) ) . collect ( ) )
@@ -321,11 +320,20 @@ impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
321
320
bucket : Resource < wasi_keyvalue:: atomics:: Bucket > ,
322
321
key : String ,
323
322
) -> Result < Resource < wasi_keyvalue:: atomics:: Cas > , wasi_keyvalue:: store:: Error > {
323
+ let bucket_rep = bucket. rep ( ) ;
324
+ let bucket: Resource < Bucket > = Resource :: new_own ( bucket_rep) ;
324
325
let store = self . get_store_wasi ( bucket) ?;
325
- let cas = store. new_compare_and_swap ( & key) . await . map_err ( to_wasi_err) ?;
326
+ let cas = store
327
+ . new_compare_and_swap ( bucket_rep, & key)
328
+ . await
329
+ . map_err ( to_wasi_err) ?;
326
330
self . compare_and_swaps
327
331
. push ( cas)
328
- . map_err ( |( ) | spin_world:: wasi:: keyvalue:: store:: Error :: Other ( "too many compare_and_swaps opened" . to_string ( ) ) )
332
+ . map_err ( |( ) | {
333
+ spin_world:: wasi:: keyvalue:: store:: Error :: Other (
334
+ "too many compare_and_swaps opened" . to_string ( ) ,
335
+ )
336
+ } )
329
337
. map ( Resource :: new_own)
330
338
}
331
339
@@ -361,15 +369,32 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
361
369
#[ instrument( name = "spin_key_value.swap" , skip( self , cas_res, value) , err( level = Level :: INFO ) , fields( otel. kind = "client" ) ) ]
362
370
async fn swap (
363
371
& mut self ,
364
- cas_res : Resource < wasi_keyvalue :: atomics:: Cas > ,
372
+ cas_res : Resource < atomics:: Cas > ,
365
373
value : Vec < u8 > ,
366
- ) -> Result < std:: result:: Result < ( ) , wasi_keyvalue:: atomics:: CasError > > {
374
+ ) -> Result < std:: result:: Result < ( ) , CasError > > {
375
+ let cas_rep = cas_res. rep ( ) ;
367
376
let cas = self
368
- . get_cas ( cas_res)
369
- . map_err ( |e| wasi_keyvalue:: atomics:: CasError :: StoreError ( wasi_keyvalue:: atomics:: Error :: Other ( e. to_string ( ) ) ) ) ?;
370
- Ok ( cas. swap ( value)
371
- . await
372
- . map_err ( |e| wasi_keyvalue:: atomics:: CasError :: StoreError ( wasi_keyvalue:: atomics:: Error :: Other ( e. to_string ( ) ) ) ) )
377
+ . get_cas ( Resource :: < Bucket > :: new_own ( cas_rep) )
378
+ . map_err ( |e| CasError :: StoreError ( atomics:: Error :: Other ( e. to_string ( ) ) ) ) ?;
379
+
380
+ match cas. swap ( value) . await {
381
+ Ok ( cas) => Ok ( Ok ( ( ) ) ) ,
382
+ Err ( err) => {
383
+ if err. to_string ( ) . contains ( "CAS_ERROR" ) {
384
+ let bucket = Resource :: new_own ( cas. bucket_rep ( ) . await ) ;
385
+ let new_cas = self . new ( bucket, cas. key ( ) . await ) . await ?;
386
+ let new_cas_rep = new_cas. rep ( ) ;
387
+ self . current ( Resource :: new_own ( new_cas_rep) ) . await ?;
388
+ Err ( anyhow:: Error :: new ( CasError :: CasFailed ( Resource :: new_own (
389
+ new_cas_rep,
390
+ ) ) ) )
391
+ } else {
392
+ Err ( anyhow:: Error :: new ( CasError :: StoreError (
393
+ atomics:: Error :: Other ( err. to_string ( ) ) ,
394
+ ) ) )
395
+ }
396
+ }
397
+ }
373
398
}
374
399
}
375
400
@@ -379,6 +404,8 @@ pub fn log_error(err: impl std::fmt::Debug) -> Error {
379
404
}
380
405
381
406
use spin_world:: v1:: key_value:: Error as LegacyError ;
407
+ use spin_world:: wasi:: keyvalue:: atomics;
408
+ use spin_world:: wasi:: keyvalue:: atomics:: { CasError , HostCas } ;
382
409
383
410
fn to_legacy_error ( value : key_value:: Error ) -> LegacyError {
384
411
match value {
0 commit comments