Skip to content

Commit 88e033b

Browse files
authored
chore(cubestore): Submit additional metrics for cachestore (#7057)
1 parent 4616d57 commit 88e033b

File tree

4 files changed

+110
-28
lines changed

4 files changed

+110
-28
lines changed

rust/cubestore/cubestore/src/app_metrics.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ pub static METASTORE_INNER_WRITE_OPERATION: Histogram =
5454
pub static METASTORE_READ_OUT_QUEUE_OPERATION: Histogram =
5555
metrics::histogram("cs.metastore.read_out_queue_operation.ms");
5656

57+
pub static CACHESTORE_ROCKSDB_ESTIMATE_LIVE_DATA_SIZE: Gauge =
58+
metrics::gauge("cs.cachestore.rocksdb.estimate_live_data_size");
59+
pub static CACHESTORE_ROCKSDB_LIVE_SST_FILES_SIZE: Gauge =
60+
metrics::gauge("cs.cachestore.rocksdb.live_sst_files_size");
61+
pub static CACHESTORE_SCHEDULER_GC_QUEUE: Gauge =
62+
metrics::gauge("cs.cachestore.scheduler.gc_queue");
63+
5764
/// RemoteFs metrics
5865
pub static REMOTE_FS_OPERATION_CORE: Counter = metrics::counter("cs.remote_fs.operations.core");
5966
pub static REMOTE_FS_FILES_TO_REMOVE: Gauge = metrics::gauge("cs.remote_fs.files_to_remove.count");

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::metastore::{
1818
};
1919
use crate::remotefs::LocalDirRemoteFs;
2020
use crate::util::WorkerLoop;
21-
use crate::CubeError;
21+
use crate::{app_metrics, CubeError};
2222
use async_trait::async_trait;
2323

2424
use futures_timer::Delay;
@@ -28,13 +28,15 @@ use crate::cachestore::compaction::CompactionPreloadedState;
2828
use crate::cachestore::listener::RocksCacheStoreListener;
2929
use crate::table::{Row, TableValue};
3030
use chrono::{DateTime, Utc};
31+
use datafusion::cube_ext;
3132
use itertools::Itertools;
3233
use log::{trace, warn};
3334
use serde_derive::{Deserialize, Serialize};
3435
use std::path::{Path, PathBuf};
3536
use std::sync::{Arc, Mutex};
3637
use std::time::Duration;
3738
use tokio::sync::broadcast::Sender;
39+
use tokio::task::JoinHandle;
3840

3941
pub(crate) struct RocksCacheStoreDetails {}
4042

@@ -125,6 +127,7 @@ impl RocksStoreDetails for RocksCacheStoreDetails {
125127
pub struct RocksCacheStore {
126128
store: Arc<RocksStore>,
127129
upload_loop: Arc<WorkerLoop>,
130+
metrics_loop: Arc<WorkerLoop>,
128131
}
129132

130133
impl RocksCacheStore {
@@ -158,6 +161,7 @@ impl RocksCacheStore {
158161
Arc::new(Self {
159162
store,
160163
upload_loop: Arc::new(WorkerLoop::new("Cachestore upload")),
164+
metrics_loop: Arc::new(WorkerLoop::new("Cachestore metrics")),
161165
})
162166
}
163167

@@ -191,24 +195,77 @@ impl RocksCacheStore {
191195
Ok(Self::new_from_store(store))
192196
}
193197

194-
pub async fn wait_upload_loop(self: Arc<Self>) {
195-
if !self.store.config.upload_to_remote() {
198+
pub fn spawn_processing_loops(self: Arc<Self>) -> Vec<JoinHandle<Result<(), CubeError>>> {
199+
let mut loops = vec![];
200+
201+
if self.store.config.upload_to_remote() {
202+
let upload_interval = self.store.config.meta_store_log_upload_interval();
203+
let cachestore = self.clone();
204+
loops.push(cube_ext::spawn(async move {
205+
cachestore
206+
.upload_loop
207+
.process(
208+
cachestore.clone(),
209+
async move |_| Ok(Delay::new(Duration::from_secs(upload_interval)).await),
210+
async move |m, _| m.store.run_upload().await,
211+
)
212+
.await;
213+
214+
Ok(())
215+
}))
216+
} else {
196217
log::info!("Not running cachestore upload loop");
197-
return;
198218
}
199219

200-
let upload_interval = self.store.config.meta_store_log_upload_interval();
201-
self.upload_loop
202-
.process(
203-
self.clone(),
204-
async move |_| Ok(Delay::new(Duration::from_secs(upload_interval)).await),
205-
async move |m, _| m.store.run_upload().await,
206-
)
207-
.await;
220+
let metrics_interval = self.store.config.cachestore_metrics_interval();
221+
if metrics_interval > 0 {
222+
let cachestore = self.clone();
223+
loops.push(cube_ext::spawn(async move {
224+
cachestore
225+
.metrics_loop
226+
.process(
227+
cachestore.clone(),
228+
async move |_| Ok(Delay::new(Duration::from_secs(metrics_interval)).await),
229+
async move |m, _| {
230+
if let Err(err) = m.submit_metrics().await {
231+
log::error!("Error while submitting cachestore metrics: {}", err)
232+
};
233+
234+
Ok(())
235+
},
236+
)
237+
.await;
238+
239+
Ok(())
240+
}))
241+
} else {
242+
log::info!("Not running cachestore metrics loop");
243+
}
244+
245+
loops
246+
}
247+
248+
pub async fn submit_metrics(&self) -> Result<(), CubeError> {
249+
app_metrics::CACHESTORE_ROCKSDB_ESTIMATE_LIVE_DATA_SIZE.report(
250+
self.store
251+
.db
252+
.property_int_value(rocksdb::properties::ESTIMATE_LIVE_DATA_SIZE)?
253+
.unwrap_or(0) as i64,
254+
);
255+
256+
app_metrics::CACHESTORE_ROCKSDB_LIVE_SST_FILES_SIZE.report(
257+
self.store
258+
.db
259+
.property_int_value(rocksdb::properties::LIVE_SST_FILES_SIZE)?
260+
.unwrap_or(0) as i64,
261+
);
262+
263+
Ok(())
208264
}
209265

210266
pub async fn stop_processing_loops(&self) {
211267
self.upload_loop.stop();
268+
self.metrics_loop.stop();
212269
}
213270

214271
pub async fn add_listener(&self, listener: Sender<MetaStoreEvent>) {

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use log::trace;
1212
use std::path::Path;
1313
use std::sync::Arc;
1414
use tokio::sync::watch::{Receiver, Sender};
15+
use tokio::task::JoinHandle;
1516

1617
pub enum LazyRocksCacheStoreState {
1718
FromRemote {
@@ -118,7 +119,7 @@ impl LazyRocksCacheStore {
118119
}
119120
}
120121

121-
pub async fn wait_upload_loop(&self) {
122+
pub async fn spawn_processing_loops(self: Arc<Self>) -> Vec<JoinHandle<Result<(), CubeError>>> {
122123
if let Some(init_signal) = &self.init_signal {
123124
let _ = init_signal.clone().changed().await;
124125
}
@@ -128,13 +129,13 @@ impl LazyRocksCacheStore {
128129
if let LazyRocksCacheStoreState::Initialized { store } = &*guard {
129130
store.clone()
130131
} else {
131-
return ();
132+
return vec![];
132133
}
133134
};
134135

135136
trace!("wait_upload_loop unblocked, Cache Store was initialized");
136137

137-
store.wait_upload_loop().await
138+
store.spawn_processing_loops()
138139
}
139140

140141
pub async fn stop_processing_loops(&self) {

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,10 @@ impl CubeServices {
128128

129129
let rocks_cache_store = self.rocks_cache_store.clone().unwrap();
130130
futures.push(cube_ext::spawn(async move {
131-
rocks_cache_store.wait_upload_loop().await;
131+
let loops = rocks_cache_store.spawn_processing_loops().await;
132+
133+
Self::wait_loops(loops).await?;
134+
132135
Ok(())
133136
}));
134137

@@ -427,6 +430,8 @@ pub trait ConfigObj: DIService {
427430

428431
fn cachestore_queue_results_expire(&self) -> u64;
429432

433+
fn cachestore_metrics_interval(&self) -> u64;
434+
430435
fn download_concurrency(&self) -> u64;
431436

432437
fn upload_concurrency(&self) -> u64;
@@ -541,6 +546,7 @@ pub struct ConfigObjImpl {
541546
pub cachestore_rocks_store_config: RocksStoreConfig,
542547
pub cachestore_gc_loop_interval: u64,
543548
pub cachestore_queue_results_expire: u64,
549+
pub cachestore_metrics_interval: u64,
544550
pub upload_concurrency: u64,
545551
pub download_concurrency: u64,
546552
pub connection_timeout: u64,
@@ -605,10 +611,6 @@ impl ConfigObj for ConfigObjImpl {
605611
self.compaction_chunks_in_memory_size_threshold
606612
}
607613

608-
fn compaction_in_memory_chunks_size_limit(&self) -> u64 {
609-
self.compaction_in_memory_chunks_size_limit
610-
}
611-
612614
fn compaction_chunks_max_lifetime_threshold(&self) -> u64 {
613615
self.compaction_chunks_max_lifetime_threshold
614616
}
@@ -617,6 +619,10 @@ impl ConfigObj for ConfigObjImpl {
617619
self.compaction_in_memory_chunks_max_lifetime_threshold
618620
}
619621

622+
fn compaction_in_memory_chunks_size_limit(&self) -> u64 {
623+
self.compaction_in_memory_chunks_size_limit
624+
}
625+
620626
fn compaction_in_memory_chunks_total_size_limit(&self) -> u64 {
621627
self.compaction_in_memory_chunks_total_size_limit
622628
}
@@ -709,14 +715,14 @@ impl ConfigObj for ConfigObjImpl {
709715
&self.metastore_bind_address
710716
}
711717

712-
fn metastore_remote_address(&self) -> &Option<String> {
713-
&self.metastore_remote_address
714-
}
715-
716718
fn metastore_rocksdb_config(&self) -> &RocksStoreConfig {
717719
&self.metastore_rocks_store_config
718720
}
719721

722+
fn metastore_remote_address(&self) -> &Option<String> {
723+
&self.metastore_remote_address
724+
}
725+
720726
fn cachestore_rocksdb_config(&self) -> &RocksStoreConfig {
721727
&self.cachestore_rocks_store_config
722728
}
@@ -729,6 +735,10 @@ impl ConfigObj for ConfigObjImpl {
729735
self.cachestore_queue_results_expire
730736
}
731737

738+
fn cachestore_metrics_interval(&self) -> u64 {
739+
self.cachestore_metrics_interval
740+
}
741+
732742
fn download_concurrency(&self) -> u64 {
733743
self.download_concurrency
734744
}
@@ -786,10 +796,6 @@ impl ConfigObj for ConfigObjImpl {
786796
fn stream_replay_check_interval_secs(&self) -> u64 {
787797
self.stream_replay_check_interval_secs
788798
}
789-
fn skip_kafka_parsing_errors(&self) -> bool {
790-
self.skip_kafka_parsing_errors
791-
}
792-
793799
fn check_ws_orphaned_messages_interval_secs(&self) -> u64 {
794800
self.check_ws_orphaned_messages_interval_secs
795801
}
@@ -802,6 +808,10 @@ impl ConfigObj for ConfigObjImpl {
802808
self.drop_ws_complete_messages_after_secs
803809
}
804810

811+
fn skip_kafka_parsing_errors(&self) -> bool {
812+
self.skip_kafka_parsing_errors
813+
}
814+
805815
fn dump_dir(&self) -> &Option<PathBuf> {
806816
&self.dump_dir
807817
}
@@ -1151,6 +1161,12 @@ impl Config {
11511161
Some(60 * 5),
11521162
Some(1),
11531163
),
1164+
cachestore_metrics_interval: env_parse_duration(
1165+
"CUBESTORE_CACHESTORE_METRICS_LOOP",
1166+
15,
1167+
Some(60 * 10),
1168+
Some(0),
1169+
),
11541170
upload_concurrency: env_parse("CUBESTORE_MAX_ACTIVE_UPLOADS", 4),
11551171
download_concurrency: env_parse("CUBESTORE_MAX_ACTIVE_DOWNLOADS", 8),
11561172
max_ingestion_data_frames: env_parse("CUBESTORE_MAX_DATA_FRAMES", 4),
@@ -1318,6 +1334,7 @@ impl Config {
13181334
cachestore_rocks_store_config: RocksStoreConfig::cachestore_default(),
13191335
cachestore_gc_loop_interval: 30,
13201336
cachestore_queue_results_expire: 90,
1337+
cachestore_metrics_interval: 15,
13211338
upload_concurrency: 4,
13221339
download_concurrency: 8,
13231340
max_ingestion_data_frames: 4,

0 commit comments

Comments
 (0)