diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
index 08e8045ab8d73..e82e22803c1d1 100644
--- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
+++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
@@ -15,7 +15,7 @@ use std::env;
 
 use crate::metastore::{
     BaseRocksStoreFs, BatchPipe, DbTableRef, IdRow, MetaStoreEvent, MetaStoreFs, RocksPropertyRow,
-    RocksStore, RocksStoreDetails, RocksTable, RocksTableStats,
+    RocksStore, RocksStoreDetails, RocksStoreRWLoop, RocksTable, RocksTableStats,
 };
 use crate::remotefs::LocalDirRemoteFs;
 use crate::util::WorkerLoop;
@@ -184,6 +184,7 @@ pub struct RocksCacheStore {
     cache_eviction_manager: CacheEvictionManager,
     upload_loop: Arc<WorkerLoop>,
     metrics_loop: Arc<WorkerLoop>,
+    rw_loop_queue_cf: RocksStoreRWLoop,
 }
 
 impl RocksCacheStore {
@@ -222,6 +223,7 @@ impl RocksCacheStore {
             cache_eviction_manager,
             upload_loop: Arc::new(WorkerLoop::new("Cachestore upload")),
             metrics_loop: Arc::new(WorkerLoop::new("Cachestore metrics")),
+            rw_loop_queue_cf: RocksStoreRWLoop::new("queue"),
         }))
     }
@@ -500,29 +502,60 @@ impl RocksCacheStore {
 }
 
 impl RocksCacheStore {
-    async fn queue_result_delete_by_id(&self, id: u64) -> Result<(), CubeError> {
+    #[inline(always)]
+    pub async fn write_operation_queue<F, R>(
+        &self,
+        op_name: &'static str,
+        f: F,
+    ) -> Result<R, CubeError>
+    where
+        F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
+            + Send
+            + Sync
+            + 'static,
+        R: Send + Sync + 'static,
+    {
         self.store
-            .write_operation("queue_result_delete_by_id", move |db_ref, batch_pipe| {
-                let result_schema = QueueResultRocksTable::new(db_ref.clone());
-                result_schema.try_delete(id, batch_pipe)?;
+            .write_operation_impl::<F, R>(&self.rw_loop_queue_cf, op_name, f)
+            .await
+    }
 
-                Ok(())
-            })
+    #[inline(always)]
+    pub async fn read_operation_queue<F, R>(
+        &self,
+        op_name: &'static str,
+        f: F,
+    ) -> Result<R, CubeError>
+    where
+        F: for<'a> FnOnce(DbTableRef<'a>) -> Result<R, CubeError> + Send + Sync + 'static,
+        R: Send + Sync + 'static,
+    {
+        self.store
+            .read_operation_impl::<F, R>(&self.rw_loop_queue_cf, op_name, f)
             .await
     }
 
+    async fn queue_result_delete_by_id(&self, id: u64) -> Result<(), CubeError> {
+        self.write_operation_queue("queue_result_delete_by_id", move |db_ref, batch_pipe| {
+            let result_schema = QueueResultRocksTable::new(db_ref.clone());
+            result_schema.try_delete(id, batch_pipe)?;
+
+            Ok(())
+        })
+        .await
+    }
+
     /// This method should be called when we are sure that we return data to the consumer
     async fn queue_result_ready_to_delete(&self, id: u64) -> Result<(), CubeError> {
-        self.store
-            .write_operation("queue_result_ready_to_delete", move |db_ref, batch_pipe| {
-                let result_schema = QueueResultRocksTable::new(db_ref.clone());
-                if let Some(row) = result_schema.get_row(id)? {
-                    Self::queue_result_ready_to_delete_impl(&result_schema, batch_pipe, row)?;
-                }
+        self.write_operation_queue("queue_result_ready_to_delete", move |db_ref, batch_pipe| {
+            let result_schema = QueueResultRocksTable::new(db_ref.clone());
+            if let Some(row) = result_schema.get_row(id)?
{ + Self::queue_result_ready_to_delete_impl(&result_schema, batch_pipe, row)?; + } - Ok(()) - }) - .await + Ok(()) + }) + .await } /// This method should be called when we are sure that we return data to the consumer @@ -554,33 +587,32 @@ impl RocksCacheStore { &self, key: QueueKey, ) -> Result, CubeError> { - self.store - .write_operation("lookup_queue_result_by_key", move |db_ref, batch_pipe| { - let result_schema = QueueResultRocksTable::new(db_ref.clone()); - let query_key_is_path = key.is_path(); - let queue_result = result_schema.get_row_by_key(key.clone())?; - - if let Some(queue_result) = queue_result { - if query_key_is_path { - if queue_result.get_row().is_deleted() { - Ok(None) - } else { - Self::queue_result_ready_to_delete_impl( - &result_schema, - batch_pipe, - queue_result, - ) - } + self.write_operation_queue("lookup_queue_result_by_key", move |db_ref, batch_pipe| { + let result_schema = QueueResultRocksTable::new(db_ref.clone()); + let query_key_is_path = key.is_path(); + let queue_result = result_schema.get_row_by_key(key.clone())?; + + if let Some(queue_result) = queue_result { + if query_key_is_path { + if queue_result.get_row().is_deleted() { + Ok(None) } else { - Ok(Some(QueueResultResponse::Success { - value: Some(queue_result.into_row().value), - })) + Self::queue_result_ready_to_delete_impl( + &result_schema, + batch_pipe, + queue_result, + ) } } else { - Ok(None) + Ok(Some(QueueResultResponse::Success { + value: Some(queue_result.into_row().value), + })) } - }) - .await + } else { + Ok(None) + } + }) + .await } fn filter_to_cancel( @@ -1040,94 +1072,89 @@ impl CacheStore for RocksCacheStore { &self, limit: Option, ) -> Result>, CubeError> { - self.store - .read_operation("queue_results_all", move |db_ref| { - Ok(QueueResultRocksTable::new(db_ref).scan_rows(limit)?) - }) - .await + self.read_operation_queue("queue_results_all", move |db_ref| { + Ok(QueueResultRocksTable::new(db_ref).scan_rows(limit)?) 
+ }) + .await } async fn queue_results_multi_delete(&self, ids: Vec) -> Result<(), CubeError> { - self.store - .write_operation("queue_results_multi_delete", move |db_ref, batch_pipe| { - let queue_result_schema = QueueResultRocksTable::new(db_ref); + self.write_operation_queue("queue_results_multi_delete", move |db_ref, batch_pipe| { + let queue_result_schema = QueueResultRocksTable::new(db_ref); - for id in ids { - queue_result_schema.try_delete(id, batch_pipe)?; - } + for id in ids { + queue_result_schema.try_delete(id, batch_pipe)?; + } - Ok(()) - }) - .await + Ok(()) + }) + .await } async fn queue_add(&self, payload: QueueAddPayload) -> Result { - self.store - .write_operation("queue_add", move |db_ref, batch_pipe| { - let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - let pending = queue_schema.count_rows_by_index( - &QueueItemIndexKey::ByPrefixAndStatus( - QueueItem::extract_prefix(payload.path.clone()).unwrap_or("".to_string()), - QueueItemStatus::Pending, - ), - &QueueItemRocksIndex::ByPrefixAndStatus, - )?; - - let index_key = QueueItemIndexKey::ByPath(payload.path.clone()); - let id_row_opt = queue_schema - .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; + self.write_operation_queue("queue_add", move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let pending = queue_schema.count_rows_by_index( + &QueueItemIndexKey::ByPrefixAndStatus( + QueueItem::extract_prefix(payload.path.clone()).unwrap_or("".to_string()), + QueueItemStatus::Pending, + ), + &QueueItemRocksIndex::ByPrefixAndStatus, + )?; - let (id, added) = if let Some(row) = id_row_opt { - (row.id, false) - } else { - let queue_item_row = queue_schema.insert( - QueueItem::new( - payload.path, - QueueItem::status_default(), - payload.priority, - payload.orphaned.clone(), - ), - batch_pipe, - )?; + let index_key = QueueItemIndexKey::ByPath(payload.path.clone()); + let id_row_opt = queue_schema + .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; - let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); - queue_payload_schema.insert_with_pk( - queue_item_row.id, - QueueItemPayload::new( - payload.value, - queue_item_row.row.get_created().clone(), - queue_item_row.row.get_expire().clone(), - ), - batch_pipe, - )?; + let (id, added) = if let Some(row) = id_row_opt { + (row.id, false) + } else { + let queue_item_row = queue_schema.insert( + QueueItem::new( + payload.path, + QueueItem::status_default(), + payload.priority, + payload.orphaned.clone(), + ), + batch_pipe, + )?; + let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); + queue_payload_schema.insert_with_pk( + queue_item_row.id, + QueueItemPayload::new( + payload.value, + queue_item_row.row.get_created().clone(), + queue_item_row.row.get_expire().clone(), + ), + batch_pipe, + )?; - (queue_item_row.id, true) - }; + (queue_item_row.id, true) + }; - Ok(QueueAddResponse { - id, - added, - pending: if added { pending + 1 } else { pending }, - }) + Ok(QueueAddResponse { + id, + added, + pending: if added { pending + 1 } else { pending }, }) - .await + }) + .await } async fn queue_truncate(&self) -> Result<(), CubeError> { - self.store - .write_operation("queue_truncate", move |db_ref, batch_pipe| { - let queue_item_schema = QueueItemRocksTable::new(db_ref.clone()); - queue_item_schema.truncate(batch_pipe)?; + self.write_operation_queue("queue_truncate", move |db_ref, batch_pipe| { + let queue_item_schema = 
QueueItemRocksTable::new(db_ref.clone()); + queue_item_schema.truncate(batch_pipe)?; - let queue_item_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); - queue_item_payload_schema.truncate(batch_pipe)?; + let queue_item_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); + queue_item_payload_schema.truncate(batch_pipe)?; - let queue_result_schema = QueueResultRocksTable::new(db_ref); - queue_result_schema.truncate(batch_pipe)?; + let queue_result_schema = QueueResultRocksTable::new(db_ref); + queue_result_schema.truncate(batch_pipe)?; - Ok(()) - }) - .await?; + Ok(()) + }) + .await?; Ok(()) } @@ -1138,21 +1165,20 @@ impl CacheStore for RocksCacheStore { orphaned_timeout: Option, heartbeat_timeout: Option, ) -> Result>, CubeError> { - self.store - .read_operation("queue_to_cancel", move |db_ref| { - let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - let index_key = QueueItemIndexKey::ByPrefix(prefix); - let items = - queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?; - - Ok(Self::filter_to_cancel( - db_ref.start_time.clone(), - items, - orphaned_timeout, - heartbeat_timeout, - )) - }) - .await + self.read_operation_queue("queue_to_cancel", move |db_ref| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPrefix(prefix); + let items = + queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?; + + Ok(Self::filter_to_cancel( + db_ref.start_time.clone(), + items, + orphaned_timeout, + heartbeat_timeout, + )) + }) + .await } async fn queue_list( @@ -1162,130 +1188,124 @@ impl CacheStore for RocksCacheStore { priority_sort: bool, with_payload: bool, ) -> Result, CubeError> { - self.store - .read_operation("queue_list", move |db_ref| { - let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + self.read_operation_queue("queue_list", move |db_ref| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - let items = if let Some(status_filter) = status_filter { - let index_key = QueueItemIndexKey::ByPrefixAndStatus(prefix, status_filter); - queue_schema - .get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefixAndStatus)? - } else { - let index_key = QueueItemIndexKey::ByPrefix(prefix); - queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)? - }; - - let items = if priority_sort { - items - .into_iter() - .sorted_by(|a, b| b.row.cmp(&a.row)) - .collect() - } else { - items - }; + let items = if let Some(status_filter) = status_filter { + let index_key = QueueItemIndexKey::ByPrefixAndStatus(prefix, status_filter); + queue_schema + .get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefixAndStatus)? + } else { + let index_key = QueueItemIndexKey::ByPrefix(prefix); + queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)? + }; - if with_payload { - let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); - let mut res = Vec::with_capacity(items.len()); + let items = if priority_sort { + items + .into_iter() + .sorted_by(|a, b| b.row.cmp(&a.row)) + .collect() + } else { + items + }; - for item in items { - if let Some(payload_row) = queue_payload_schema.get_row(item.get_id())? 
{ - res.push(QueueListItem::WithPayload( - item, - payload_row.into_row().value, - )); - } else { - res.push(QueueListItem::ItemOnly(item)); - } + if with_payload { + let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); + let mut res = Vec::with_capacity(items.len()); + + for item in items { + if let Some(payload_row) = queue_payload_schema.get_row(item.get_id())? { + res.push(QueueListItem::WithPayload( + item, + payload_row.into_row().value, + )); + } else { + res.push(QueueListItem::ItemOnly(item)); } - - Ok(res) - } else { - Ok(items - .into_iter() - .map(|item| QueueListItem::ItemOnly(item)) - .collect()) } - }) - .await + + Ok(res) + } else { + Ok(items + .into_iter() + .map(|item| QueueListItem::ItemOnly(item)) + .collect()) + } + }) + .await } async fn queue_get(&self, key: QueueKey) -> Result, CubeError> { - self.store - .read_operation("queue_get", move |db_ref| { - let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - - if let Some(item_row) = queue_schema.get_row_by_key(key)? { - let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); + self.read_operation_queue("queue_get", move |db_ref| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - if let Some(payload_row) = queue_payload_schema.get_row(item_row.get_id())? { - Ok(Some(QueueGetResponse { - extra: item_row.into_row().extra, - payload: payload_row.into_row().value, - })) - } else { - error!( - "Unable to find payload for queue item, id = {}", - item_row.get_id() - ); + if let Some(item_row) = queue_schema.get_row_by_key(key)? { + let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); - Ok(None) - } + if let Some(payload_row) = queue_payload_schema.get_row(item_row.get_id())? { + Ok(Some(QueueGetResponse { + extra: item_row.into_row().extra, + payload: payload_row.into_row().value, + })) } else { + error!( + "Unable to find payload for queue item, id = {}", + item_row.get_id() + ); + Ok(None) } - }) - .await + } else { + Ok(None) + } + }) + .await } async fn queue_cancel(&self, key: QueueKey) -> Result, CubeError> { - self.store - .write_operation("queue_cancel", move |db_ref, batch_pipe| { - let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); - - if let Some(id_row) = queue_schema.get_row_by_key(key)? { - let row_id = id_row.get_id(); - let queue_item = queue_schema.delete_row(id_row, batch_pipe)?; - - if let Some(queue_payload) = - queue_payload_schema.try_delete(row_id, batch_pipe)? - { - Ok(Some(QueueCancelResponse { - extra: queue_item.into_row().extra, - value: queue_payload.into_row().value, - })) - } else { - error!("Unable to find payload for queue item, id = {}", row_id); - - Ok(None) - } + self.write_operation_queue("queue_cancel", move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); + + if let Some(id_row) = queue_schema.get_row_by_key(key)? { + let row_id = id_row.get_id(); + let queue_item = queue_schema.delete_row(id_row, batch_pipe)?; + + if let Some(queue_payload) = queue_payload_schema.try_delete(row_id, batch_pipe)? 
{ + Ok(Some(QueueCancelResponse { + extra: queue_item.into_row().extra, + value: queue_payload.into_row().value, + })) } else { + error!("Unable to find payload for queue item, id = {}", row_id); + Ok(None) } - }) - .await + } else { + Ok(None) + } + }) + .await } async fn queue_heartbeat(&self, key: QueueKey) -> Result<(), CubeError> { - self.store - .write_operation("queue_heartbeat", move |db_ref, batch_pipe| { - let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - let id_row_opt = queue_schema.get_row_by_key(key.clone())?; + self.write_operation_queue("queue_heartbeat", move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let id_row_opt = queue_schema.get_row_by_key(key.clone())?; - if let Some(id_row) = id_row_opt { - let mut new = id_row.get_row().clone(); - new.update_heartbeat(); + if let Some(id_row) = id_row_opt { + let mut new = id_row.get_row().clone(); + new.update_heartbeat(); - queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?; - Ok(()) - } else { - trace!("Unable to update heartbeat, unknown key: {:?}", key); + queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?; + Ok(()) + } else { + trace!("Unable to update heartbeat, unknown key: {:?}", key); - Ok(()) - } - }) - .await + Ok(()) + } + }) + .await } async fn queue_retrieve_by_path( @@ -1293,125 +1313,120 @@ impl CacheStore for RocksCacheStore { path: String, allow_concurrency: u32, ) -> Result { - self.store - .write_operation("queue_retrieve_by_path", move |db_ref, batch_pipe| { - let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - let prefix = QueueItem::parse_path(path.clone()) - .0 - .unwrap_or("".to_string()); - - let mut pending = queue_schema.count_rows_by_index( - &QueueItemIndexKey::ByPrefixAndStatus(prefix.clone(), QueueItemStatus::Pending), + self.write_operation_queue("queue_retrieve_by_path", move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let prefix = QueueItem::parse_path(path.clone()) + .0 + .unwrap_or("".to_string()); + let mut pending = queue_schema.count_rows_by_index( + &QueueItemIndexKey::ByPrefixAndStatus(prefix.clone(), QueueItemStatus::Pending), + &QueueItemRocksIndex::ByPrefixAndStatus, + )?; + + let mut active: Vec = queue_schema + .get_rows_by_index( + &QueueItemIndexKey::ByPrefixAndStatus(prefix, QueueItemStatus::Active), &QueueItemRocksIndex::ByPrefixAndStatus, - )?; - - let mut active: Vec = queue_schema - .get_rows_by_index( - &QueueItemIndexKey::ByPrefixAndStatus(prefix, QueueItemStatus::Active), - &QueueItemRocksIndex::ByPrefixAndStatus, - )? - .into_iter() - .map(|item| item.into_row().key) - .collect(); - if active.len() >= (allow_concurrency as usize) { - return Ok(QueueRetrieveResponse::NotEnoughConcurrency { pending, active }); - } - - let id_row = queue_schema.get_single_opt_row_by_index( - &QueueItemIndexKey::ByPath(path.clone()), - &QueueItemRocksIndex::ByPath, - )?; - let id_row = if let Some(id_row) = id_row { - id_row - } else { - return Ok(QueueRetrieveResponse::NotFound { pending, active }); - }; + )? 
+ .into_iter() + .map(|item| item.into_row().key) + .collect(); + if active.len() >= (allow_concurrency as usize) { + return Ok(QueueRetrieveResponse::NotEnoughConcurrency { pending, active }); + } - if id_row.get_row().get_status() == &QueueItemStatus::Pending { - let mut new = id_row.get_row().clone(); - new.status = QueueItemStatus::Active; - // It's important to insert heartbeat, because - // without that created datetime will be used for orphaned filtering - new.update_heartbeat(); + let id_row = queue_schema.get_single_opt_row_by_index( + &QueueItemIndexKey::ByPath(path.clone()), + &QueueItemRocksIndex::ByPath, + )?; + let id_row = if let Some(id_row) = id_row { + id_row + } else { + return Ok(QueueRetrieveResponse::NotFound { pending, active }); + }; - let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); + if id_row.get_row().get_status() == &QueueItemStatus::Pending { + let mut new = id_row.get_row().clone(); + new.status = QueueItemStatus::Active; + // It's important to insert heartbeat, because + // without that created datetime will be used for orphaned filtering + new.update_heartbeat(); - let res = - queue_schema.update(id_row.get_id(), new, id_row.get_row(), batch_pipe)?; - let payload = if let Some(r) = queue_payload_schema.get_row(res.get_id())? { - r.into_row().value - } else { - error!( - "Unable to find payload for queue item, id = {}", - res.get_id() - ); + let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); - queue_schema.delete_row(res, batch_pipe)?; + let res = + queue_schema.update(id_row.get_id(), new, id_row.get_row(), batch_pipe)?; + let payload = if let Some(r) = queue_payload_schema.get_row(res.get_id())? { + r.into_row().value + } else { + error!( + "Unable to find payload for queue item, id = {}", + res.get_id() + ); - return Ok(QueueRetrieveResponse::NotFound { pending, active }); - }; + queue_schema.delete_row(res, batch_pipe)?; - active.push(res.get_row().get_key().clone()); - pending -= 1; + return Ok(QueueRetrieveResponse::NotFound { pending, active }); + }; - Ok(QueueRetrieveResponse::Success { - id: id_row.get_id(), - payload, - item: res.into_row(), - pending, - active, - }) - } else { - Ok(QueueRetrieveResponse::LockFailed { pending, active }) - } - }) - .await + active.push(res.get_row().get_key().clone()); + pending -= 1; + Ok(QueueRetrieveResponse::Success { + id: id_row.get_id(), + payload, + item: res.into_row(), + pending, + active, + }) + } else { + Ok(QueueRetrieveResponse::LockFailed { pending, active }) + } + }) + .await } async fn queue_ack(&self, key: QueueKey, result: Option) -> Result { - self.store - .write_operation("queue_ack", move |db_ref, batch_pipe| { - let queue_item_tbl = QueueItemRocksTable::new(db_ref.clone()); - let queue_item_payload_tbl = QueueItemPayloadRocksTable::new(db_ref.clone()); - - let item_row = queue_item_tbl.get_row_by_key(key.clone())?; - if let Some(item_row) = item_row { - let path = item_row.get_row().get_path(); - let id = item_row.get_id(); - - queue_item_tbl.delete_row(item_row, batch_pipe)?; - queue_item_payload_tbl.try_delete(id, batch_pipe)?; - - if let Some(result) = result { - let queue_result = QueueResult::new(path.clone(), result); - let result_schema = QueueResultRocksTable::new(db_ref.clone()); - // QueueResult is a result of QueueItem, it's why we can use row_id of QueueItem - let result_row = - result_schema.insert_with_pk(id, queue_result, batch_pipe)?; - - batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent { - id, - path, - 
result: QueueResultAckEventResult::WithResult {
-                            result: Arc::new(result_row.into_row().value),
-                        },
-                    }));
-                } else {
-                    batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent {
-                        id,
-                        path,
-                        result: QueueResultAckEventResult::Empty {},
-                    }));
-                }
-
-                Ok(true)
+                    batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent {
+                        id,
+                        path,
+                        result: QueueResultAckEventResult::WithResult {
+                            result: Arc::new(result_row.into_row().value),
+                        },
+                    }));
                 } else {
-                    warn!("Unable to ack queue, unknown key: {:?}", key);
-
-                    Ok(false)
+                    batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent {
+                        id,
+                        path,
+                        result: QueueResultAckEventResult::Empty {},
+                    }));
                 }
-            })
-            .await
+
+                Ok(true)
+            } else {
+                warn!("Unable to ack queue, unknown key: {:?}", key);
+
+                Ok(false)
+            }
+        })
+        .await
     }
 
     async fn queue_result_by_path(
@@ -1471,22 +1486,22 @@ impl CacheStore for RocksCacheStore {
     }
 
     async fn queue_merge_extra(&self, key: QueueKey, payload: String) -> Result<(), CubeError> {
-        self.store
-            .write_operation("queue_merge_extra", move |db_ref, batch_pipe| {
-                let queue_schema = QueueItemRocksTable::new(db_ref.clone());
-                let id_row_opt = queue_schema.get_row_by_key(key.clone())?;
+        self.write_operation_queue("queue_merge_extra", move |db_ref, batch_pipe| {
+            let queue_schema = QueueItemRocksTable::new(db_ref.clone());
 
-                if let Some(id_row) = id_row_opt {
-                    let new = id_row.get_row().merge_extra(payload)?;
+            let id_row_opt = queue_schema.get_row_by_key(key.clone())?;
 
-                    queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?;
-                } else {
-                    warn!("Unable to merge extra, unknown key: {:?}", key);
-                }
+            if let Some(id_row) = id_row_opt {
+                let new = id_row.get_row().merge_extra(payload)?;
 
-                Ok(())
-            })
-            .await
+                queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?;
+            } else {
+                warn!("Unable to merge extra, unknown key: {:?}", key);
+            }
+
+            Ok(())
+        })
+        .await
     }
 
     async fn compaction(&self) -> Result<(), CubeError> {
diff --git a/rust/cubestore/cubestore/src/metastore/rocks_store.rs b/rust/cubestore/cubestore/src/metastore/rocks_store.rs
index cd5b212069e12..9bda9ff02711d 100644
--- a/rust/cubestore/cubestore/src/metastore/rocks_store.rs
+++ b/rust/cubestore/cubestore/src/metastore/rocks_store.rs
@@ -805,6 +805,58 @@ pub trait RocksStoreDetails: Send + Sync {
     fn log_enabled(&self) -> bool;
 }
 
+pub type RocksStoreRWLoopFn = Box<dyn FnOnce() -> Result<(), CubeError> + Send + 'static>;
+
+#[derive(Debug, Clone)]
+pub struct RocksStoreRWLoop {
+    name: &'static str,
+    tx: tokio::sync::mpsc::Sender<RocksStoreRWLoopFn>,
+    _join_handle: Arc<AbortingJoinHandle<()>>,
+}
+
+impl RocksStoreRWLoop {
+    pub fn new(name: &'static str) -> Self {
+        let (tx, mut rx) = tokio::sync::mpsc::channel::<RocksStoreRWLoopFn>(32_768);
+
+        let join_handle = cube_ext::spawn_blocking(move || loop {
+            if let Some(fun) = rx.blocking_recv() {
+                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(fun)) {
+                    Err(panic_payload) => {
+                        let restore_error = CubeError::from_panic_payload(panic_payload);
+                        log::error!("Panic during read write loop execution: {}", restore_error);
+                    }
+                    Ok(res) => {
+                        if let Err(e) = res {
+                            log::error!("Error during read write loop execution: {}", e);
+                        }
+                    }
+                }
+            } else {
+                return;
+            }
+        });
+
+        Self {
+            name,
+            tx,
+            _join_handle: Arc::new(AbortingJoinHandle::new(join_handle)),
+        }
+    }
+
+    pub async fn schedule(&self, fun: RocksStoreRWLoopFn) -> Result<(), CubeError> {
+        self.tx.send(fun).await.map_err(|err| {
+            CubeError::user(format!(
+                "Failed to schedule task to RWLoop ({}), error: {}",
+                self.name, err
+            ))
+        })
+    }
+
+    pub fn get_name(&self) -> &'static str {
+        self.name
+    }
+}
+
 #[derive(Clone)]
 pub struct RocksStore {
     pub db: Arc<DB>,
@@ -820,10 +872,7 @@ pub struct RocksStore {
     snapshot_uploaded: Arc<RwLock<bool>>,
     snapshots_upload_stopped: Arc<AsyncMutex<bool>>,
     pub(crate) cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>,
-    rw_loop_tx: tokio::sync::mpsc::Sender<
-        Box<dyn FnOnce() -> Result<(), CubeError> + Send + Sync + 'static>,
-    >,
-    _rw_loop_join_handle: Arc<AbortingJoinHandle<()>>,
+    rw_loop_default_cf: RocksStoreRWLoop,
     details: Arc<dyn RocksStoreDetails>,
 }
@@ -863,28 +912,6 @@ impl RocksStore {
         let db = details.open_db(path, &config)?;
         let db_arc = Arc::new(db);
 
-        let (rw_loop_tx, mut rw_loop_rx) = tokio::sync::mpsc::channel::<
-            Box<dyn FnOnce() -> Result<(), CubeError> + Send + Sync + 'static>,
-        >(32_768);
-
-        let join_handle = cube_ext::spawn_blocking(move || loop {
-            if let Some(fun) = rw_loop_rx.blocking_recv() {
-                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(fun)) {
-                    Err(panic_payload) => {
-                        let restore_error = CubeError::from_panic_payload(panic_payload);
-                        log::error!("Panic during read write loop execution: {}", restore_error);
-                    }
-                    Ok(res) => {
-                        if let Err(e) = res {
-                            log::error!("Error during read write loop execution: {}", e);
-                        }
-                    }
-                }
-            } else {
-                return;
-            }
-        });
-
         let meta_store = RocksStore {
             db: db_arc.clone(),
             seq_store: Arc::new(Mutex::new(HashMap::new())),
@@ -899,8 +926,7 @@ impl RocksStore {
             snapshots_upload_stopped: Arc::new(AsyncMutex::new(false)),
             config,
             cached_tables: Arc::new(Mutex::new(None)),
-            rw_loop_tx,
-            _rw_loop_join_handle: Arc::new(AbortingJoinHandle::new(join_handle)),
+            rw_loop_default_cf: RocksStoreRWLoop::new("default"),
             details,
         };
@@ -979,7 +1005,25 @@ impl RocksStore {
         self.listeners.write().await.push(listener);
     }
 
+    #[inline(always)]
     pub async fn write_operation<F, R>(&self, op_name: &'static str, f: F) -> Result<R, CubeError>
+    where
+        F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
+            + Send
+            + Sync
+            + 'static,
+        R: Send + Sync + 'static,
+    {
+        self.write_operation_impl::<F, R>(&self.rw_loop_default_cf, op_name, f)
+            .await
+    }
+
+    pub async fn write_operation_impl<F, R>(
+        &self,
+        rw_loop: &RocksStoreRWLoop,
+        op_name: &'static str,
+        f: F,
+    ) -> Result<R, CubeError>
     where
         F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
             + Send
@@ -991,54 +1035,57 @@ impl RocksStore {
         let mem_seq = MemorySequence::new(self.seq_store.clone());
         let db_to_send = db.clone();
         let cached_tables = self.cached_tables.clone();
+
+        let loop_name = rw_loop.get_name();
         let store_name = self.details.get_name();
-        let span_name = format!("{} write operation {}", store_name, op_name);
+        let span_name = format!("{}({}) write operation: {}", store_name, loop_name, op_name);
 
-        let rw_loop_sender = self.rw_loop_tx.clone();
         let (tx, rx) = oneshot::channel::<Result<(R, Vec<MetaStoreEvent>), CubeError>>();
 
-        let res = rw_loop_sender.send(Box::new(move || {
-            let db_span = warn_long(&span_name, Duration::from_millis(100));
-
-            let mut batch = BatchPipe::new(db_to_send.as_ref());
-            let snapshot = db_to_send.snapshot();
-            let res = f(
-                DbTableRef {
-                    db: db_to_send.as_ref(),
-                    snapshot: &snapshot,
-                    mem_seq,
-                    start_time: Utc::now(),
-                },
-                &mut batch,
-            );
-            match res {
-                Ok(res) => {
-                    if batch.invalidate_tables_cache {
-                        *cached_tables.lock().unwrap() = None;
+        let res = rw_loop
+            .schedule(Box::new(move || {
+                let db_span = warn_long(&span_name, Duration::from_millis(100));
+
+                let mut batch = BatchPipe::new(db_to_send.as_ref());
+                let snapshot = db_to_send.snapshot();
+                let res = f(
+                    DbTableRef {
+                        db: db_to_send.as_ref(),
+                        snapshot: &snapshot,
+                        mem_seq,
+                        start_time: Utc::now(),
+                    },
+                    &mut batch,
+                );
+                match res {
+                    Ok(res) => {
+                        if batch.invalidate_tables_cache {
+                            *cached_tables.lock().unwrap() = None;
+                        }
+                        let write_result = batch.batch_write_rows()?;
+                        tx.send(Ok((res, write_result))).map_err(|_| {
+                            CubeError::internal(format!(
+                                "[{}-{}] Write operation result receiver has been dropped",
+                                store_name, loop_name
+                            ))
+                        })?;
+                    }
+                    Err(e) => {
+                        tx.send(Err(e)).map_err(|_| {
+                            CubeError::internal(format!(
+                                "[{}-{}] Write operation result receiver has been dropped",
+                                store_name, loop_name
+                            ))
+                        })?;
+                    }
-                    let write_result = batch.batch_write_rows()?;
-                    tx.send(Ok((res, write_result))).map_err(|_| {
-                        CubeError::internal(format!(
-                            "[{}] Write operation result receiver has been dropped",
-                            store_name
-                        ))
-                    })?;
-                }
-                Err(e) => {
-                    tx.send(Err(e)).map_err(|_| {
-                        CubeError::internal(format!(
-                            "[{}] Write operation result receiver has been dropped",
-                            store_name
-                        ))
-                    })?;
                 }
-            }
-            mem::drop(db_span);
+                mem::drop(db_span);
 
-            Ok(())
-        }));
-        if let Err(e) = res.await {
+                Ok(())
+            }))
+            .await;
+        if let Err(e) = res {
             log::error!(
                 "[{}] Error during scheduling write task in loop: {}",
                 store_name,
@@ -1300,21 +1347,35 @@ impl RocksStore {
         Ok((remote_path, checkpoint_path))
     }
 
+    #[inline(always)]
     pub async fn read_operation<F, R>(&self, op_name: &'static str, f: F) -> Result<R, CubeError>
+    where
+        F: for<'a> FnOnce(DbTableRef<'a>) -> Result<R, CubeError> + Send + Sync + 'static,
+        R: Send + Sync + 'static,
+    {
+        self.read_operation_impl::<F, R>(&self.rw_loop_default_cf, op_name, f)
+            .await
+    }
+
+    pub async fn read_operation_impl<F, R>(
+        &self,
+        rw_loop: &RocksStoreRWLoop,
+        op_name: &'static str,
+        f: F,
+    ) -> Result<R, CubeError>
     where
         F: for<'a> FnOnce(DbTableRef<'a>) -> Result<R, CubeError> + Send + Sync + 'static,
        R: Send + Sync + 'static,
     {
         let mem_seq = MemorySequence::new(self.seq_store.clone());
         let db_to_send = self.db.clone();
-        let store_name = self.details.get_name();
-
-        let rw_loop_sender = self.rw_loop_tx.clone();
         let (tx, rx) = oneshot::channel::<Result<R, CubeError>>();
 
-        let span_name = format!("{} read operation {}", store_name, op_name);
+        let loop_name = rw_loop.get_name();
+        let store_name = self.details.get_name();
+        let span_name = format!("{}({}) read operation: {}", store_name, loop_name, op_name);
 
-        let res = rw_loop_sender.send(Box::new(move || {
+        let res = rw_loop.schedule(Box::new(move || {
             let db_span = warn_long(&span_name, Duration::from_millis(100));
 
             let snapshot = db_to_send.snapshot();
@@ -1327,8 +1388,8 @@ impl RocksStore {
             tx.send(res).map_err(|_| {
                 CubeError::internal(format!(
-                    "[{}] Read operation result receiver has been dropped",
-                    store_name
+                    "[{}-{}] Read operation result receiver has been dropped",
+                    store_name, loop_name
                 ))
             })?;
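The change above gives `RocksCacheStore` its own `RocksStoreRWLoop` ("queue"), so queue operations are serialized on a dedicated worker instead of sharing the store's default read/write loop. A minimal standalone sketch of that pattern, assuming nothing from the cubestore codebase (it uses `std::thread` and `std::sync::mpsc` rather than tokio and `cube_ext::spawn_blocking`, and names such as `RwLoop`, `Job`, and `run_op` are hypothetical):

```rust
use std::sync::mpsc;
use std::thread;

// A unit of work executed on the loop thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

struct RwLoop {
    name: &'static str,
    tx: mpsc::SyncSender<Job>,
}

impl RwLoop {
    fn new(name: &'static str) -> Self {
        // Bounded queue: senders block (backpressure) when the loop is busy.
        let (tx, rx) = mpsc::sync_channel::<Job>(1024);
        thread::spawn(move || {
            // The loop ends once every sender has been dropped.
            for job in rx {
                job();
            }
        });
        Self { name, tx }
    }

    /// Runs `f` on the loop thread and returns its result to the caller.
    fn run_op<R, F>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (result_tx, result_rx) = mpsc::sync_channel(1);
        self.tx
            .send(Box::new(move || {
                // Ignore send errors: the caller may have stopped waiting.
                let _ = result_tx.send(f());
            }))
            .expect("rw loop thread has exited");
        result_rx.recv().expect("rw loop dropped the result")
    }
}

fn main() {
    // Two independent loops, so slow "queue" work cannot delay "default" work.
    let default_loop = RwLoop::new("default");
    let queue_loop = RwLoop::new("queue");

    let a = default_loop.run_op(|| 21 * 2);
    let b = queue_loop.run_op(|| "queue op done".to_string());
    println!("{} -> {}, {} -> {}", default_loop.name, a, queue_loop.name, b);
}
```

Keeping one loop per column family appears to be the motivation for the new `rw_loop_queue_cf` field: a burst of queue writes stays on the "queue" worker, while operations scheduled through `rw_loop_default_cf` keep their own ordering and latency.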
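The worker in `RocksStoreRWLoop::new` wraps each scheduled closure in `std::panic::catch_unwind(AssertUnwindSafe(..))` so a panicking operation is logged rather than killing the shared loop thread. A self-contained illustration of that containment, assuming a hypothetical `describe_panic` helper (not part of cubestore):

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

// Extracts a readable message from a panic payload, if possible.
fn describe_panic(payload: Box<dyn std::any::Any + Send>) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        (*s).to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic payload".to_string()
    }
}

fn main() {
    let jobs: Vec<Box<dyn FnOnce() -> Result<(), String>>> = vec![
        Box::new(|| Ok::<(), String>(())),
        Box::new(|| panic!("boom")),
        Box::new(|| Err::<(), String>("recoverable error".to_string())),
    ];

    for (i, job) in jobs.into_iter().enumerate() {
        // AssertUnwindSafe is needed because the boxed closure is not
        // statically known to be unwind safe; the default panic hook still
        // prints to stderr, but the loop below keeps running.
        match catch_unwind(AssertUnwindSafe(job)) {
            Ok(Ok(())) => println!("job {} finished", i),
            Ok(Err(e)) => println!("job {} failed: {}", i, e),
            Err(payload) => println!("job {} panicked: {}", i, describe_panic(payload)),
        }
    }
}
```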