Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ impl CacheEvictionManager {
cache_schema.update_extended_ttl_secondary_index(
row_id,
&CacheItemRocksIndex::ByPath,
item.key_hash.to_vec(),
item.key_hash,
RocksSecondaryIndexValueTTLExtended {
lfu: item.lfu,
lru: item.lru.decode_value_as_opt_datetime()?,
Expand Down
8 changes: 4 additions & 4 deletions rust/cubestore/cubestore/src/cachestore/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ mod tests {
let index = CacheItemRocksIndex::ByPath;
let key = RowKey::SecondaryIndex(
CacheItemRocksTable::index_id(index.get_id()),
index.key_hash(&row).to_be_bytes().to_vec(),
index.key_hash(&row).to_be_bytes(),
1,
);

Expand All @@ -386,7 +386,7 @@ mod tests {
let index = CacheItemRocksIndex::ByPath;
let key = RowKey::SecondaryIndex(
CacheItemRocksTable::index_id(index.get_id()),
index.key_hash(&row).to_be_bytes().to_vec(),
index.key_hash(&row).to_be_bytes(),
1,
);

Expand All @@ -410,11 +410,11 @@ mod tests {
let index = CacheItemRocksIndex::ByPath;
let key = RowKey::SecondaryIndex(
CacheItemRocksTable::index_id(index.get_id()),
index.key_hash(&row).to_be_bytes().to_vec(),
index.key_hash(&row).to_be_bytes(),
1,
);

// Indexes with TTL use new format (v2) for indexes, but index migration doesnt skip
// Indexes with TTL use a new format (v2) for indexes, but index migration doesn't skip
// compaction for old rows
let index_value = RocksSecondaryIndexValue::Hash("kek".as_bytes())
.to_bytes(RocksSecondaryIndexValueVersion::OnlyHash)
Expand Down
15 changes: 7 additions & 8 deletions rust/cubestore/cubestore/src/metastore/rocks_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
use serde_repr::*;
use std::collections::HashMap;
use std::fmt::Debug;
use std::io::{Cursor, Write};
use std::io::{Cursor, Read, Write};

use crate::metastore::snapshot_info::SnapshotInfo;
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
Expand Down Expand Up @@ -122,7 +122,7 @@ pub fn get_fixed_prefix() -> usize {
13
}

pub type SecondaryKey = Vec<u8>;
pub type SecondaryKeyHash = [u8; 8];
pub type IndexId = u32;

#[derive(Clone)]
Expand Down Expand Up @@ -378,7 +378,7 @@ impl<'a> RocksSecondaryIndexValue<'a> {
pub enum RowKey {
Table(TableId, /** row_id */ u64),
Sequence(TableId),
SecondaryIndex(IndexId, SecondaryKey, /** row_id */ u64),
SecondaryIndex(IndexId, SecondaryKeyHash, /** row_id */ u64),
SecondaryIndexInfo { index_id: IndexId },
TableInfo { table_id: TableId },
}
Expand Down Expand Up @@ -421,11 +421,10 @@ impl RowKey {
)?)),
3 => {
let table_id = IndexId::from(reader.read_u32::<BigEndian>()?);
let mut secondary_key: SecondaryKey = SecondaryKey::new();
let sc_length = bytes.len() - 13;
for _i in 0..sc_length {
secondary_key.push(reader.read_u8()?);
}

let mut secondary_key: SecondaryKeyHash = [0_u8; 8];
reader.read_exact(&mut secondary_key)?;

let row_id = reader.read_u64::<BigEndian>()?;

Ok(RowKey::SecondaryIndex(table_id, secondary_key, row_id))
Expand Down
66 changes: 25 additions & 41 deletions rust/cubestore/cubestore/src/metastore/rocks_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::metastore::rocks_store::TableId;
use crate::metastore::{
get_fixed_prefix, BatchPipe, DbTableRef, IdRow, IndexId, KeyVal, MemorySequence,
MetaStoreEvent, RocksSecondaryIndexValue, RocksSecondaryIndexValueTTLExtended,
RocksSecondaryIndexValueVersion, RocksTableStats, RowKey, SecondaryIndexInfo, SecondaryKey,
RocksSecondaryIndexValueVersion, RocksTableStats, RowKey, SecondaryIndexInfo, SecondaryKeyHash,
TableInfo,
};
use crate::CubeError;
Expand Down Expand Up @@ -303,7 +303,7 @@ pub struct IndexScanIter<'a, RT: RocksTable + ?Sized> {
table: &'a RT,
index_id: u32,
secondary_key_val: Vec<u8>,
secondary_key_hash: Vec<u8>,
secondary_key_hash: SecondaryKeyHash,
iter: DBIterator<'a>,
}

Expand Down Expand Up @@ -364,7 +364,7 @@ where
#[derive(Debug)]
pub struct SecondaryIndexValueScanIterItem {
pub row_id: u64,
pub key_hash: SecondaryKey,
pub key_hash: SecondaryKeyHash,
pub ttl: Option<DateTime<Utc>>,
pub extended: Option<RocksSecondaryIndexValueTTLExtended>,
}
Expand Down Expand Up @@ -496,11 +496,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
if index.is_unique() {
let hash = index.key_hash(&row);
let index_val = index.index_key_by(&row);
let existing_keys = self.get_row_ids_from_index(
index.get_id(),
&index_val,
&hash.to_be_bytes().to_vec(),
)?;
let existing_keys =
self.get_row_ids_from_index(index.get_id(), &index_val, hash.to_be_bytes())?;
if existing_keys.len() > 0 {
return Err(CubeError::user(
format!(
Expand Down Expand Up @@ -759,7 +756,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
let existing_keys = self.get_row_ids_from_index(
RocksSecondaryIndex::get_id(secondary_index),
&index_val,
&hash.to_be_bytes().to_vec(),
hash.to_be_bytes(),
)?;

Ok(existing_keys)
Expand Down Expand Up @@ -832,8 +829,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
K: Hash,
{
let row_ids = self.get_row_ids_by_index(row_key, secondary_index)?;

let mut res = Vec::new();
let mut res = Vec::with_capacity(row_ids.len());

for id in row_ids {
if let Some(row) = self.get_row(id)? {
Expand Down Expand Up @@ -969,7 +965,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
&self,
row_id: u64,
secondary_index: &'a impl RocksSecondaryIndex<Self::T, K>,
secondary_key_hash: SecondaryKey,
secondary_key_hash: SecondaryKeyHash,
extended: RocksSecondaryIndexValueTTLExtended,
batch_pipe: &mut BatchPipe,
) -> Result<bool, CubeError>
Expand Down Expand Up @@ -1141,11 +1137,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
) -> KeyVal {
let hash = index.key_hash(row);
let index_val = index.index_value(row);
let key = RowKey::SecondaryIndex(
Self::index_id(index.get_id()),
hash.to_be_bytes().to_vec(),
row_id,
);
let key =
RowKey::SecondaryIndex(Self::index_id(index.get_id()), hash.to_be_bytes(), row_id);

KeyVal {
key: key.to_bytes(),
Expand All @@ -1157,11 +1150,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
let mut res = Vec::new();
for index in Self::indexes().iter() {
let hash = index.key_hash(&row);
let key = RowKey::SecondaryIndex(
Self::index_id(index.get_id()),
hash.to_be_bytes().to_vec(),
row_id,
);
let key =
RowKey::SecondaryIndex(Self::index_id(index.get_id()), hash.to_be_bytes(), row_id);
res.push(KeyVal {
key: key.to_bytes(),
val: vec![],
Expand Down Expand Up @@ -1247,17 +1237,17 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
&self,
secondary_id: u32,
secondary_key_val: &Vec<u8>,
secondary_key_hash: &Vec<u8>,
secondary_key_hash: SecondaryKeyHash,
) -> Result<Vec<u64>, CubeError> {
let ref db = self.snapshot();
let key_len = secondary_key_hash.len();
let key_min =
RowKey::SecondaryIndex(Self::index_id(secondary_id), secondary_key_hash.clone(), 0);
let key_min = RowKey::SecondaryIndex(Self::index_id(secondary_id), secondary_key_hash, 0);

let mut res: Vec<u64> = Vec::new();

let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);

let iter = db.iterator_opt(
IteratorMode::From(&key_min.to_bytes()[0..(key_len + 5)], Direction::Forward),
opts,
Expand All @@ -1269,10 +1259,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
if let RowKey::SecondaryIndex(_, secondary_index_hash, row_id) =
RowKey::from_bytes(&key)
{
if !secondary_index_hash
.iter()
.zip(secondary_key_hash)
.all(|(a, b)| a == b)
if secondary_index_hash.len() != secondary_key_hash.len()
|| secondary_index_hash != secondary_key_hash
{
break;
}
Expand All @@ -1284,9 +1272,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
RocksSecondaryIndexValue::HashAndTTLExtended(h, expire, _) => (h, expire),
};

if secondary_key_val.len() != hash.len()
|| !hash.iter().zip(secondary_key_val).all(|(a, b)| a == b)
{
if hash.len() != secondary_key_val.len() || hash != secondary_key_val.as_slice() {
continue;
}

Expand Down Expand Up @@ -1341,8 +1327,9 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
batch: &mut WriteBatch,
) -> Result<u64, CubeError> {
let ref db = self.snapshot();
let zero_vec = vec![0 as u8; 8];
let key_min = RowKey::SecondaryIndex(Self::index_id(secondary_id), zero_vec.clone(), 0);

let zero_vec = [0 as u8; 8];
let key_min = RowKey::SecondaryIndex(Self::index_id(secondary_id), zero_vec, 0);

let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(false);
Expand Down Expand Up @@ -1408,7 +1395,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
let ref db = self.snapshot();

let index_id = RocksSecondaryIndex::get_id(secondary_index);
let row_key = RowKey::SecondaryIndex(Self::index_id(index_id), vec![], 0);
let zero_vec = [0 as u8; 8];
let row_key = RowKey::SecondaryIndex(Self::index_id(index_id), zero_vec, 0);

let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(false);
Expand All @@ -1433,16 +1421,12 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
{
let ref db = self.snapshot();

let secondary_key_hash = secondary_index
.typed_key_hash(&row_key)
.to_be_bytes()
.to_vec();
let secondary_key_hash = secondary_index.typed_key_hash(&row_key).to_be_bytes() as [u8; 8];
let secondary_key_val = secondary_index.key_to_bytes(&row_key);

let index_id = RocksSecondaryIndex::get_id(secondary_index);
let key_len = secondary_key_hash.len();
let key_min =
RowKey::SecondaryIndex(Self::index_id(index_id), secondary_key_hash.clone(), 0);
let key_min = RowKey::SecondaryIndex(Self::index_id(index_id), secondary_key_hash, 0);

let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);
Expand Down
Loading