From 669e2bc9ffa978ad12e965536cd1a5c340ed0bec Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 21 Jul 2025 18:00:21 +0200 Subject: [PATCH] chore(cubestore): Metastore listner - remove large unexpected clone --- rust/cubestore/cubestore/src/lib.rs | 1 - .../cubestore/src/metastore/listener.rs | 46 +++++++++---------- rust/cubestore/cubestore/src/metastore/mod.rs | 5 ++ rust/cubestore/cubestore/src/sql/mod.rs | 3 ++ 4 files changed, 29 insertions(+), 26 deletions(-) diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 05d24b86f0a14..2eeafc495cdf1 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs @@ -5,7 +5,6 @@ #![feature(hash_set_entry)] #![feature(is_sorted)] #![feature(result_flattening)] -#![feature(extract_if)] // #![feature(trace_macros)] // trace_macros!(true); diff --git a/rust/cubestore/cubestore/src/metastore/listener.rs b/rust/cubestore/cubestore/src/metastore/listener.rs index cd2c53afea888..0a6a9fcee899b 100644 --- a/rust/cubestore/cubestore/src/metastore/listener.rs +++ b/rust/cubestore/cubestore/src/metastore/listener.rs @@ -9,28 +9,18 @@ use tokio::sync::Notify; #[async_trait] pub trait MetastoreListener: Send + Sync { - async fn wait_for_event( - &self, - event_fn: Box bool + Send + Sync>, - ) -> Result<(), CubeError>; + async fn wait_for_event(&self, event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError>; } +pub type MetastoreListenerWaitFun = Box bool + Send + Sync>; pub struct MetastoreListenerImpl { event_receiver: Mutex>, - wait_fns: Mutex< - Vec<( - Arc, - Box bool + Send + Sync>, - )>, - >, + wait_fns: Mutex, MetastoreListenerWaitFun)>>, } #[async_trait] impl MetastoreListener for MetastoreListenerImpl { - async fn wait_for_event( - &self, - event_fn: Box bool + Send + Sync>, - ) -> Result<(), CubeError> { + async fn wait_for_event(&self, event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError> { let notify = Arc::new(Notify::new()); self.wait_fns.lock().await.push((notify.clone(), event_fn)); notify.notified().await; @@ -42,10 +32,7 @@ pub struct MockMetastoreListener; #[async_trait] impl MetastoreListener for MockMetastoreListener { - async fn wait_for_event( - &self, - _event_fn: Box bool + Send + Sync>, - ) -> Result<(), CubeError> { + async fn wait_for_event(&self, _event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError> { Ok(()) } } @@ -67,7 +54,7 @@ impl MetastoreListenerImpl { pub async fn run_listener(&self) -> Result<(), CubeError> { loop { let event = self.event_receiver.lock().await.recv().await?; - let res = self.process_event(event.clone()).await; + let res = self.process_event(&event).await; if let Err(e) = res { error!("Error processing event {:?}: {}", event, e); } @@ -80,7 +67,7 @@ impl MetastoreListenerImpl { ) -> Result<(), CubeError> { loop { let event = self.event_receiver.lock().await.recv().await?; - let res = self.process_event(event.clone()).await; + let res = self.process_event(&event).await; if let Err(e) = res { error!("Error processing event {:?}: {}", event, e); } @@ -90,13 +77,22 @@ impl MetastoreListenerImpl { } } - async fn process_event(&self, event: MetaStoreEvent) -> Result<(), CubeError> { + async fn process_event(&self, event: &MetaStoreEvent) -> Result<(), CubeError> { let mut wait_fns = self.wait_fns.lock().await; - let to_notify = wait_fns - .extract_if(|(_, wait_fn)| wait_fn(event.clone())) - .collect::>(); + let mut to_notify = Vec::new(); + + wait_fns.retain(|(notify, wait_fn)| { + if wait_fn(event) { + to_notify.push(notify.clone()); + false + } else { + true + } + }); + + drop(wait_fns); - for (notify, _) in to_notify { + for notify in to_notify { notify.notify_waiters(); } diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 43fde1e356731..a94ff04a346e7 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -4954,6 +4954,11 @@ mod tests { assert_eq!(format_table_value!(s, name, String), "foo"); } + #[test] + fn test_structures_size() { + assert_eq!(std::mem::size_of::(), 680); + } + #[tokio::test] async fn schema_test() { let config = Config::test("schema_test"); diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 793cece76fab8..76c802c9d1119 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -2815,6 +2815,9 @@ mod tests { println!("All partitions: {:#?}", partitions); + // TODO API to wait for all jobs to be completed and all events processed + Delay::new(Duration::from_millis(500)).await; + let plans = service .plan_query("SELECT sum(num) from foo.numbers where num = 50") .await