Skip to content

Commit 80781f9

Browse files
committed
chore(cubestore): Upgrade DF: Add compaction operation app_metrics
1 parent ebcd055 commit 80781f9

File tree

2 files changed

+56
-7
lines changed

2 files changed

+56
-7
lines changed

rust/cubestore/cubestore/src/app_metrics.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,30 @@ pub static CACHESTORE_ROCKSDB_CF_DEFAULT_SIZE: Gauge =
8585
pub static CACHESTORE_SCHEDULER_GC_QUEUE: Gauge =
8686
metrics::gauge("cs.cachestore.scheduler.gc_queue");
8787

88+
// TODO: Maybe these should be a single metric that uses tags.
89+
pub static JOBS_PARTITION_COMPACTION: Counter =
90+
metrics::counter("cs.jobs.partition_compaction.count");
91+
pub static JOBS_PARTITION_COMPACTION_COMPLETED: Counter =
92+
metrics::counter("cs.jobs.partition_compaction.completed");
93+
pub static JOBS_PARTITION_COMPACTION_FAILURES: Counter =
94+
metrics::counter("cs.jobs.partition_compaction.failures");
95+
pub static JOBS_MULTI_PARTITION_SPLIT: Counter =
96+
metrics::counter("cs.jobs.multi_partition_split.count");
97+
pub static JOBS_MULTI_PARTITION_SPLIT_COMPLETED: Counter =
98+
metrics::counter("cs.jobs.multi_partition_split.completed");
99+
pub static JOBS_MULTI_PARTITION_SPLIT_FAILURES: Counter =
100+
metrics::counter("cs.jobs.multi_partition_split.failures");
101+
pub static JOBS_FINISH_MULTI_SPLIT: Counter = metrics::counter("cs.jobs.finish_multi_split.count");
102+
pub static JOBS_FINISH_MULTI_SPLIT_COMPLETED: Counter =
103+
metrics::counter("cs.jobs.finish_multi_split.completed");
104+
pub static JOBS_FINISH_MULTI_SPLIT_FAILURES: Counter =
105+
metrics::counter("cs.jobs.finish_multi_split.failures");
106+
pub static JOBS_REPARTITION_CHUNK: Counter = metrics::counter("cs.jobs.repartition_chunk.count");
107+
pub static JOBS_REPARTITION_CHUNK_COMPLETED: Counter =
108+
metrics::counter("cs.jobs.repartition_chunk.completed");
109+
pub static JOBS_REPARTITION_CHUNK_FAILURES: Counter =
110+
metrics::counter("cs.jobs.repartition_chunk.failures");
111+
88112
/// RemoteFs metrics
89113
pub static REMOTE_FS_OPERATION_CORE: Counter = metrics::counter("cs.remote_fs.operations.core");
90114
pub static REMOTE_FS_FILES_TO_REMOVE: Gauge = metrics::gauge("cs.remote_fs.files_to_remove.count");

rust/cubestore/cubestore/src/cluster/ingestion/job_processor.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::metastore::{MetaStore, RowKey, TableId};
77
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
88
use crate::store::compaction::CompactionService;
99
use crate::store::ChunkDataStore;
10-
use crate::CubeError;
10+
use crate::{app_metrics, CubeError};
1111
use async_trait::async_trait;
1212
use serde::{Deserialize, Serialize};
1313
use std::sync::Arc;
@@ -117,10 +117,16 @@ impl JobIsolatedProcessor {
117117
let compaction_service = self.compaction_service.clone();
118118
let partition_id = *partition_id;
119119
let data_loaded_size = DataLoadedSize::new();
120+
app_metrics::JOBS_PARTITION_COMPACTION.add(1);
120121
let r = compaction_service
121122
.compact(partition_id, data_loaded_size.clone())
122123
.await;
123-
r?;
124+
if let Err(e) = r {
125+
app_metrics::JOBS_PARTITION_COMPACTION_FAILURES.add(1);
126+
return Err(e);
127+
}
128+
app_metrics::JOBS_PARTITION_COMPACTION_COMPLETED.add(1);
129+
124130
Ok(JobProcessResult::new(data_loaded_size.get()))
125131
} else {
126132
Self::fail_job_row_key(job)
@@ -130,7 +136,13 @@ impl JobIsolatedProcessor {
130136
if let RowKey::Table(TableId::MultiPartitions, id) = job.row_reference() {
131137
let compaction_service = self.compaction_service.clone();
132138
let id = *id;
133-
compaction_service.split_multi_partition(id).await?;
139+
app_metrics::JOBS_MULTI_PARTITION_SPLIT.add(1);
140+
let r = compaction_service.split_multi_partition(id).await;
141+
if let Err(e) = r {
142+
app_metrics::JOBS_MULTI_PARTITION_SPLIT_FAILURES.add(1);
143+
return Err(e);
144+
}
145+
app_metrics::JOBS_MULTI_PARTITION_SPLIT_COMPLETED.add(1);
134146
Ok(JobProcessResult::default())
135147
} else {
136148
Self::fail_job_row_key(job)
@@ -143,9 +155,15 @@ impl JobIsolatedProcessor {
143155
let compaction_service = self.compaction_service.clone();
144156
let multi_part_id = *multi_part_id;
145157
for p in meta_store.find_unsplit_partitions(multi_part_id).await? {
146-
compaction_service
158+
app_metrics::JOBS_FINISH_MULTI_SPLIT.add(1);
159+
let r = compaction_service
147160
.finish_multi_split(multi_part_id, p)
148-
.await?
161+
.await;
162+
if let Err(e) = r {
163+
app_metrics::JOBS_FINISH_MULTI_SPLIT_FAILURES.add(1);
164+
return Err(e);
165+
}
166+
app_metrics::JOBS_FINISH_MULTI_SPLIT_COMPLETED.add(1);
149167
}
150168

151169
Ok(JobProcessResult::default())
@@ -196,9 +214,16 @@ impl JobIsolatedProcessor {
196214
));
197215
}
198216
let data_loaded_size = DataLoadedSize::new();
199-
self.chunk_store
217+
app_metrics::JOBS_REPARTITION_CHUNK.add(1);
218+
let r = self
219+
.chunk_store
200220
.repartition_chunk(chunk_id, data_loaded_size.clone())
201-
.await?;
221+
.await;
222+
if let Err(e) = r {
223+
app_metrics::JOBS_REPARTITION_CHUNK_FAILURES.add(1);
224+
return Err(e);
225+
}
226+
app_metrics::JOBS_REPARTITION_CHUNK_COMPLETED.add(1);
202227
Ok(JobProcessResult::new(data_loaded_size.get()))
203228
} else {
204229
Self::fail_job_row_key(job)

0 commit comments

Comments (0)