Skip to content

Commit 6aee160

Browse files
authored
chore(cubestore): Remove old local files refactoring (#6688)
1 parent fb59e59 commit 6aee160

File tree

7 files changed

+390
-120
lines changed

7 files changed

+390
-120
lines changed

rust/cubestore/cubestore/src/app_metrics.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,6 @@ pub static METASTORE_READ_OUT_QUEUE_OPERATION: Histogram =
5656

5757
/// RemoteFs metrics
5858
pub static REMOTE_FS_OPERATION_CORE: Counter = metrics::counter("cs.remote_fs.operations.core");
59+
pub static REMOTE_FS_FILES_TO_REMOVE: Gauge = metrics::gauge("cs.remote_fs.files_to_remove.count");
60+
pub static REMOTE_FS_FILES_SIZE_TO_REMOVE: Gauge =
61+
metrics::gauge("cs.remote_fs.files_to_remove.size");

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::metastore::{
2020
use crate::mysql::{MySqlServer, SqlAuthDefaultImpl, SqlAuthService};
2121
use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl};
2222
use crate::queryplanner::{QueryPlanner, QueryPlannerImpl};
23+
use crate::remotefs::cleanup::RemoteFsCleanup;
2324
use crate::remotefs::gcs::GCSRemoteFs;
2425
use crate::remotefs::minio::MINIORemoteFs;
2526
use crate::remotefs::queue::QueueRemoteFs;
@@ -109,6 +110,14 @@ impl CubeServices {
109110
QueueRemoteFs::wait_processing_loops(remote_fs.clone()).await
110111
}));
111112

113+
if self.injector.has_service_typed::<RemoteFsCleanup>().await {
114+
let cleanup = self.injector.get_service_typed::<RemoteFsCleanup>().await;
115+
futures.push(cube_ext::spawn(async move {
116+
cleanup.wait_local_cleanup_loop().await;
117+
Ok(())
118+
}));
119+
}
120+
112121
if !self.cluster.is_select_worker() {
113122
let rocks_meta_store = self.rocks_meta_store.clone().unwrap();
114123
futures.push(cube_ext::spawn(async move {
@@ -220,6 +229,11 @@ impl CubeServices {
220229
scheduler.stop_processing_loops()?;
221230
}
222231

232+
if self.injector.has_service_typed::<RemoteFsCleanup>().await {
233+
let cleanup = self.injector.get_service_typed::<RemoteFsCleanup>().await;
234+
cleanup.stop();
235+
}
236+
223237
if self
224238
.injector
225239
.has_service_typed::<CacheStoreSchedulerImpl>()
@@ -470,7 +484,13 @@ pub trait ConfigObj: DIService {
470484

471485
fn local_files_cleanup_interval_secs(&self) -> u64;
472486

487+
fn remote_files_cleanup_interval_secs(&self) -> u64;
488+
473489
fn local_files_cleanup_size_threshold(&self) -> u64;
490+
491+
fn local_files_cleanup_delay_secs(&self) -> u64;
492+
493+
fn remote_files_cleanup_delay_secs(&self) -> u64;
474494
}
475495

476496
#[derive(Debug, Clone)]
@@ -544,7 +564,10 @@ pub struct ConfigObjImpl {
544564
pub transport_max_message_size: usize,
545565
pub transport_max_frame_size: usize,
546566
pub local_files_cleanup_interval_secs: u64,
567+
pub remote_files_cleanup_interval_secs: u64,
547568
pub local_files_cleanup_size_threshold: u64,
569+
pub local_files_cleanup_delay_secs: u64,
570+
pub remote_files_cleanup_delay_secs: u64,
548571
}
549572

550573
crate::di_service!(ConfigObjImpl, [ConfigObj]);
@@ -812,9 +835,21 @@ impl ConfigObj for ConfigObjImpl {
812835
self.local_files_cleanup_interval_secs
813836
}
814837

838+
fn remote_files_cleanup_interval_secs(&self) -> u64 {
839+
self.remote_files_cleanup_interval_secs
840+
}
841+
815842
fn local_files_cleanup_size_threshold(&self) -> u64 {
816843
self.local_files_cleanup_size_threshold
817844
}
845+
846+
fn local_files_cleanup_delay_secs(&self) -> u64 {
847+
self.local_files_cleanup_delay_secs
848+
}
849+
850+
fn remote_files_cleanup_delay_secs(&self) -> u64 {
851+
self.remote_files_cleanup_delay_secs
852+
}
818853
}
819854

820855
lazy_static! {
@@ -1187,14 +1222,26 @@ impl Config {
11871222
),
11881223
local_files_cleanup_interval_secs: env_parse(
11891224
"CUBESTORE_LOCAL_FILES_CLEANUP_INTERVAL_SECS",
1190-
3 * 600,
1225+
600,
1226+
),
1227+
remote_files_cleanup_interval_secs: env_parse(
1228+
"CUBESTORE_REMOTE_FILES_CLEANUP_INTERVAL_SECS",
1229+
6 * 600,
11911230
),
11921231
local_files_cleanup_size_threshold: env_parse_size(
11931232
"CUBESTORE_LOCAL_FILES_CLEANUP_SIZE_THRESHOLD",
11941233
1024 * 1024 * 1024,
11951234
None,
11961235
None,
11971236
) as u64,
1237+
local_files_cleanup_delay_secs: env_parse(
1238+
"CUBESTORE_LOCAL_FILES_CLEANUP_DELAY_SECS",
1239+
600,
1240+
),
1241+
remote_files_cleanup_delay_secs: env_parse(
1242+
"CUBESTORE_REMOTE_FILES_CLEANUP_DELAY_SECS",
1243+
3600,
1244+
),
11981245
}),
11991246
}
12001247
}
@@ -1280,7 +1327,10 @@ impl Config {
12801327
transport_max_message_size: 64 << 20,
12811328
transport_max_frame_size: 16 << 20,
12821329
local_files_cleanup_interval_secs: 600,
1330+
remote_files_cleanup_interval_secs: 600,
12831331
local_files_cleanup_size_threshold: 0,
1332+
local_files_cleanup_delay_secs: 600,
1333+
remote_files_cleanup_delay_secs: 3600,
12841334
}),
12851335
}
12861336
}
@@ -1739,6 +1789,16 @@ impl Config {
17391789
))
17401790
})
17411791
.await;
1792+
1793+
self.injector
1794+
.register_typed::<RemoteFsCleanup, _, _, _>(async move |i| {
1795+
Arc::new(RemoteFsCleanup::new(
1796+
i.get_service_typed().await,
1797+
i.get_service_typed().await,
1798+
i.get_service_typed().await,
1799+
))
1800+
})
1801+
.await;
17421802
}
17431803

