Skip to content

Commit 37b4a95

Browse files
authored
feat(cubestore): Support lazy initialization for CacheStore (#5933)
1 parent 972ccd7 commit 37b4a95

File tree

5 files changed

+216
-20
lines changed

5 files changed

+216
-20
lines changed

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,17 +141,16 @@ impl RocksCacheStore {
141141
Ok(Self::new_from_store(store))
142142
}
143143

144-
pub async fn wait_upload_loop(meta_store: Arc<Self>) {
145-
if !meta_store.store.config.upload_to_remote() {
144+
pub async fn wait_upload_loop(self: Arc<Self>) {
145+
if !self.store.config.upload_to_remote() {
146146
log::info!("Not running cachestore upload loop");
147147
return;
148148
}
149149

150-
let upload_interval = meta_store.store.config.meta_store_log_upload_interval();
151-
meta_store
152-
.upload_loop
150+
let upload_interval = self.store.config.meta_store_log_upload_interval();
151+
self.upload_loop
153152
.process(
154-
meta_store.clone(),
153+
self.clone(),
155154
async move |_| Ok(Delay::new(Duration::from_secs(upload_interval)).await),
156155
async move |m, _| m.store.run_upload().await,
157156
)
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use crate::cachestore::{CacheItem, CacheStore, RocksCacheStore};
2+
use crate::config::ConfigObj;
3+
use crate::metastore::{IdRow, MetaStoreFs};
4+
use crate::CubeError;
5+
use async_trait::async_trait;
6+
use log::trace;
7+
use std::path::Path;
8+
use std::sync::Arc;
9+
use tokio::sync::watch::{Receiver, Sender};
10+
11+
pub enum LazyRocksCacheStoreState {
12+
FromRemote {
13+
path: String,
14+
metastore_fs: Arc<dyn MetaStoreFs>,
15+
config: Arc<dyn ConfigObj>,
16+
init_flag: Sender<bool>,
17+
},
18+
Closed {},
19+
Initialized {
20+
store: Arc<RocksCacheStore>,
21+
},
22+
}
23+
24+
pub struct LazyRocksCacheStore {
25+
init_signal: Option<Receiver<bool>>,
26+
state: tokio::sync::RwLock<LazyRocksCacheStoreState>,
27+
}
28+
29+
impl LazyRocksCacheStore {
30+
pub async fn load_from_dump(
31+
path: &Path,
32+
dump_path: &Path,
33+
metastore_fs: Arc<dyn MetaStoreFs>,
34+
config: Arc<dyn ConfigObj>,
35+
) -> Result<Arc<Self>, CubeError> {
36+
let store = RocksCacheStore::load_from_dump(path, dump_path, metastore_fs, config).await?;
37+
38+
Ok(Arc::new(Self {
39+
init_signal: None,
40+
state: tokio::sync::RwLock::new(LazyRocksCacheStoreState::Initialized { store }),
41+
}))
42+
}
43+
44+
pub async fn load_from_remote(
45+
path: &str,
46+
metastore_fs: Arc<dyn MetaStoreFs>,
47+
config: Arc<dyn ConfigObj>,
48+
) -> Result<Arc<Self>, CubeError> {
49+
let (init_flag, init_signal) = tokio::sync::watch::channel::<bool>(false);
50+
51+
Ok(Arc::new(Self {
52+
init_signal: Some(init_signal),
53+
state: tokio::sync::RwLock::new(LazyRocksCacheStoreState::FromRemote {
54+
path: path.to_string(),
55+
metastore_fs,
56+
config,
57+
init_flag,
58+
}),
59+
}))
60+
}
61+
62+
async fn init(&self) -> Result<Arc<RocksCacheStore>, CubeError> {
63+
{
64+
let guard = self.state.read().await;
65+
match &*guard {
66+
LazyRocksCacheStoreState::FromRemote { .. } => {}
67+
LazyRocksCacheStoreState::Closed { .. } => {
68+
return Err(CubeError::internal(
69+
"Unable to initialize Cache Store on lazy call, it was closed".to_string(),
70+
));
71+
}
72+
LazyRocksCacheStoreState::Initialized { store } => {
73+
return Ok(store.clone());
74+
}
75+
}
76+
}
77+
78+
let mut guard = self.state.write().await;
79+
match &*guard {
80+
LazyRocksCacheStoreState::FromRemote {
81+
path,
82+
metastore_fs,
83+
config,
84+
// receiver will be closed on drop
85+
init_flag: _,
86+
} => {
87+
let store =
88+
RocksCacheStore::load_from_remote(&path, metastore_fs.clone(), config.clone())
89+
.await?;
90+
91+
*guard = LazyRocksCacheStoreState::Initialized {
92+
store: store.clone(),
93+
};
94+
95+
Ok(store)
96+
}
97+
_ => Err(CubeError::internal(
98+
"Unable to initialize Cache Store on lazy call, unexpected state".to_string(),
99+
)),
100+
}
101+
}
102+
103+
pub async fn wait_upload_loop(&self) {
104+
if let Some(init_signal) = &self.init_signal {
105+
let _ = init_signal.clone().changed().await;
106+
}
107+
108+
let store = {
109+
let guard = self.state.read().await;
110+
if let LazyRocksCacheStoreState::Initialized { store } = &*guard {
111+
store.clone()
112+
} else {
113+
return ();
114+
}
115+
};
116+
117+
trace!("wait_upload_loop unblocked, Cache Store was initialized");
118+
119+
store.wait_upload_loop().await
120+
}
121+
122+
pub async fn stop_processing_loops(&self) {
123+
let store = {
124+
let mut guard = self.state.write().await;
125+
match &*guard {
126+
LazyRocksCacheStoreState::Closed { .. } => {
127+
return ();
128+
}
129+
LazyRocksCacheStoreState::FromRemote { .. } => {
130+
*guard = LazyRocksCacheStoreState::Closed {};
131+
132+
return ();
133+
}
134+
LazyRocksCacheStoreState::Initialized { store } => {
135+
let store_to_move = store.clone();
136+
137+
*guard = LazyRocksCacheStoreState::Closed {};
138+
139+
store_to_move
140+
}
141+
}
142+
};
143+
144+
trace!("stop_processing_loops unblocked, Cache Store was initialized");
145+
146+
store.stop_processing_loops().await
147+
}
148+
}
149+
150+
#[async_trait]
151+
impl CacheStore for LazyRocksCacheStore {
152+
async fn cache_all(&self) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
153+
self.init().await?.cache_all().await
154+
}
155+
156+
async fn cache_set(
157+
&self,
158+
item: CacheItem,
159+
update_if_not_exists: bool,
160+
) -> Result<bool, CubeError> {
161+
self.init()
162+
.await?
163+
.cache_set(item, update_if_not_exists)
164+
.await
165+
}
166+
167+
async fn cache_truncate(&self) -> Result<(), CubeError> {
168+
self.init().await?.cache_truncate().await
169+
}
170+
171+
async fn cache_delete(&self, key: String) -> Result<(), CubeError> {
172+
self.init().await?.cache_delete(key).await
173+
}
174+
175+
async fn cache_get(&self, key: String) -> Result<Option<IdRow<CacheItem>>, CubeError> {
176+
self.init().await?.cache_get(key).await
177+
}
178+
179+
async fn cache_keys(&self, prefix: String) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
180+
self.init().await?.cache_keys(prefix).await
181+
}
182+
183+
async fn cache_incr(&self, path: String) -> Result<IdRow<CacheItem>, CubeError> {
184+
self.init().await?.cache_incr(path).await
185+
}
186+
187+
async fn compaction(&self) -> Result<(), CubeError> {
188+
self.init().await?.compaction().await
189+
}
190+
}
191+
192+
crate::di_service!(LazyRocksCacheStore, [CacheStore]);
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
mod cache_item;
22
mod cache_rocksstore;
33
mod compaction;
4+
mod lazy;
45

56
pub use cache_item::CacheItem;
67
pub use cache_rocksstore::{
78
CacheStore, CacheStoreRpcClient, ClusterCacheStoreClient, RocksCacheStore,
89
};
10+
pub use lazy::LazyRocksCacheStore;

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
pub mod injection;
33
pub mod processing_loop;
44

5-
use crate::cachestore::{CacheStore, ClusterCacheStoreClient, RocksCacheStore};
5+
use crate::cachestore::{CacheStore, ClusterCacheStoreClient, LazyRocksCacheStore};
66
use crate::cluster::transport::{
77
ClusterTransport, ClusterTransportImpl, MetaStoreTransport, MetaStoreTransportImpl,
88
};
@@ -58,7 +58,7 @@ pub struct CubeServices {
5858
pub sql_service: Arc<dyn SqlService>,
5959
pub scheduler: Arc<SchedulerImpl>,
6060
pub rocks_meta_store: Option<Arc<RocksMetaStore>>,
61-
pub rocks_cache_store: Option<Arc<RocksCacheStore>>,
61+
pub rocks_cache_store: Option<Arc<LazyRocksCacheStore>>,
6262
pub meta_store: Arc<dyn MetaStore>,
6363
pub cluster: Arc<ClusterImpl>,
6464
}
@@ -104,13 +104,13 @@ impl CubeServices {
104104
if !self.cluster.is_select_worker() {
105105
let rocks_meta_store = self.rocks_meta_store.clone().unwrap();
106106
futures.push(cube_ext::spawn(async move {
107-
RocksMetaStore::wait_upload_loop(rocks_meta_store).await;
107+
rocks_meta_store.wait_upload_loop().await;
108108
Ok(())
109109
}));
110110

111111
let rocks_cache_store = self.rocks_cache_store.clone().unwrap();
112112
futures.push(cube_ext::spawn(async move {
113-
RocksCacheStore::wait_upload_loop(rocks_cache_store).await;
113+
rocks_cache_store.wait_upload_loop().await;
114114
Ok(())
115115
}));
116116

@@ -1262,12 +1262,12 @@ impl Config {
12621262
.await;
12631263
let path = self.cache_store_path().to_str().unwrap().to_string();
12641264
self.injector
1265-
.register_typed_with_default::<dyn CacheStore, RocksCacheStore, _, _>(
1265+
.register_typed_with_default::<dyn CacheStore, LazyRocksCacheStore, _, _>(
12661266
async move |i| {
12671267
let config = i.get_service_typed::<dyn ConfigObj>().await;
12681268
let cachestore_fs = i.get_service("cachestore_fs").await;
12691269
let cache_store = if let Some(dump_dir) = config.clone().dump_dir() {
1270-
RocksCacheStore::load_from_dump(
1270+
LazyRocksCacheStore::load_from_dump(
12711271
&Path::new(&path),
12721272
dump_dir,
12731273
cachestore_fs,
@@ -1276,7 +1276,7 @@ impl Config {
12761276
.await
12771277
.unwrap()
12781278
} else {
1279-
RocksCacheStore::load_from_remote(&path, cachestore_fs, config)
1279+
LazyRocksCacheStore::load_from_remote(&path, cachestore_fs, config)
12801280
.await
12811281
.unwrap()
12821282
};
@@ -1511,7 +1511,11 @@ impl Config {
15111511
} else {
15121512
None
15131513
},
1514-
rocks_cache_store: if self.injector.has_service_typed::<RocksCacheStore>().await {
1514+
rocks_cache_store: if self
1515+
.injector
1516+
.has_service_typed::<LazyRocksCacheStore>()
1517+
.await
1518+
{
15151519
Some(self.injector.get_service_typed().await)
15161520
} else {
15171521
None

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,17 +1166,16 @@ impl RocksMetaStore {
11661166
Ok(Self::new_from_store(store))
11671167
}
11681168

1169-
pub async fn wait_upload_loop(meta_store: Arc<Self>) {
1170-
if !meta_store.store.config.upload_to_remote() {
1169+
pub async fn wait_upload_loop(self: Arc<Self>) {
1170+
if !self.store.config.upload_to_remote() {
11711171
log::info!("Not running metastore upload loop");
11721172
return;
11731173
}
11741174

1175-
let upload_interval = meta_store.store.config.meta_store_log_upload_interval();
1176-
meta_store
1177-
.upload_loop
1175+
let upload_interval = self.store.config.meta_store_log_upload_interval();
1176+
self.upload_loop
11781177
.process(
1179-
meta_store.clone(),
1178+
self.clone(),
11801179
async move |_| Ok(Delay::new(Duration::from_secs(upload_interval)).await),
11811180
async move |m, _| m.store.run_upload().await,
11821181
)

0 commit comments

Comments
 (0)