Commit b35f9fd

ovrFrank-TXS authored and committed
chore(cachestore): Rockstore - track operation name in RWLoop for slow span (cube-js#9737)
1 parent: 13ccfeb

File tree

5 files changed: +208 −166
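The diffs below are mechanical: RocksStore::write_operation and RocksStore::read_operation take a new leading argument naming the operation, so when the RWLoop flags a slow span it can report which operation ran long (e.g. "cache_set") rather than an anonymous closure. Here is a minimal sketch of the pattern, not the actual RocksStore code: the toy Store type, the 100 ms threshold, and the stderr logging are assumptions, and the real methods are async and return Result<_, CubeError>.

    use std::time::{Duration, Instant};

    // Toy stand-in for RocksStore; illustration only.
    struct Store;

    impl Store {
        // Sketch of the new shape: the first argument names the operation so a
        // slow execution can be attributed to it in the slow-span report.
        fn write_operation<R>(&self, op_name: &str, f: impl FnOnce(&Store) -> R) -> R {
            let started = Instant::now();
            let result = f(self);
            // Hypothetical threshold and logging; the real store chooses its own.
            if started.elapsed() > Duration::from_millis(100) {
                eprintln!("slow write_operation '{}' took {:?}", op_name, started.elapsed());
            }
            result
        }
    }

    fn main() {
        let store = Store;
        // Call sites, as in the diffs below, just pass a static name first.
        let value = store.write_operation("cache_incr", |_db| 1 + 1);
        println!("cache_incr -> {}", value);
    }

Every call site in this commit follows the same one-line pattern: the closure body is unchanged and a string naming the surrounding method is passed as the new first argument.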

rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs

Lines changed: 2 additions & 2 deletions
@@ -391,7 +391,7 @@ impl CacheEvictionManager {
         store: &Arc<RocksStore>,
     ) -> Result<DeleteBatchResult, CubeError> {
         let (deleted_count, deleted_size, skipped) = store
-            .write_operation(move |db_ref, pipe| {
+            .write_operation("delete_batch", move |db_ref, pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());

                 let mut deleted_count: u32 = 0;
@@ -951,7 +951,7 @@ impl CacheEvictionManager {
         app_metrics::CACHESTORE_TTL_BUFFER.report(buffer_len as i64);

         store
-            .write_operation(move |db_ref, pipe| {
+            .write_operation("persist_ttl", move |db_ref, pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());

                 for (row_id, item) in to_persist.into_iter() {

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 24 additions & 22 deletions
@@ -502,7 +502,7 @@ impl RocksCacheStore {
 impl RocksCacheStore {
     async fn queue_result_delete_by_id(&self, id: u64) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_result_delete_by_id", move |db_ref, batch_pipe| {
                 let result_schema = QueueResultRocksTable::new(db_ref.clone());
                 result_schema.try_delete(id, batch_pipe)?;

@@ -514,7 +514,7 @@ impl RocksCacheStore {
     /// This method should be called when we are sure that we return data to the consumer
     async fn queue_result_ready_to_delete(&self, id: u64) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_result_ready_to_delete", move |db_ref, batch_pipe| {
                 let result_schema = QueueResultRocksTable::new(db_ref.clone());
                 if let Some(row) = result_schema.get_row(id)? {
                     Self::queue_result_ready_to_delete_impl(&result_schema, batch_pipe, row)?;
@@ -555,7 +555,7 @@ impl RocksCacheStore {
         key: QueueKey,
     ) -> Result<Option<QueueResultResponse>, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("lookup_queue_result_by_key", move |db_ref, batch_pipe| {
                 let result_schema = QueueResultRocksTable::new(db_ref.clone());
                 let query_key_is_path = key.is_path();
                 let queue_result = result_schema.get_row_by_key(key.clone())?;
@@ -866,7 +866,7 @@ impl CacheStore for RocksCacheStore {

         let (result, inserted) = self
             .store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("cache_set", move |db_ref, batch_pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key = CacheItemIndexKey::ByPath(item.get_path());
                 let id_row_opt = cache_schema
@@ -900,7 +900,7 @@ impl CacheStore for RocksCacheStore {

         let result = self
             .store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("cache_truncate", move |db_ref, batch_pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref);
                 cache_schema.truncate(batch_pipe)?;

@@ -917,7 +917,7 @@ impl CacheStore for RocksCacheStore {
     async fn cache_delete(&self, key: String) -> Result<(), CubeError> {
         let result = self
             .store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("cache_delete", move |db_ref, batch_pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key = CacheItemIndexKey::ByPath(key);
                 let row_opt = cache_schema
@@ -947,7 +947,7 @@ impl CacheStore for RocksCacheStore {
     async fn cache_get(&self, key: String) -> Result<Option<IdRow<CacheItem>>, CubeError> {
         let res = self
             .store
-            .read_operation(move |db_ref| {
+            .read_operation("cache_get", move |db_ref| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key = CacheItemIndexKey::ByPath(key);
                 let id_row_opt = cache_schema
@@ -966,7 +966,7 @@ impl CacheStore for RocksCacheStore {

     async fn cache_keys(&self, prefix: String) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("cache_keys", move |db_ref| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key =
                     CacheItemIndexKey::ByPrefix(CacheItem::parse_path_to_prefix(prefix));
@@ -981,7 +981,7 @@ impl CacheStore for RocksCacheStore {
     async fn cache_incr(&self, path: String) -> Result<IdRow<CacheItem>, CubeError> {
         let item = self
             .store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("cache_incr", move |db_ref, batch_pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key = CacheItemIndexKey::ByPath(path.clone());
                 let id_row_opt = cache_schema
@@ -1009,7 +1009,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_all(&self, limit: Option<usize>) -> Result<Vec<QueueAllItem>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("queue_all", move |db_ref| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone());

@@ -1041,13 +1041,15 @@ impl CacheStore for RocksCacheStore {
         limit: Option<usize>,
     ) -> Result<Vec<IdRow<QueueResult>>, CubeError> {
         self.store
-            .read_operation(move |db_ref| Ok(QueueResultRocksTable::new(db_ref).scan_rows(limit)?))
+            .read_operation("queue_results_all", move |db_ref| {
+                Ok(QueueResultRocksTable::new(db_ref).scan_rows(limit)?)
+            })
             .await
     }

     async fn queue_results_multi_delete(&self, ids: Vec<u64>) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_results_multi_delete", move |db_ref, batch_pipe| {
                 let queue_result_schema = QueueResultRocksTable::new(db_ref);

                 for id in ids {
@@ -1061,7 +1063,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_add(&self, payload: QueueAddPayload) -> Result<QueueAddResponse, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_add", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let pending = queue_schema.count_rows_by_index(
                     &QueueItemIndexKey::ByPrefixAndStatus(
@@ -1113,7 +1115,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_truncate(&self) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_truncate", move |db_ref, batch_pipe| {
                 let queue_item_schema = QueueItemRocksTable::new(db_ref.clone());
                 queue_item_schema.truncate(batch_pipe)?;

@@ -1137,7 +1139,7 @@ impl CacheStore for RocksCacheStore {
         heartbeat_timeout: Option<u32>,
     ) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("queue_to_cancel", move |db_ref| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let index_key = QueueItemIndexKey::ByPrefix(prefix);
                 let items =
@@ -1161,7 +1163,7 @@ impl CacheStore for RocksCacheStore {
         with_payload: bool,
     ) -> Result<Vec<QueueListItem>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("queue_list", move |db_ref| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());

                 let items = if let Some(status_filter) = status_filter {
@@ -1210,7 +1212,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_get(&self, key: QueueKey) -> Result<Option<QueueGetResponse>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("queue_get", move |db_ref| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());

                 if let Some(item_row) = queue_schema.get_row_by_key(key)? {
@@ -1238,7 +1240,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_cancel(&self, key: QueueKey) -> Result<Option<QueueCancelResponse>, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_cancel", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone());

@@ -1267,7 +1269,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_heartbeat(&self, key: QueueKey) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_heartbeat", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let id_row_opt = queue_schema.get_row_by_key(key.clone())?;

@@ -1292,7 +1294,7 @@ impl CacheStore for RocksCacheStore {
         allow_concurrency: u32,
     ) -> Result<QueueRetrieveResponse, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_retrieve_by_path", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let prefix = QueueItem::parse_path(path.clone())
                     .0
@@ -1368,7 +1370,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_ack(&self, key: QueueKey, result: Option<String>) -> Result<bool, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_ack", move |db_ref, batch_pipe| {
                 let queue_item_tbl = QueueItemRocksTable::new(db_ref.clone());
                 let queue_item_payload_tbl = QueueItemPayloadRocksTable::new(db_ref.clone());

@@ -1470,7 +1472,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_merge_extra(&self, key: QueueKey, payload: String) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_merge_extra", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let id_row_opt = queue_schema.get_row_by_key(key.clone())?;

