Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rust/cubestore/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#![feature(hash_set_entry)]
#![feature(is_sorted)]
#![feature(result_flattening)]
#![feature(extract_if)]
// #![feature(trace_macros)]

// trace_macros!(true);
Expand Down
46 changes: 21 additions & 25 deletions rust/cubestore/cubestore/src/metastore/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,18 @@ use tokio::sync::Notify;

#[async_trait]
pub trait MetastoreListener: Send + Sync {
async fn wait_for_event(
&self,
event_fn: Box<dyn Fn(MetaStoreEvent) -> bool + Send + Sync>,
) -> Result<(), CubeError>;
async fn wait_for_event(&self, event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError>;
}

pub type MetastoreListenerWaitFun = Box<dyn Fn(&MetaStoreEvent) -> bool + Send + Sync>;
pub struct MetastoreListenerImpl {
event_receiver: Mutex<Receiver<MetaStoreEvent>>,
wait_fns: Mutex<
Vec<(
Arc<Notify>,
Box<dyn Fn(MetaStoreEvent) -> bool + Send + Sync>,
)>,
>,
wait_fns: Mutex<Vec<(Arc<Notify>, MetastoreListenerWaitFun)>>,
}

#[async_trait]
impl MetastoreListener for MetastoreListenerImpl {
async fn wait_for_event(
&self,
event_fn: Box<dyn Fn(MetaStoreEvent) -> bool + Send + Sync>,
) -> Result<(), CubeError> {
async fn wait_for_event(&self, event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError> {
let notify = Arc::new(Notify::new());
self.wait_fns.lock().await.push((notify.clone(), event_fn));
notify.notified().await;
Expand All @@ -42,10 +32,7 @@ pub struct MockMetastoreListener;

#[async_trait]
impl MetastoreListener for MockMetastoreListener {
async fn wait_for_event(
&self,
_event_fn: Box<dyn Fn(MetaStoreEvent) -> bool + Send + Sync>,
) -> Result<(), CubeError> {
async fn wait_for_event(&self, _event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError> {
Ok(())
}
}
Expand All @@ -67,7 +54,7 @@ impl MetastoreListenerImpl {
pub async fn run_listener(&self) -> Result<(), CubeError> {
loop {
let event = self.event_receiver.lock().await.recv().await?;
let res = self.process_event(event.clone()).await;
let res = self.process_event(&event).await;
if let Err(e) = res {
error!("Error processing event {:?}: {}", event, e);
}
Expand All @@ -80,7 +67,7 @@ impl MetastoreListenerImpl {
) -> Result<(), CubeError> {
loop {
let event = self.event_receiver.lock().await.recv().await?;
let res = self.process_event(event.clone()).await;
let res = self.process_event(&event).await;
if let Err(e) = res {
error!("Error processing event {:?}: {}", event, e);
}
Expand All @@ -90,13 +77,22 @@ impl MetastoreListenerImpl {
}
}

async fn process_event(&self, event: MetaStoreEvent) -> Result<(), CubeError> {
async fn process_event(&self, event: &MetaStoreEvent) -> Result<(), CubeError> {
let mut wait_fns = self.wait_fns.lock().await;
let to_notify = wait_fns
.extract_if(|(_, wait_fn)| wait_fn(event.clone()))
.collect::<Vec<_>>();
let mut to_notify = Vec::new();

wait_fns.retain(|(notify, wait_fn)| {
if wait_fn(event) {
to_notify.push(notify.clone());
false
} else {
true
}
});

drop(wait_fns);

for (notify, _) in to_notify {
for notify in to_notify {
notify.notify_waiters();
}

Expand Down
5 changes: 5 additions & 0 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4954,6 +4954,11 @@ mod tests {
assert_eq!(format_table_value!(s, name, String), "foo");
}

#[test]
fn test_structures_size() {
assert_eq!(std::mem::size_of::<MetaStoreEvent>(), 680);
}

#[tokio::test]
async fn schema_test() {
let config = Config::test("schema_test");
Expand Down
3 changes: 3 additions & 0 deletions rust/cubestore/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2815,6 +2815,9 @@ mod tests {

println!("All partitions: {:#?}", partitions);

// TODO API to wait for all jobs to be completed and all events processed
Delay::new(Duration::from_millis(500)).await;

let plans = service
.plan_query("SELECT sum(num) from foo.numbers where num = 50")
.await
Expand Down
Loading