@@ -875,7 +875,11 @@ impl KVStore for TestStore {
875
875
let secondary_namespace = secondary_namespace. to_string ( ) ;
876
876
let key = key. to_string ( ) ;
877
877
let inner = Arc :: clone ( & self . inner ) ;
878
- Box :: pin ( async move { inner. read_internal ( & primary_namespace, & secondary_namespace, & key) } )
878
+ let fut =
879
+ Box :: pin (
880
+ async move { inner. read_internal ( & primary_namespace, & secondary_namespace, & key) } ,
881
+ ) ;
882
+ Box :: pin ( async move { TestStoreFuture :: new ( Box :: pin ( fut) ) . await } )
879
883
}
880
884
fn write (
881
885
& self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
@@ -884,9 +888,10 @@ impl KVStore for TestStore {
884
888
let secondary_namespace = secondary_namespace. to_string ( ) ;
885
889
let key = key. to_string ( ) ;
886
890
let inner = Arc :: clone ( & self . inner ) ;
887
- Box :: pin ( async move {
891
+ let fut = Box :: pin ( async move {
888
892
inner. write_internal ( & primary_namespace, & secondary_namespace, & key, buf)
889
- } )
893
+ } ) ;
894
+ Box :: pin ( async move { TestStoreFuture :: new ( Box :: pin ( fut) ) . await } )
890
895
}
891
896
fn remove (
892
897
& self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
@@ -895,17 +900,20 @@ impl KVStore for TestStore {
895
900
let secondary_namespace = secondary_namespace. to_string ( ) ;
896
901
let key = key. to_string ( ) ;
897
902
let inner = Arc :: clone ( & self . inner ) ;
898
- Box :: pin ( async move {
903
+ let fut = Box :: pin ( async move {
899
904
inner. remove_internal ( & primary_namespace, & secondary_namespace, & key, lazy)
900
- } )
905
+ } ) ;
906
+ Box :: pin ( async move { TestStoreFuture :: new ( Box :: pin ( fut) ) . await } )
901
907
}
902
908
fn list (
903
909
& self , primary_namespace : & str , secondary_namespace : & str ,
904
910
) -> Pin < Box < dyn Future < Output = Result < Vec < String > , io:: Error > > + ' static + Send > > {
905
911
let primary_namespace = primary_namespace. to_string ( ) ;
906
912
let secondary_namespace = secondary_namespace. to_string ( ) ;
907
913
let inner = Arc :: clone ( & self . inner ) ;
908
- Box :: pin ( async move { inner. list_internal ( & primary_namespace, & secondary_namespace) } )
914
+ let fut =
915
+ Box :: pin ( async move { inner. list_internal ( & primary_namespace, & secondary_namespace) } ) ;
916
+ Box :: pin ( async move { TestStoreFuture :: new ( Box :: pin ( fut) ) . await } )
909
917
}
910
918
}
911
919
@@ -933,6 +941,37 @@ impl KVStoreSync for TestStore {
933
941
}
934
942
}
935
943
944
+ // A `Future` wrapper that returns `Pending` once before actually polling the (likely sync) future.
945
+ pub ( crate ) struct TestStoreFuture < F : Future < Output = Result < R , io:: Error > > + Unpin , R > {
946
+ future : Mutex < Option < F > > ,
947
+ first_poll : AtomicBool ,
948
+ }
949
+
950
+ impl < F : Future < Output = Result < R , io:: Error > > + Unpin , R > TestStoreFuture < F , R > {
951
+ fn new ( fut : F ) -> Self {
952
+ let future = Mutex :: new ( Some ( fut) ) ;
953
+ let first_poll = AtomicBool :: new ( true ) ;
954
+ Self { future, first_poll }
955
+ }
956
+ }
957
+
958
+ impl < F : Future < Output = Result < R , io:: Error > > + Unpin , R > Future for TestStoreFuture < F , R > {
959
+ type Output = Result < R , io:: Error > ;
960
+ fn poll (
961
+ self : Pin < & mut Self > , cx : & mut core:: task:: Context < ' _ > ,
962
+ ) -> core:: task:: Poll < Self :: Output > {
963
+ if self . first_poll . swap ( false , Ordering :: Relaxed ) {
964
+ core:: task:: Poll :: Pending
965
+ } else {
966
+ if let Some ( mut fut) = self . future . lock ( ) . unwrap ( ) . take ( ) {
967
+ Pin :: new ( & mut fut) . poll ( cx)
968
+ } else {
969
+ unreachable ! ( "We should never poll more than twice" ) ;
970
+ }
971
+ }
972
+ }
973
+ }
974
+
936
975
struct TestStoreInner {
937
976
persisted_bytes : Mutex < HashMap < String , HashMap < String , Vec < u8 > > > > ,
938
977
read_only : bool ,
0 commit comments