17441804
pub async fn configure_common(&self) {

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,8 @@ pub trait MetaStore: DIService + Send + Sync {
969969
&self,
970970
) -> Result<Vec<(IdRow<Partition>, Vec<IdRow<Chunk>>)>, CubeError>;
971971

972+
async fn get_all_filenames(&self) -> Result<Vec<String>, CubeError>;
973+
972974
fn chunks_table(&self) -> ChunkMetaStoreTable;
973975
async fn create_chunk(
974976
&self,
@@ -3313,6 +3315,26 @@ impl MetaStore for RocksMetaStore {
33133315
})
33143316
.await
33153317
}
3318+
async fn get_all_filenames(&self) -> Result<Vec<String>, CubeError> {
3319+
self.read_operation_out_of_queue(|db| {
3320+
let mut filenames = Vec::new();
3321+
for c in ChunkRocksTable::new(db.clone()).table_scan(db.snapshot)? {
3322+
let c = c?;
3323+
if !c.row.in_memory {
3324+
filenames.push(c.row.get_full_name(c.id));
3325+
}
3326+
}
3327+
3328+
for p in PartitionRocksTable::new(db.clone()).table_scan(db.snapshot)? {
3329+
let p = p?;
3330+
if let Some(filename) = p.row.get_full_name(p.id) {
3331+
filenames.push(filename);
3332+
}
3333+
}
3334+
Ok(filenames)
3335+
})
3336+
.await
3337+
}
33163338

33173339
#[tracing::instrument(level = "trace", skip(self))]
33183340
async fn create_chunk(

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,10 @@ impl MetaStore for MetaStoreMock {
735735
) -> Result<Vec<IdRow<Index>>, CubeError> {
736736
panic!("MetaStore mock!")
737737
}
738+
739+
async fn get_all_filenames(&self) -> Result<Vec<String>, CubeError> {
740+
panic!("MetaStore mock!")
741+
}
738742
}
739743

740744
crate::di_service!(MetaStoreMock, [MetaStore]);

0 commit comments

Comments
 (0)