Skip to content

Commit 1f2db32

Browse files
authored
feat(cubestore): RocksStore - panic protection for broken rw loop (#6293)
1 parent 8c490f4 commit 1f2db32

File tree

2 files changed

+129
-6
lines changed

2 files changed

+129
-6
lines changed

rust/cubestore/cubestore/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use log::SetLoggerError;
2727
use parquet::errors::ParquetError;
2828
use serde_derive::{Deserialize, Serialize};
2929
use sqlparser::parser::ParserError;
30+
use std::any::Any;
3031
use std::backtrace::Backtrace;
3132
use std::fmt;
3233
use std::fmt::Display;
@@ -155,6 +156,16 @@ impl CubeError {
155156
cause: CubeErrorCauseType::Internal,
156157
}
157158
}
159+
160+
pub fn from_panic_payload(payload: Box<dyn Any + Send>) -> Self {
161+
if let Some(reason) = payload.downcast_ref::<&str>() {
162+
CubeError::panic(format!("Reason: {}", reason))
163+
} else if let Some(reason) = payload.downcast_ref::<String>() {
164+
CubeError::panic(format!("Reason: {}", reason))
165+
} else {
166+
CubeError::panic("Without reason".to_string())
167+
}
168+
}
158169
}
159170

160171
impl fmt::Display for CubeError {

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 118 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -563,8 +563,16 @@ impl RocksStore {
563563

564564
let join_handle = cube_ext::spawn_blocking(move || loop {
565565
if let Some(fun) = rw_loop_rx.blocking_recv() {
566-
if let Err(e) = fun() {
567-
log::error!("Error during read write loop execution: {}", e);
566+
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(fun)) {
567+
Err(panic_payload) => {
568+
let restore_error = CubeError::from_panic_payload(panic_payload);
569+
log::error!("Panic during read write loop execution: {}", restore_error);
570+
}
571+
Ok(res) => {
572+
if let Err(e) = res {
573+
log::error!("Error during read write loop execution: {}", e);
574+
}
575+
}
568576
}
569577
} else {
570578
return;
@@ -710,10 +718,22 @@ impl RocksStore {
710718
Ok(())
711719
}));
712720
if let Err(e) = res.await {
713-
log::error!("[{}] Error during read write loop send: {}", store_name, e);
721+
log::error!(
722+
"[{}] Error during scheduling write task in loop: {}",
723+
store_name,
724+
e
725+
);
726+
727+
return Err(CubeError::internal(format!(
728+
"Error during scheduling write task in loop: {}",
729+
e
730+
)));
714731
}
715732

716-
let (spawn_res, events) = rx.await??;
733+
let res = rx.await.map_err(|err| {
734+
CubeError::internal(format!("Unable to receive result for write task: {}", err))
735+
})?;
736+
let (spawn_res, events) = res?;
717737

718738
self.write_notify.notify_waiters();
719739

@@ -888,10 +908,17 @@ impl RocksStore {
888908
Ok(())
889909
}));
890910
if let Err(e) = res.await {
891-
log::error!("Error during read write loop send: {}", e);
911+
log::error!("Error during scheduling read task in loop: {}", e);
912+
913+
return Err(CubeError::internal(format!(
914+
"Error during scheduling read task in loop: {}",
915+
e
916+
)));
892917
}
893918

894-
rx.await?
919+
rx.await.map_err(|err| {
920+
CubeError::internal(format!("Unable to receive result for read task: {}", err))
921+
})?
895922
}
896923

897924
pub async fn read_operation_out_of_queue<F, R>(&self, f: F) -> Result<R, CubeError>
@@ -977,3 +1004,88 @@ impl RocksStore {
9771004
Ok(())
9781005
}
9791006
}
1007+
1008+
#[cfg(test)]
1009+
mod tests {
1010+
use super::*;
1011+
use crate::config::Config;
1012+
use crate::metastore::{BaseRocksStoreFs, RocksMetaStoreDetails};
1013+
use crate::remotefs::LocalDirRemoteFs;
1014+
use std::{env, fs};
1015+
1016+
#[tokio::test]
1017+
async fn test_loop_panic() -> Result<(), CubeError> {
1018+
let config = Config::test("test_loop_panic");
1019+
let store_path = env::current_dir().unwrap().join("test_loop_panic-local");
1020+
let remote_store_path = env::current_dir().unwrap().join("test_loop_panic-remote");
1021+
let _ = fs::remove_dir_all(store_path.clone());
1022+
let _ = fs::remove_dir_all(remote_store_path.clone());
1023+
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
1024+
1025+
let details = Arc::new(RocksMetaStoreDetails {});
1026+
let rocks_store = RocksStore::new(
1027+
store_path.join("metastore").as_path(),
1028+
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
1029+
config.config_obj(),
1030+
details,
1031+
)?;
1032+
1033+
// read operation
1034+
{
1035+
let r = rocks_store
1036+
.read_operation(|_| -> Result<(), CubeError> {
1037+
panic!("panic - task 1");
1038+
})
1039+
.await;
1040+
assert_eq!(
1041+
r.err().expect("must be error").message,
1042+
"Unable to receive result for read task: channel closed".to_string()
1043+
);
1044+
1045+
let r = rocks_store
1046+
.read_operation(|_| -> Result<(), CubeError> {
1047+
Err(CubeError::user("error - task 3".to_string()))
1048+
})
1049+
.await;
1050+
assert_eq!(
1051+
r.err().expect("must be error").message,
1052+
"error - task 3".to_string()
1053+
);
1054+
}
1055+
1056+
// write operation
1057+
{
1058+
let r = rocks_store
1059+
.write_operation(|_, _| -> Result<(), CubeError> {
1060+
panic!("panic - task 1");
1061+
})
1062+
.await;
1063+
assert_eq!(
1064+
r.err().expect("must be error").message,
1065+
"Unable to receive result for write task: channel closed".to_string()
1066+
);
1067+
1068+
let r = rocks_store
1069+
.write_operation(|_, _| -> Result<(), CubeError> {
1070+
panic!("panic - task 2");
1071+
})
1072+
.await;
1073+
assert_eq!(
1074+
r.err().expect("must be error").message,
1075+
"Unable to receive result for write task: channel closed".to_string()
1076+
);
1077+
1078+
let r = rocks_store
1079+
.write_operation(|_, _| -> Result<(), CubeError> {
1080+
Err(CubeError::user("error - task 3".to_string()))
1081+
})
1082+
.await;
1083+
assert_eq!(
1084+
r.err().expect("must be error").message,
1085+
"error - task 3".to_string()
1086+
);
1087+
}
1088+
1089+
Ok(())
1090+
}
1091+
}

0 commit comments

Comments
 (0)