From ce69613eb46b9184c79dd7ee4d8ce6902aa9dc50 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 15 Aug 2025 14:18:48 +0200 Subject: [PATCH] chore(cubestore): Track op name for out of queue operations --- .../src/cachestore/cache_eviction_manager.rs | 7 +- .../src/cachestore/cache_rocksstore.rs | 6 +- rust/cubestore/cubestore/src/metastore/mod.rs | 302 ++++++++++-------- .../cubestore/src/metastore/rocks_store.rs | 24 +- 4 files changed, 188 insertions(+), 151 deletions(-) diff --git a/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs b/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs index 141e7ee4fc83a..a0807b9bf5c55 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs @@ -575,7 +575,7 @@ impl CacheEvictionManager { store: &Arc, ) -> Result { let (expired, stats_total_keys, stats_total_raw_size) = store - .read_operation_out_of_queue(move |db_ref| { + .read_operation_out_of_queue("collect_stats_and_expired_keys", move |db_ref| { let mut stats_total_keys: u32 = 0; let mut stats_total_raw_size: u64 = 0; @@ -624,7 +624,7 @@ impl CacheEvictionManager { let eviction_proactive_size_threshold = self.eviction_proactive_size_threshold; let (all_keys, stats_total_keys, stats_total_raw_size, expired_keys) = store - .read_operation_out_of_queue(move |db_ref| { + .read_operation_out_of_queue("collect_allkeys_to_evict", move |db_ref| { let mut stats_total_keys: u32 = 0; let mut stats_total_raw_size: u64 = 0; @@ -768,7 +768,7 @@ impl CacheEvictionManager { let eviction_proactive_size_threshold = self.eviction_proactive_size_threshold; let to_delete: Vec<(u64, u32)> = store - .read_operation_out_of_queue(move |db_ref| { + .read_operation_out_of_queue("do_eviction_by_sampling", move |db_ref| { let mut pending_volume_remove: u64 = 0; let now_at_start = Utc::now(); @@ -1146,6 +1146,7 @@ impl CacheEvictionManager { let _ = store .read_operation_out_of_queue_opt( + "check_compaction_trigger", |db_ref| { let start: Option<&[u8]> = None; let end: Option<&[u8]> = None; diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 4e5165ddb372b..d48cb41a96a1c 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -876,7 +876,7 @@ pub trait CacheStore: DIService + Send + Sync { impl CacheStore for RocksCacheStore { async fn cache_all(&self, limit: Option) -> Result>, CubeError> { self.store - .read_operation_out_of_queue(move |db_ref| { + .read_operation_out_of_queue("cache_all", move |db_ref| { Ok(CacheItemRocksTable::new(db_ref).scan_rows(limit)?) 
}) .await @@ -1510,7 +1510,7 @@ impl CacheStore for RocksCacheStore { async fn compaction(&self) -> Result<(), CubeError> { self.store - .read_operation_out_of_queue(move |db_ref| { + .read_operation_out_of_queue("compaction", move |db_ref| { let start: Option<&[u8]> = None; let end: Option<&[u8]> = None; @@ -1525,7 +1525,7 @@ impl CacheStore for RocksCacheStore { async fn info(&self) -> Result { self.store - .read_operation_out_of_queue(move |db_ref| { + .read_operation_out_of_queue("info", move |db_ref| { let cache_schema = CacheItemRocksTable::new(db_ref.clone()); let cache_schema_stats = cache_schema.collect_table_stats_by_extended_index( &CacheItemRocksIndex::ByPath, diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 439cb5dc15d2c..6580edcbcccbc 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -1497,12 +1497,16 @@ impl RocksMetaStore { } #[inline(always)] - pub async fn read_operation_out_of_queue(&self, f: F) -> Result + pub async fn read_operation_out_of_queue( + &self, + op_name: &'static str, + f: F, + ) -> Result where F: for<'a> FnOnce(DbTableRef<'a>) -> Result + Send + Sync + 'static, R: Send + Sync + 'static, { - self.store.read_operation_out_of_queue(f).await + self.store.read_operation_out_of_queue(op_name, f).await } #[inline(always)] @@ -1921,8 +1925,10 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn get_schemas(&self) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| SchemaRocksTable::new(db_ref).all_rows()) - .await + self.read_operation_out_of_queue("get_schemas", move |db_ref| { + SchemaRocksTable::new(db_ref).all_rows() + }) + .await } #[tracing::instrument(level = "trace", skip(self))] @@ -2362,8 +2368,10 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn get_tables(&self) -> Result>, CubeError> { - self.read_operation_out_of_queue(|db_ref| TableRocksTable::new(db_ref).all_rows()) - .await + self.read_operation_out_of_queue("get_tables", |db_ref| { + TableRocksTable::new(db_ref).all_rows() + }) + .await } #[tracing::instrument(level = "trace", skip(self))] @@ -2372,7 +2380,7 @@ impl MetaStore for RocksMetaStore { include_non_ready: bool, ) -> Result>, CubeError> { if include_non_ready { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_tables_with_path(true)", move |db_ref| { let tables = TableRocksTable::new(db_ref.clone()).all_rows()?; let schemas = SchemaRocksTable::new(db_ref); let tables = Arc::new(schemas.build_path_rows( @@ -2428,7 +2436,7 @@ impl MetaStore for RocksMetaStore { &self, created_seconds_ago: i64, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("not_ready_tables", move |db_ref| { let table_rocks_table = TableRocksTable::new(db_ref); let tables = table_rocks_table.scan_all_rows()?; let mut res = Vec::new(); @@ -2492,7 +2500,7 @@ impl MetaStore for RocksMetaStore { &self, partition_id: u64, ) -> Result, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_partition_out_of_queue", move |db_ref| { PartitionRocksTable::new(db_ref).get_row_or_not_found(partition_id) }) .await @@ -2610,7 +2618,7 @@ impl MetaStore for RocksMetaStore { async fn get_all_partitions_and_chunks_out_of_queue( &self, ) -> Result<(Vec>, Vec>), CubeError> { - 
self.read_operation_out_of_queue(move |db| { + self.read_operation_out_of_queue("get_all_partitions_and_chunks_out_of_queue", move |db| { let partitions = PartitionRocksTable::new(db.clone()).all_rows()?; let chunks = ChunkRocksTable::new(db).all_rows()?; Ok((partitions, chunks)) @@ -2825,7 +2833,7 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn can_delete_partition(&self, partition_id: u64) -> Result { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("can_delete_partition", move |db_ref| { let partitions_table = PartitionRocksTable::new(db_ref.clone()); let chunks_table = ChunkRocksTable::new(db_ref.clone()); @@ -2860,7 +2868,7 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn can_delete_middle_man_partition(&self, partition_id: u64) -> Result { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("can_delete_middle_man_partition", move |db_ref| { let partitions_table = PartitionRocksTable::new(db_ref.clone()); let chunks_table = ChunkRocksTable::new(db_ref.clone()); @@ -2900,7 +2908,7 @@ impl MetaStore for RocksMetaStore { async fn all_inactive_partitions_to_repartition( &self, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("all_inactive_partitions_to_repartition", move |db_ref| { let partitions_table = PartitionRocksTable::new(db_ref.clone()); let chunks_table = ChunkRocksTable::new(db_ref.clone()); @@ -2934,7 +2942,7 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn all_inactive_middle_man_partitions(&self) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("all_inactive_middle_man_partitions", move |db_ref| { let partitions_table = PartitionRocksTable::new(db_ref.clone()); let chunks_table = ChunkRocksTable::new(db_ref.clone()); @@ -2980,7 +2988,7 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn all_just_created_partitions(&self) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("all_just_created_partitions", move |db_ref| { let partitions_table = PartitionRocksTable::new(db_ref.clone()); let orphaned_partitions = partitions_table.scan_rows_by_index( @@ -3005,37 +3013,40 @@ impl MetaStore for RocksMetaStore { &self, seconds_ago: i64, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { - let chunks_table = ChunkRocksTable::new(db_ref.clone()); - - let now = Utc::now(); - let mut partition_ids = HashSet::new(); - for c in chunks_table.scan_all_rows()? { - let c = c?; - if c.get_row().active() - && c.get_row() - .created_at() - .as_ref() - .map(|created_at| { - now.signed_duration_since(created_at.clone()).num_seconds() - >= seconds_ago - }) - .unwrap_or(false) - { - partition_ids.insert(c.get_row().get_partition_id()); + self.read_operation_out_of_queue( + "get_partitions_with_chunks_created_seconds_ago", + move |db_ref| { + let chunks_table = ChunkRocksTable::new(db_ref.clone()); + + let now = Utc::now(); + let mut partition_ids = HashSet::new(); + for c in chunks_table.scan_all_rows()? 
{ + let c = c?; + if c.get_row().active() + && c.get_row() + .created_at() + .as_ref() + .map(|created_at| { + now.signed_duration_since(created_at.clone()).num_seconds() + >= seconds_ago + }) + .unwrap_or(false) + { + partition_ids.insert(c.get_row().get_partition_id()); + } } - } - let partitions_table = PartitionRocksTable::new(db_ref.clone()); - let mut partitions = Vec::new(); - for id in partition_ids { - if let Some(partition) = partitions_table.get_row(id)? { - partitions.push(partition); + let partitions_table = PartitionRocksTable::new(db_ref.clone()); + let mut partitions = Vec::new(); + for id in partition_ids { + if let Some(partition) = partitions_table.get_row(id)? { + partitions.push(partition); + } } - } - Ok(partitions) - }) + Ok(partitions) + }, + ) .await } @@ -3053,7 +3064,7 @@ impl MetaStore for RocksMetaStore { CubeError, > { let config = self.store.config.clone(); - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_partitions_for_in_memory_compaction", move |db_ref| { let chunks_table = ChunkRocksTable::new(db_ref.clone()); let mut partitions_map = HashMap::new(); @@ -3104,7 +3115,7 @@ impl MetaStore for RocksMetaStore { node: String, ) -> Result>, CubeError> { let config = self.store.config.clone(); - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_all_node_in_memory_chunks", move |db_ref| { let chunks_table = ChunkRocksTable::new(db_ref.clone()); let partitions_table = PartitionRocksTable::new(db_ref.clone()); @@ -3139,41 +3150,44 @@ impl MetaStore for RocksMetaStore { &self, seconds_ago: i64, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { - let chunks_table = ChunkRocksTable::new(db_ref.clone()); - - let now = Utc::now(); - let mut partitions = HashMap::new(); - for c in chunks_table.scan_all_rows()? { - let c = c?; - if c.get_row().active() - && c.get_row() - .created_at() - .as_ref() - .map(|created_at| { - now.signed_duration_since(created_at.clone()).num_seconds() - >= seconds_ago - }) - .unwrap_or(false) - { - partitions - .entry(c.get_row().get_partition_id()) - .or_insert(vec![]) - .push(c); + self.read_operation_out_of_queue( + "get_chunks_without_partition_created_seconds_ago", + move |db_ref| { + let chunks_table = ChunkRocksTable::new(db_ref.clone()); + + let now = Utc::now(); + let mut partitions = HashMap::new(); + for c in chunks_table.scan_all_rows()? 
{ + let c = c?; + if c.get_row().active() + && c.get_row() + .created_at() + .as_ref() + .map(|created_at| { + now.signed_duration_since(created_at.clone()).num_seconds() + >= seconds_ago + }) + .unwrap_or(false) + { + partitions + .entry(c.get_row().get_partition_id()) + .or_insert(vec![]) + .push(c); + } } - } - let partitions_table = PartitionRocksTable::new(db_ref.clone()); + let partitions_table = PartitionRocksTable::new(db_ref.clone()); - let mut result = Vec::new(); - for (id, mut chunks) in partitions { - if partitions_table.get_row(id)?.is_none() { - result.append(&mut chunks); + let mut result = Vec::new(); + for (id, mut chunks) in partitions { + if partitions_table.get_row(id)?.is_none() { + result.append(&mut chunks); + } } - } - Ok(result) - }) + Ok(result) + }, + ) .await } @@ -3248,7 +3262,7 @@ impl MetaStore for RocksMetaStore { &self, table_id: u64, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_table_indexes_out_of_queue", move |db_ref| { let index_table = IndexRocksTable::new(db_ref); Ok(index_table .get_rows_by_index(&IndexIndexKey::TableId(table_id), &IndexRocksIndex::TableID)?) @@ -3261,7 +3275,7 @@ impl MetaStore for RocksMetaStore { &self, index_id: u64, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_active_partitions_by_index_id", move |db_ref| { let rocks_partition = PartitionRocksTable::new(db_ref); // TODO iterate over range Ok(rocks_partition @@ -3288,21 +3302,24 @@ impl MetaStore for RocksMetaStore { &self, index_id: u64, ) -> Result<(IdRow, Vec>), CubeError> { - self.read_operation_out_of_queue(move |db_ref| { - let index = IndexRocksTable::new(db_ref.clone()).get_row_or_not_found(index_id)?; - let rocks_partition = PartitionRocksTable::new(db_ref); - - let partitions = rocks_partition - .get_rows_by_index( - &PartitionIndexKey::ByIndexId(index.get_id()), - &PartitionRocksIndex::IndexId, - )? - .into_iter() - .filter(|r| r.get_row().active) - .collect::>(); - - Ok((index, partitions)) - }) + self.read_operation_out_of_queue( + "get_index_with_active_partitions_out_of_queue", + move |db_ref| { + let index = IndexRocksTable::new(db_ref.clone()).get_row_or_not_found(index_id)?; + let rocks_partition = PartitionRocksTable::new(db_ref); + + let partitions = rocks_partition + .get_rows_by_index( + &PartitionIndexKey::ByIndexId(index.get_id()), + &PartitionRocksIndex::IndexId, + )? 
+ .into_iter() + .filter(|r| r.get_row().active) + .collect::>(); + + Ok((index, partitions)) + }, + ) .await } @@ -3381,44 +3398,47 @@ impl MetaStore for RocksMetaStore { &self, index_id: Vec, ) -> Result, Vec>)>>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { - let rocks_chunk = ChunkRocksTable::new(db_ref.clone()); - let rocks_partition = PartitionRocksTable::new(db_ref); - - let mut results = Vec::with_capacity(index_id.len()); - for index_id in index_id { - let mut processed = HashSet::new(); - let mut partitions = Vec::new(); - let mut add_with_parents = |mut p: u64| -> Result<(), CubeError> { - loop { - if !processed.insert(p) { - break; - } - let r = rocks_partition.get_row_or_not_found(p)?; - let parent = r.row.parent_partition_id().clone(); - partitions.push((r, Vec::new())); - match parent { - None => break, - Some(parent) => p = parent, + self.read_operation_out_of_queue( + "get_active_partitions_and_chunks_by_index_id_for_select", + move |db_ref| { + let rocks_chunk = ChunkRocksTable::new(db_ref.clone()); + let rocks_partition = PartitionRocksTable::new(db_ref); + + let mut results = Vec::with_capacity(index_id.len()); + for index_id in index_id { + let mut processed = HashSet::new(); + let mut partitions = Vec::new(); + let mut add_with_parents = |mut p: u64| -> Result<(), CubeError> { + loop { + if !processed.insert(p) { + break; + } + let r = rocks_partition.get_row_or_not_found(p)?; + let parent = r.row.parent_partition_id().clone(); + partitions.push((r, Vec::new())); + match parent { + None => break, + Some(parent) => p = parent, + } } + Ok(()) + }; + // TODO iterate over range. + for p in rocks_partition.get_row_ids_by_index( + &PartitionIndexKey::ByIndexId(index_id), + &PartitionRocksIndex::IndexId, + )? { + add_with_parents(p)?; } - Ok(()) - }; - // TODO iterate over range. - for p in rocks_partition.get_row_ids_by_index( - &PartitionIndexKey::ByIndexId(index_id), - &PartitionRocksIndex::IndexId, - )? { - add_with_parents(p)?; - } - for (p, chunks) in &mut partitions { - *chunks = Self::chunks_by_partition(p.id, &rocks_chunk, false)?; + for (p, chunks) in &mut partitions { + *chunks = Self::chunks_by_partition(p.id, &rocks_chunk, false)?; + } + results.push(partitions) } - results.push(partitions) - } - Ok(results) - }) + Ok(results) + }, + ) .await } @@ -3426,7 +3446,7 @@ impl MetaStore for RocksMetaStore { async fn get_warmup_partitions( &self, ) -> Result, Vec>)>, CubeError> { - self.read_operation_out_of_queue(|db| { + self.read_operation_out_of_queue("get_warmup_partitions", |db| { // Do full scan, likely only a small number chunks and partitions are inactive. let mut partition_to_chunks = HashMap::new(); for c in ChunkRocksTable::new(db.clone()).table_scan(db.snapshot)? { @@ -3463,7 +3483,7 @@ impl MetaStore for RocksMetaStore { .await } async fn get_all_filenames(&self) -> Result, CubeError> { - self.read_operation_out_of_queue(|db| { + self.read_operation_out_of_queue("get_all_filenames", |db| { let mut filenames = Vec::new(); for c in ChunkRocksTable::new(db.clone()).table_scan(db.snapshot)? 
{ let c = c?; @@ -3529,7 +3549,7 @@ impl MetaStore for RocksMetaStore { } #[tracing::instrument(level = "trace", skip(self))] async fn get_chunks_out_of_queue(&self, ids: Vec) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db| { + self.read_operation_out_of_queue("get_chunks_out_of_queue", move |db| { let db = ChunkRocksTable::new(db.clone()); let mut res = Vec::with_capacity(ids.len()); for id in ids.into_iter() { @@ -3545,7 +3565,7 @@ impl MetaStore for RocksMetaStore { &self, ids: Vec, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db| { + self.read_operation_out_of_queue("get_partitions_out_of_queue", move |db| { let db = PartitionRocksTable::new(db.clone()); let mut res = Vec::with_capacity(ids.len()); for id in ids.into_iter() { @@ -3576,7 +3596,7 @@ impl MetaStore for RocksMetaStore { partition_id: u64, include_inactive: bool, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db| { + self.read_operation_out_of_queue("get_chunks_by_partition_out_of_queue", move |db| { Self::chunks_by_partition(partition_id, &ChunkRocksTable::new(db), include_inactive) }) .await @@ -3790,7 +3810,7 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn all_inactive_chunks(&self) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("all_inactive_chunks", move |db_ref| { let table = ChunkRocksTable::new(db_ref); let mut res = Vec::new(); for c in table.scan_all_rows()? { @@ -3806,7 +3826,7 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn all_inactive_not_uploaded_chunks(&self) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("all_inactive_not_uploaded_chunks", move |db_ref| { let table = ChunkRocksTable::new(db_ref); let mut res = Vec::new(); @@ -3892,8 +3912,10 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn all_jobs(&self) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| Ok(JobRocksTable::new(db_ref).all_rows()?)) - .await + self.read_operation_out_of_queue("all_jobs", move |db_ref| { + Ok(JobRocksTable::new(db_ref).all_rows()?) + }) + .await } #[tracing::instrument(level = "trace", skip(self))] @@ -3947,7 +3969,7 @@ impl MetaStore for RocksMetaStore { orphaned_timeout: Duration, ) -> Result>, CubeError> { let duration = chrono::Duration::from_std(orphaned_timeout).unwrap(); - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_orphaned_jobs", move |db_ref| { let jobs_table = JobRocksTable::new(db_ref); let time = Utc::now(); let all_jobs = jobs_table @@ -3977,7 +3999,7 @@ impl MetaStore for RocksMetaStore { .iter() .map(|s| s.to_string()) .collect::>(); - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_orphaned_jobs", move |db_ref| { let jobs_table = JobRocksTable::new(db_ref); let all_jobs = jobs_table .all_rows()?
@@ -4166,7 +4188,7 @@ impl MetaStore for RocksMetaStore { &self, table_id: u64, ) -> Result>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("get_replay_handles_by_table", move |db_ref| { Ok(ReplayHandleRocksTable::new(db_ref).get_rows_by_index( &ReplayHandleIndexKey::ByTableId(table_id), &ReplayHandleRocksIndex::ByTableId, @@ -4289,7 +4311,7 @@ impl MetaStore for RocksMetaStore { async fn all_replay_handles_to_merge( &self, ) -> Result, bool)>, CubeError> { - self.read_operation_out_of_queue(move |db_ref| { + self.read_operation_out_of_queue("all_replay_handles_to_merge", move |db_ref| { let all_replay_handles = ReplayHandleRocksTable::new(db_ref.clone()).all_rows()?; let chunks_table = ChunkRocksTable::new(db_ref); let mut result = Vec::new(); @@ -4315,7 +4337,7 @@ impl MetaStore for RocksMetaStore { &self, table_name: Vec<(String, String)>, ) -> Result, IdRow, Vec>)>, CubeError> { - self.read_operation_out_of_queue(|db| { + self.read_operation_out_of_queue("table_info_by_name", |db| { let mut r = Vec::with_capacity(table_name.len()); for (schema, table) in table_name { let table = get_table_impl(db.clone(), schema, table)?; @@ -4395,7 +4417,7 @@ impl MetaStore for RocksMetaStore { &self, multi_part_ids: Vec, ) -> Result, CubeError> { - self.read_operation_out_of_queue(move |db| { + self.read_operation_out_of_queue("multi_partitions_with_children", move |db| { let table = MultiPartitionRocksTable::new(db); let mut r = HashMap::new(); for m in multi_part_ids { diff --git a/rust/cubestore/cubestore/src/metastore/rocks_store.rs b/rust/cubestore/cubestore/src/metastore/rocks_store.rs index 9bda9ff02711d..2f66663c5c17c 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_store.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_store.rs @@ -700,7 +700,9 @@ macro_rules! meta_store_table_impl { async fn all_rows(&self) -> Result>, CubeError> { self.rocks_meta_store - .read_operation_out_of_queue(move |db_ref| Ok(Self::table(db_ref).all_rows()?)) + .read_operation_out_of_queue("all_rows", move |db_ref| { + Ok(Self::table(db_ref).all_rows()?) + }) .await } @@ -1413,6 +1415,7 @@ impl RocksStore { pub async fn read_operation_out_of_queue_opt( &self, + op_name: &'static str, f: F, timeout: Duration, ) -> Result where F: for<'a> FnOnce(DbTableRef<'a>) -> Result + Send + Sync + 'static, R: Send + Sync + 'static, { let mem_seq = MemorySequence::new(self.seq_store.clone()); let db_to_send = self.db.clone(); + let span_name = format!( + "{} read operation out of queue: {}", + self.details.get_name(), + op_name + ); cube_ext::spawn_blocking(move || { - let db_span = warn_long("store read operation out of queue", timeout); + let db_span = warn_long(&span_name, timeout); let span = tracing::trace_span!("store read operation out of queue"); let span_holder = span.enter(); @@ -1444,12 +1452,16 @@ impl RocksStore { .await?
} - pub async fn read_operation_out_of_queue(&self, f: F) -> Result + pub async fn read_operation_out_of_queue( + &self, + op_name: &'static str, + f: F, + ) -> Result where F: for<'a> FnOnce(DbTableRef<'a>) -> Result + Send + Sync + 'static, R: Send + Sync + 'static, { - self.read_operation_out_of_queue_opt::(f, Duration::from_millis(100)) + self.read_operation_out_of_queue_opt::(op_name, f, Duration::from_millis(100)) .await } @@ -1781,7 +1793,9 @@ mod tests { .await .unwrap(); let all_schemas = rocks_store - .read_operation_out_of_queue(move |db_ref| SchemaRocksTable::new(db_ref).all_rows()) + .read_operation_out_of_queue("test_snapshot_uploads", move |db_ref| { + SchemaRocksTable::new(db_ref).all_rows() + }) .await .unwrap(); let expected = vec![