@@ -805,18 +805,18 @@ pub trait RocksStoreDetails: Send + Sync {
805805 fn log_enabled ( & self ) -> bool ;
806806}
807807
808+ pub type RocksStoreRWLoopFun = Box < dyn FnOnce ( ) -> Result < ( ) , CubeError > + Send + ' static > ;
809+
808810#[ derive( Debug , Clone ) ]
809811pub struct RocksStoreRWLoop {
810812 name : & ' static str ,
811- tx : tokio:: sync:: mpsc:: Sender < Box < dyn FnOnce ( ) -> Result < ( ) , CubeError > + Send + ' static > > ,
813+ tx : tokio:: sync:: mpsc:: Sender < RocksStoreRWLoopFun > ,
812814 _join_handle : Arc < AbortingJoinHandle < ( ) > > ,
813815}
814816
815817impl RocksStoreRWLoop {
816818 pub fn new ( name : & ' static str ) -> Self {
817- let ( tx, mut rx) = tokio:: sync:: mpsc:: channel :: <
818- Box < dyn FnOnce ( ) -> Result < ( ) , CubeError > + Send + ' static > ,
819- > ( 32_768 ) ;
819+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel :: < RocksStoreRWLoopFun > ( 32_768 ) ;
820820
821821 let join_handle = cube_ext:: spawn_blocking ( move || loop {
822822 if let Some ( fun) = rx. blocking_recv ( ) {
@@ -843,6 +843,15 @@ impl RocksStoreRWLoop {
843843 }
844844 }
845845
846+ pub async fn schedule ( & self , fun : RocksStoreRWLoopFun ) -> Result < ( ) , CubeError > {
847+ self . tx . send ( fun) . await . map_err ( |err| {
848+ CubeError :: user ( format ! (
849+ "Failed to schedule task to RWLoop ({}), error: {}" ,
850+ self . name, err
851+ ) )
852+ } )
853+ }
854+
846855 pub fn get_name ( & self ) -> & ' static str {
847856 self . name
848857 }
@@ -1034,8 +1043,7 @@ impl RocksStore {
10341043 let ( tx, rx) = oneshot:: channel :: < Result < ( R , Vec < MetaStoreEvent > ) , CubeError > > ( ) ;
10351044
10361045 let res = rw_loop
1037- . tx
1038- . send ( Box :: new ( move || {
1046+ . schedule ( Box :: new ( move || {
10391047 let db_span = warn_long ( & span_name, Duration :: from_millis ( 100 ) ) ;
10401048
10411049 let mut batch = BatchPipe :: new ( db_to_send. as_ref ( ) ) ;
@@ -1367,7 +1375,7 @@ impl RocksStore {
13671375 let store_name = self . details . get_name ( ) ;
13681376 let span_name = format ! ( "{}({}) read operation: {}" , store_name, loop_name, op_name) ;
13691377
1370- let res = rw_loop. tx . send ( Box :: new ( move || {
1378+ let res = rw_loop. schedule ( Box :: new ( move || {
13711379 let db_span = warn_long ( & span_name, Duration :: from_millis ( 100 ) ) ;
13721380
13731381 let snapshot = db_to_send. snapshot ( ) ;
0 commit comments