@@ -9,28 +9,18 @@ use tokio::sync::Notify;
99
1010#[ async_trait]
1111pub trait MetastoreListener : Send + Sync {
12- async fn wait_for_event (
13- & self ,
14- event_fn : Box < dyn Fn ( MetaStoreEvent ) -> bool + Send + Sync > ,
15- ) -> Result < ( ) , CubeError > ;
12+ async fn wait_for_event ( & self , event_fn : MetastoreListenerWaitFun ) -> Result < ( ) , CubeError > ;
1613}
1714
15+ pub type MetastoreListenerWaitFun = Box < dyn Fn ( & MetaStoreEvent ) -> bool + Send + Sync > ;
1816pub struct MetastoreListenerImpl {
1917 event_receiver : Mutex < Receiver < MetaStoreEvent > > ,
20- wait_fns : Mutex <
21- Vec < (
22- Arc < Notify > ,
23- Box < dyn Fn ( MetaStoreEvent ) -> bool + Send + Sync > ,
24- ) > ,
25- > ,
18+ wait_fns : Mutex < Vec < ( Arc < Notify > , MetastoreListenerWaitFun ) > > ,
2619}
2720
2821#[ async_trait]
2922impl MetastoreListener for MetastoreListenerImpl {
30- async fn wait_for_event (
31- & self ,
32- event_fn : Box < dyn Fn ( MetaStoreEvent ) -> bool + Send + Sync > ,
33- ) -> Result < ( ) , CubeError > {
23+ async fn wait_for_event ( & self , event_fn : MetastoreListenerWaitFun ) -> Result < ( ) , CubeError > {
3424 let notify = Arc :: new ( Notify :: new ( ) ) ;
3525 self . wait_fns . lock ( ) . await . push ( ( notify. clone ( ) , event_fn) ) ;
3626 notify. notified ( ) . await ;
@@ -42,10 +32,7 @@ pub struct MockMetastoreListener;
4232
4333#[ async_trait]
4434impl MetastoreListener for MockMetastoreListener {
45- async fn wait_for_event (
46- & self ,
47- _event_fn : Box < dyn Fn ( MetaStoreEvent ) -> bool + Send + Sync > ,
48- ) -> Result < ( ) , CubeError > {
35+ async fn wait_for_event ( & self , _event_fn : MetastoreListenerWaitFun ) -> Result < ( ) , CubeError > {
4936 Ok ( ( ) )
5037 }
5138}
@@ -67,7 +54,7 @@ impl MetastoreListenerImpl {
6754 pub async fn run_listener ( & self ) -> Result < ( ) , CubeError > {
6855 loop {
6956 let event = self . event_receiver . lock ( ) . await . recv ( ) . await ?;
70- let res = self . process_event ( event. clone ( ) ) . await ;
57+ let res = self . process_event ( & event) . await ;
7158 if let Err ( e) = res {
7259 error ! ( "Error processing event {:?}: {}" , event, e) ;
7360 }
@@ -80,7 +67,7 @@ impl MetastoreListenerImpl {
8067 ) -> Result < ( ) , CubeError > {
8168 loop {
8269 let event = self . event_receiver . lock ( ) . await . recv ( ) . await ?;
83- let res = self . process_event ( event. clone ( ) ) . await ;
70+ let res = self . process_event ( & event) . await ;
8471 if let Err ( e) = res {
8572 error ! ( "Error processing event {:?}: {}" , event, e) ;
8673 }
@@ -90,13 +77,22 @@ impl MetastoreListenerImpl {
9077 }
9178 }
9279
93- async fn process_event ( & self , event : MetaStoreEvent ) -> Result < ( ) , CubeError > {
80+ async fn process_event ( & self , event : & MetaStoreEvent ) -> Result < ( ) , CubeError > {
9481 let mut wait_fns = self . wait_fns . lock ( ) . await ;
95- let to_notify = wait_fns
96- . extract_if ( |( _, wait_fn) | wait_fn ( event. clone ( ) ) )
97- . collect :: < Vec < _ > > ( ) ;
82+ let mut to_notify = Vec :: new ( ) ;
83+
84+ wait_fns. retain ( |( notify, wait_fn) | {
85+ if wait_fn ( event) {
86+ to_notify. push ( notify. clone ( ) ) ;
87+ false
88+ } else {
89+ true
90+ }
91+ } ) ;
92+
93+ drop ( wait_fns) ;
9894
99- for ( notify, _ ) in to_notify {
95+ for notify in to_notify {
10096 notify. notify_waiters ( ) ;
10197 }
10298
0 commit comments