@@ -391,7 +391,7 @@ impl CacheEvictionManager {
         store: &Arc<RocksStore>,
     ) -> Result<DeleteBatchResult, CubeError> {
         let (deleted_count, deleted_size, skipped) = store
-            .write_operation(move |db_ref, pipe| {
+            .write_operation("delete_batch", move |db_ref, pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());

                 let mut deleted_count: u32 = 0;
@@ -951,7 +951,7 @@ impl CacheEvictionManager {
         app_metrics::CACHESTORE_TTL_BUFFER.report(buffer_len as i64);

         store
-            .write_operation(move |db_ref, pipe| {
+            .write_operation("persist_ttl", move |db_ref, pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());

                 for (row_id, item) in to_persist.into_iter() {
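The change is mechanical: every `write_operation` / `read_operation` call gains a short operation name as a new first argument, so the store can attribute timings and logs to a specific call site instead of an anonymous closure. Below is a minimal, self-contained sketch of that pattern; the `Store` and `DbRef` types and the timing body are illustrative assumptions, not cubestore's actual `RocksStore` implementation, and the same labeling applies to `read_operation` with a different closure arity.

    use std::time::Instant;

    // Illustrative stand-ins for cubestore's RocksStore / DbTableRef (assumed).
    struct DbRef;
    struct Store;

    impl Store {
        // The label travels with the closure, so a slow or failing operation
        // can be reported by name rather than by an opaque closure address.
        fn write_operation<F, R>(&self, op_name: &str, f: F) -> R
        where
            F: FnOnce(&DbRef) -> R,
        {
            let started = Instant::now();
            let result = f(&DbRef);
            println!("cachestore `{}` took {:?}", op_name, started.elapsed());
            result
        }
    }

    fn main() {
        let store = Store;
        // Mirrors the shape of the call sites in this diff.
        let deleted_count = store.write_operation("delete_batch", |_db| 42u32);
        assert_eq!(deleted_count, 42);
    }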
rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs (46 changes: 24 additions & 22 deletions)
@@ -502,7 +502,7 @@ impl RocksCacheStore {
 impl RocksCacheStore {
     async fn queue_result_delete_by_id(&self, id: u64) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_result_delete_by_id", move |db_ref, batch_pipe| {
                 let result_schema = QueueResultRocksTable::new(db_ref.clone());
                 result_schema.try_delete(id, batch_pipe)?;

@@ -514,7 +514,7 @@ impl RocksCacheStore {
     /// This method should be called when we are sure that we return data to the consumer
     async fn queue_result_ready_to_delete(&self, id: u64) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_result_ready_to_delete", move |db_ref, batch_pipe| {
                 let result_schema = QueueResultRocksTable::new(db_ref.clone());
                 if let Some(row) = result_schema.get_row(id)? {
                     Self::queue_result_ready_to_delete_impl(&result_schema, batch_pipe, row)?;
@@ -555,7 +555,7 @@ impl RocksCacheStore {
         key: QueueKey,
     ) -> Result<Option<QueueResultResponse>, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("lookup_queue_result_by_key", move |db_ref, batch_pipe| {
                 let result_schema = QueueResultRocksTable::new(db_ref.clone());
                 let query_key_is_path = key.is_path();
                 let queue_result = result_schema.get_row_by_key(key.clone())?;
@@ -866,7 +866,7 @@ impl CacheStore for RocksCacheStore {

         let (result, inserted) = self
             .store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("cache_set", move |db_ref, batch_pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key = CacheItemIndexKey::ByPath(item.get_path());
                 let id_row_opt = cache_schema
@@ -900,7 +900,7 @@ impl CacheStore for RocksCacheStore {

         let result = self
             .store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("cache_truncate", move |db_ref, batch_pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref);
                 cache_schema.truncate(batch_pipe)?;

@@ -917,7 +917,7 @@ impl CacheStore for RocksCacheStore {
     async fn cache_delete(&self, key: String) -> Result<(), CubeError> {
         let result = self
             .store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("cache_delete", move |db_ref, batch_pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key = CacheItemIndexKey::ByPath(key);
                 let row_opt = cache_schema
@@ -947,7 +947,7 @@ impl CacheStore for RocksCacheStore {
     async fn cache_get(&self, key: String) -> Result<Option<IdRow<CacheItem>>, CubeError> {
         let res = self
             .store
-            .read_operation(move |db_ref| {
+            .read_operation("cache_get", move |db_ref| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key = CacheItemIndexKey::ByPath(key);
                 let id_row_opt = cache_schema
@@ -966,7 +966,7 @@ impl CacheStore for RocksCacheStore {

     async fn cache_keys(&self, prefix: String) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("cache_keys", move |db_ref| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key =
                     CacheItemIndexKey::ByPrefix(CacheItem::parse_path_to_prefix(prefix));
@@ -981,7 +981,7 @@ impl CacheStore for RocksCacheStore {
     async fn cache_incr(&self, path: String) -> Result<IdRow<CacheItem>, CubeError> {
         let item = self
             .store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("cache_incr", move |db_ref, batch_pipe| {
                 let cache_schema = CacheItemRocksTable::new(db_ref.clone());
                 let index_key = CacheItemIndexKey::ByPath(path.clone());
                 let id_row_opt = cache_schema
@@ -1009,7 +1009,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_all(&self, limit: Option<usize>) -> Result<Vec<QueueAllItem>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("queue_all", move |db_ref| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone());

@@ -1041,13 +1041,15 @@ impl CacheStore for RocksCacheStore {
         limit: Option<usize>,
     ) -> Result<Vec<IdRow<QueueResult>>, CubeError> {
         self.store
-            .read_operation(move |db_ref| Ok(QueueResultRocksTable::new(db_ref).scan_rows(limit)?))
+            .read_operation("queue_results_all", move |db_ref| {
+                Ok(QueueResultRocksTable::new(db_ref).scan_rows(limit)?)
+            })
             .await
     }

     async fn queue_results_multi_delete(&self, ids: Vec<u64>) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_results_multi_delete", move |db_ref, batch_pipe| {
                 let queue_result_schema = QueueResultRocksTable::new(db_ref);

                 for id in ids {
@@ -1061,7 +1063,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_add(&self, payload: QueueAddPayload) -> Result<QueueAddResponse, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_add", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let pending = queue_schema.count_rows_by_index(
                     &QueueItemIndexKey::ByPrefixAndStatus(

async fn queue_truncate(&self) -> Result<(), CubeError> {
self.store
.write_operation(move |db_ref, batch_pipe| {
.write_operation("queue_truncate", move |db_ref, batch_pipe| {
let queue_item_schema = QueueItemRocksTable::new(db_ref.clone());
queue_item_schema.truncate(batch_pipe)?;

Expand All @@ -1137,7 +1139,7 @@ impl CacheStore for RocksCacheStore {
heartbeat_timeout: Option<u32>,
) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
self.store
.read_operation(move |db_ref| {
.read_operation("queue_to_cancel", move |db_ref| {
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
let index_key = QueueItemIndexKey::ByPrefix(prefix);
let items =
@@ -1161,7 +1163,7 @@ impl CacheStore for RocksCacheStore {
         with_payload: bool,
     ) -> Result<Vec<QueueListItem>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("queue_list", move |db_ref| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());

                 let items = if let Some(status_filter) = status_filter {
@@ -1210,7 +1212,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_get(&self, key: QueueKey) -> Result<Option<QueueGetResponse>, CubeError> {
         self.store
-            .read_operation(move |db_ref| {
+            .read_operation("queue_get", move |db_ref| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());

                 if let Some(item_row) = queue_schema.get_row_by_key(key)? {
@@ -1238,7 +1240,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_cancel(&self, key: QueueKey) -> Result<Option<QueueCancelResponse>, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_cancel", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone());

@@ -1267,7 +1269,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_heartbeat(&self, key: QueueKey) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_heartbeat", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let id_row_opt = queue_schema.get_row_by_key(key.clone())?;

@@ -1292,7 +1294,7 @@ impl CacheStore for RocksCacheStore {
         allow_concurrency: u32,
     ) -> Result<QueueRetrieveResponse, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_retrieve_by_path", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let prefix = QueueItem::parse_path(path.clone())
                     .0
@@ -1368,7 +1370,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_ack(&self, key: QueueKey, result: Option<String>) -> Result<bool, CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_ack", move |db_ref, batch_pipe| {
                 let queue_item_tbl = QueueItemRocksTable::new(db_ref.clone());
                 let queue_item_payload_tbl = QueueItemPayloadRocksTable::new(db_ref.clone());

@@ -1470,7 +1472,7 @@ impl CacheStore for RocksCacheStore {

     async fn queue_merge_extra(&self, key: QueueKey, payload: String) -> Result<(), CubeError> {
         self.store
-            .write_operation(move |db_ref, batch_pipe| {
+            .write_operation("queue_merge_extra", move |db_ref, batch_pipe| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
                 let id_row_opt = queue_schema.get_row_by_key(key.clone())?;

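Because every label in this diff is a plain `&'static str` literal ("cache_set", "queue_add", and so on), the names also compose naturally with per-operation aggregation. A hypothetical sketch follows; it is not part of this change, and cubestore's real reporting goes through `app_metrics` rather than a hand-rolled counter like this.

    use std::collections::HashMap;
    use std::sync::Mutex;

    // Hypothetical counter keyed by the operation names introduced in this diff.
    #[derive(Default)]
    struct OpStats {
        calls: Mutex<HashMap<&'static str, u64>>,
    }

    impl OpStats {
        // Increment the call count for one named operation.
        fn record(&self, op_name: &'static str) {
            *self.calls.lock().unwrap().entry(op_name).or_insert(0) += 1;
        }
    }

    fn main() {
        let stats = OpStats::default();
        stats.record("cache_set");
        stats.record("cache_set");
        stats.record("queue_add");
        assert_eq!(stats.calls.lock().unwrap()["cache_set"], 2);
    }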