Skip to content

Commit e2dfe8b

Browse files
authored
chore(cubestore): metrics for remote filesystem operations (#6320)
1 parent 50841d9 commit e2dfe8b

File tree

6 files changed

+90
-2
lines changed

6 files changed

+90
-2
lines changed

rust/cubestore/cubestore/src/app_metrics.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,6 @@ pub static METASTORE_INNER_WRITE_OPERATION: Histogram =
5050
metrics::histogram("cs.metastore.inner_write_operation.ms");
5151
pub static METASTORE_READ_OUT_QUEUE_OPERATION: Histogram =
5252
metrics::histogram("cs.metastore.read_out_queue_operation.ms");
53+
54+
/// RemoteFs metrics
55+
pub static REMOTE_FS_OPERATION_CORE: Counter = metrics::counter("cs.remote_fs.operations.core");

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,6 +1157,12 @@ impl Config {
11571157
.register_typed::<dyn ConfigObj, _, _, _>(async move |_| config_obj_to_register)
11581158
.await;
11591159

1160+
let server_name = self.config_obj.server_name();
1161+
let host_name = server_name
1162+
.split(":")
1163+
.next()
1164+
.unwrap_or("undefined")
1165+
.to_string();
11601166
match &self.config_obj.store_provider {
11611167
FileStoreProvider::Filesystem { remote_dir } => {
11621168
let remote_dir = remote_dir.clone();
@@ -1180,7 +1186,8 @@ impl Config {
11801186
self.injector
11811187
.register("original_remote_fs", async move |_| {
11821188
let arc: Arc<dyn DIService> =
1183-
S3RemoteFs::new(data_dir, region, bucket_name, sub_path).unwrap();
1189+
S3RemoteFs::new(data_dir, region, bucket_name, sub_path, host_name)
1190+
.unwrap();
11841191
arc
11851192
})
11861193
.await;
@@ -1195,7 +1202,7 @@ impl Config {
11951202
self.injector
11961203
.register("original_remote_fs", async move |_| {
11971204
let arc: Arc<dyn DIService> =
1198-
GCSRemoteFs::new(data_dir, bucket_name, sub_path).unwrap();
1205+
GCSRemoteFs::new(data_dir, bucket_name, sub_path, host_name).unwrap();
11991206
arc
12001207
})
12011208
.await;

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,9 @@ mod tests {
10861086
);
10871087
}
10881088

1089+
let _ = fs::remove_dir_all(store_path.clone());
1090+
let _ = fs::remove_dir_all(remote_store_path.clone());
1091+
10891092
Ok(())
10901093
}
10911094
}

rust/cubestore/cubestore/src/remotefs/gcs.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::app_metrics;
12
use crate::di_service;
23
use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs};
34
use crate::util::lock::acquire_lock;
@@ -96,20 +97,23 @@ pub struct GCSRemoteFs {
9697
bucket: String,
9798
sub_path: Option<String>,
9899
delete_mut: Mutex<()>,
100+
cube_host: String,
99101
}
100102

101103
impl GCSRemoteFs {
102104
pub fn new(
103105
dir: PathBuf,
104106
bucket_name: String,
105107
sub_path: Option<String>,
108+
cube_host: String,
106109
) -> Result<Arc<Self>, CubeError> {
107110
ensure_credentials_init();
108111
Ok(Arc::new(Self {
109112
dir,
110113
bucket: bucket_name.to_string(),
111114
sub_path,
112115
delete_mut: Mutex::new(()),
116+
cube_host,
113117
}))
114118
}
115119
}
@@ -123,6 +127,14 @@ impl RemoteFs for GCSRemoteFs {
123127
temp_upload_path: &str,
124128
remote_path: &str,
125129
) -> Result<u64, CubeError> {
130+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
131+
1,
132+
Some(&vec![
133+
"operation:upload_file".to_string(),
134+
"driver:gcs".to_string(),
135+
format!("cube_host:{}", self.cube_host),
136+
]),
137+
);
126138
let time = SystemTime::now();
127139
debug!("Uploading {}", remote_path);
128140
let file = File::open(temp_upload_path).await?;
@@ -168,6 +180,14 @@ impl RemoteFs for GCSRemoteFs {
168180

169181
fs::create_dir_all(&downloads_dirs).await?;
170182
if !local_file.exists() {
183+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
184+
1,
185+
Some(&vec![
186+
"operation:download_file".to_string(),
187+
"driver:gcs".to_string(),
188+
format!("cube_host:{}", self.cube_host),
189+
]),
190+
);
171191
let time = SystemTime::now();
172192
debug!("Downloading {}", remote_path);
173193
let (temp_file, temp_path) =
@@ -206,6 +226,14 @@ impl RemoteFs for GCSRemoteFs {
206226
}
207227

208228
async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError> {
229+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
230+
1,
231+
Some(&vec![
232+
"operation:delete_file".to_string(),
233+
"driver:gcs".to_string(),
234+
format!("cube_host:{}", self.cube_host),
235+
]),
236+
);
209237
let time = SystemTime::now();
210238
debug!("Deleting {}", remote_path);
211239
Object::delete(self.bucket.as_str(), self.gcs_path(remote_path).as_str()).await?;
@@ -232,6 +260,14 @@ impl RemoteFs for GCSRemoteFs {
232260
}
233261

234262
async fn list_with_metadata(&self, remote_prefix: &str) -> Result<Vec<RemoteFile>, CubeError> {
263+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
264+
1,
265+
Some(&vec![
266+
"operation:list".to_string(),
267+
"driver:gcs".to_string(),
268+
format!("cube_host:{}", self.cube_host),
269+
]),
270+
);
235271
let prefix = self.gcs_path(remote_prefix);
236272
let list = Object::list_prefix(self.bucket.as_str(), prefix.as_str()).await?;
237273
let leading_slash = Regex::new(format!("^{}", self.gcs_path("")).as_str()).unwrap();

rust/cubestore/cubestore/src/remotefs/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ mod tests {
573573
region.clone(),
574574
bucket_name.clone(),
575575
None,
576+
"test".to_string(),
576577
)
577578
.unwrap();
578579

@@ -586,6 +587,7 @@ mod tests {
586587
region.clone(),
587588
bucket_name.clone(),
588589
Some("remotefs_test_subpathdir".to_string()),
590+
"test".to_string(),
589591
)
590592
.unwrap();
591593

rust/cubestore/cubestore/src/remotefs/s3.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::app_metrics;
12
use crate::di_service;
23
use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs};
34
use crate::util::lock::acquire_lock;
@@ -25,6 +26,7 @@ pub struct S3RemoteFs {
2526
bucket: std::sync::RwLock<Bucket>,
2627
sub_path: Option<String>,
2728
delete_mut: Mutex<()>,
29+
cube_host: String,
2830
}
2931

3032
impl fmt::Debug for S3RemoteFs {
@@ -48,6 +50,7 @@ impl S3RemoteFs {
4850
region: String,
4951
bucket_name: String,
5052
sub_path: Option<String>,
53+
cube_host: String,
5154
) -> Result<Arc<Self>, CubeError> {
5255
let key_id = env::var("CUBESTORE_AWS_ACCESS_KEY_ID").ok();
5356
let access_key = env::var("CUBESTORE_AWS_SECRET_ACCESS_KEY").ok();
@@ -61,6 +64,7 @@ impl S3RemoteFs {
6164
bucket,
6265
sub_path,
6366
delete_mut: Mutex::new(()),
67+
cube_host,
6468
});
6569
spawn_creds_refresh_loop(key_id, access_key, bucket_name, region, &fs);
6670
Ok(fs)
@@ -138,6 +142,15 @@ impl RemoteFs for S3RemoteFs {
138142
remote_path: &str,
139143
) -> Result<u64, CubeError> {
140144
{
145+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
146+
1,
147+
Some(&vec![
148+
"operation:upload_file".to_string(),
149+
"driver:s3".to_string(),
150+
format!("cube_host:{}", self.cube_host),
151+
]),
152+
);
153+
141154
let time = SystemTime::now();
142155
debug!("Uploading {}", remote_path);
143156
let path = self.s3_path(remote_path);
@@ -188,6 +201,14 @@ impl RemoteFs for S3RemoteFs {
188201

189202
fs::create_dir_all(&downloads_dir).await?;
190203
if !local_file.exists() {
204+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
205+
1,
206+
Some(&vec![
207+
"operation:download_file".to_string(),
208+
"driver:s3".to_string(),
209+
format!("cube_host:{}", self.cube_host),
210+
]),
211+
);
191212
let time = SystemTime::now();
192213
debug!("Downloading {}", remote_path);
193214
let path = self.s3_path(remote_path);
@@ -216,6 +237,14 @@ impl RemoteFs for S3RemoteFs {
216237
}
217238

218239
async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError> {
240+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
241+
1,
242+
Some(&vec![
243+
"operation:delete_file".to_string(),
244+
"driver:s3".to_string(),
245+
format!("cube_host:{}", self.cube_host),
246+
]),
247+
);
219248
let time = SystemTime::now();
220249
debug!("Deleting {}", remote_path);
221250
let path = self.s3_path(remote_path);
@@ -251,6 +280,14 @@ impl RemoteFs for S3RemoteFs {
251280
}
252281

253282
async fn list_with_metadata(&self, remote_prefix: &str) -> Result<Vec<RemoteFile>, CubeError> {
283+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
284+
1,
285+
Some(&vec![
286+
"operation:list".to_string(),
287+
"driver:s3".to_string(),
288+
format!("cube_host:{}", self.cube_host),
289+
]),
290+
);
254291
let path = self.s3_path(remote_prefix);
255292
let bucket = self.bucket.read().unwrap().clone();
256293
let list = cube_ext::spawn_blocking(move || bucket.list_blocking(path, None)).await??;

0 commit comments

Comments
 (0)