Skip to content

Commit 8584fe7

Browse files
authored
chore(cubestore): QueueResult - use same pk as QueueItem (#6546)
1 parent e8814bf commit 8584fe7

File tree

3 files changed

+51
-8
lines changed

3 files changed

+51
-8
lines changed

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,12 @@ impl CacheStore for RocksCacheStore {
825825

826826
if let Some(result) = result {
827827
let queue_result = QueueResult::new(path.clone(), result.clone());
828-
let result_row = result_schema.insert(queue_result, batch_pipe)?;
828+
// QueueResult is a result of QueueItem, it's why we can use row_id of QueueItem
829+
let result_row = result_schema.insert_with_pk(
830+
item_row.get_id(),
831+
queue_result,
832+
batch_pipe,
833+
)?;
829834

830835
batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent {
831836
path,

rust/cubestore/cubestore/src/cachestore/queue_result.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ pub struct QueueResult {
1515
pub(crate) expire: DateTime<Utc>,
1616
}
1717

18-
impl RocksEntity for QueueResult {}
18+
impl RocksEntity for QueueResult {
19+
fn version() -> u32 {
20+
2
21+
}
22+
}
1923

2024
impl QueueResult {
2125
pub fn new(path: String, value: String) -> Self {

rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
376376
Ok(())
377377
}
378378

379-
fn insert(
379+
/// @internal Do not use this method directly, please use insert or insert_with_pk
380+
fn do_insert(
380381
&self,
382+
row_id: Option<u64>,
381383
row: Self::T,
382384
batch_pipe: &mut BatchPipe,
383385
) -> Result<IdRow<Self::T>, CubeError> {
@@ -401,7 +403,13 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
401403
}
402404
}
403405

404-
let (row_id, inserted_row) = self.insert_row(serialized_row)?;
406+
let row_id = if let Some(row_id) = row_id {
407+
row_id
408+
} else {
409+
self.next_table_seq()?
410+
};
411+
let inserted_row = self.insert_row(row_id, serialized_row)?;
412+
405413
batch_pipe.add_event(MetaStoreEvent::Insert(Self::table_id(), row_id));
406414
if self.snapshot().get(&inserted_row.key)?.is_some() {
407415
return Err(CubeError::internal(format!("Primary key constraint violation. Primary key already exists for a row id {}: {:?}", row_id, &row)));
@@ -419,6 +427,23 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
419427
Ok(IdRow::new(row_id, row))
420428
}
421429

430+
fn insert_with_pk(
431+
&self,
432+
row_id: u64,
433+
row: Self::T,
434+
batch_pipe: &mut BatchPipe,
435+
) -> Result<IdRow<Self::T>, CubeError> {
436+
self.do_insert(Some(row_id), row, batch_pipe)
437+
}
438+
439+
fn insert(
440+
&self,
441+
row: Self::T,
442+
batch_pipe: &mut BatchPipe,
443+
) -> Result<IdRow<Self::T>, CubeError> {
444+
self.do_insert(None, row, batch_pipe)
445+
}
446+
422447
fn migrate(&self) -> Result<(), CubeError> {
423448
self.migration_check_table()?;
424449
self.migration_check_indexes()?;
@@ -444,6 +469,15 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
444469
{
445470
let mut batch = WriteBatch::default();
446471

472+
log::trace!(
473+
"Migrating table {:?} from [{}, {}] to [{}, {}]",
474+
Self::table_id(),
475+
table_info.version,
476+
table_info.value_version,
477+
Self::T::version(),
478+
Self::T::value_version(),
479+
);
480+
447481
self.migrate_table(&mut batch, table_info)?;
448482

449483
batch.put(
@@ -798,14 +832,14 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
798832
Ok(next_seq)
799833
}
800834

801-
fn insert_row(&self, row: Vec<u8>) -> Result<(u64, KeyVal), CubeError> {
802-
let next_seq = self.next_table_seq()?;
803-
let t = RowKey::Table(Self::table_id(), next_seq);
835+
fn insert_row(&self, row_id: u64, row: Vec<u8>) -> Result<KeyVal, CubeError> {
836+
let t = RowKey::Table(Self::table_id(), row_id);
804837
let res = KeyVal {
805838
key: t.to_bytes(),
806839
val: row,
807840
};
808-
Ok((next_seq, res))
841+
842+
Ok(res)
809843
}
810844

811845
fn update_row(&self, row_id: u64, row: Vec<u8>) -> Result<KeyVal, CubeError> {

0 commit comments

Comments
 (0)