18 changes: 9 additions & 9 deletions rust/cubestore/cubestore/src/metastore/rocks_table.rs
@@ -518,14 +518,14 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
         let inserted_row = self.insert_row_kv(row_id, serialized_row)?;
 
         batch_pipe.add_event(MetaStoreEvent::Insert(Self::table_id(), row_id));
-        if self.snapshot().get(&inserted_row.key)?.is_some() {
+        if self.snapshot().get_pinned(&inserted_row.key)?.is_some() {
             return Err(CubeError::internal(format!("Primary key constraint violation. Primary key already exists for a row id {}: {:?}", row_id, &row)));
         }
         batch_pipe.batch().put(inserted_row.key, inserted_row.val);
 
         let index_row = self.insert_index_row(&row, row_id)?;
         for to_insert in index_row {
-            if self.snapshot().get(&to_insert.key)?.is_some() {
+            if self.snapshot().get_pinned(&to_insert.key)?.is_some() {
                 return Err(CubeError::internal(format!("Primary key constraint violation in secondary index. Primary key already exists for a row id {}: {:?}", row_id, &row)));
             }
             batch_pipe.batch().put(to_insert.key, to_insert.val);
@@ -573,15 +573,15 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
     fn migration_check_table(&self) -> Result<(), CubeError> {
         let snapshot = self.snapshot();
 
-        let table_info = snapshot.get(
+        let table_info = snapshot.get_pinned(
             &RowKey::TableInfo {
                 table_id: Self::table_id(),
             }
             .to_bytes(),
         )?;
 
         if let Some(table_info) = table_info {
-            let table_info = self.deserialize_table_info(table_info.as_slice())?;
+            let table_info = self.deserialize_table_info(&table_info)?;
 
             if table_info.version != Self::T::version()
                 || table_info.value_version != Self::T::value_version()
@@ -633,14 +633,14 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
     fn migration_check_indexes(&self) -> Result<(), CubeError> {
         let snapshot = self.snapshot();
         for index in Self::indexes().into_iter() {
-            let index_info = snapshot.get(
+            let index_info = snapshot.get_pinned(
                 &RowKey::SecondaryIndexInfo {
                     index_id: Self::index_id(index.get_id()),
                 }
                 .to_bytes(),
             )?;
             if let Some(index_info) = index_info {
-                let index_info = self.deserialize_index_info(index_info.as_slice())?;
+                let index_info = self.deserialize_index_info(&index_info)?;
                 if index_info.version != index.version()
                     || index_info.value_version != index.value_version()
                 {
@@ -977,7 +977,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
             RowKey::SecondaryIndex(Self::index_id(index_id), secondary_key_hash, row_id);
         let secondary_index_key = secondary_index_row_key.to_bytes();
 
-        if let Some(secondary_key_bytes) = self.db().get(&secondary_index_key)? {
+        if let Some(secondary_key_bytes) = self.db().get_pinned(&secondary_index_key)? {
             let index_value_version = RocksSecondaryIndex::value_version(secondary_index);
             let new_value = match RocksSecondaryIndexValue::from_bytes(
                 &secondary_key_bytes,
@@ -1102,10 +1102,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
     fn get_row(&self, row_id: u64) -> Result<Option<IdRow<Self::T>>, CubeError> {
         let ref db = self.snapshot();
-        let res = db.get(RowKey::Table(Self::table_id(), row_id).to_bytes())?;
+        let res = db.get_pinned(RowKey::Table(Self::table_id(), row_id).to_bytes())?;
 
         if let Some(buffer) = res {
-            let row = self.deserialize_id_row(row_id, buffer.as_slice())?;
+            let row = self.deserialize_id_row(row_id, &buffer)?;
             return Ok(Some(row));
         }
 
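For context, here is a minimal, self-contained sketch (not part of this PR) of the API difference the diff relies on: in the `rocksdb` crate, `get` returns `Option<Vec<u8>>` and copies the value out of RocksDB, while `get_pinned` returns `Option<DBPinnableSlice>`, which borrows the underlying memory and dereferences to `&[u8]`. That deref is why the `.as_slice()` calls above can be replaced by a plain `&buffer` borrow. The database path below is illustrative.

```rust
use rocksdb::{Options, DB};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open a throwaway database (path is illustrative).
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/pinned-example")?;

    db.put(b"row:1", b"payload")?;

    // `get` returns Option<Vec<u8>>: the value is copied into a fresh allocation.
    if let Some(owned) = db.get(b"row:1")? {
        assert_eq!(owned.as_slice(), b"payload");
    }

    // `get_pinned` returns Option<DBPinnableSlice>: no copy, and the slice
    // derefs to &[u8], so `&pinned` can be passed to any function expecting
    // a byte slice -- the same pattern as `deserialize_id_row(row_id, &buffer)`
    // in the diff above.
    if let Some(pinned) = db.get_pinned(b"row:1")? {
        assert_eq!(&*pinned, b"payload");
    }

    Ok(())
}
```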