Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
fb0573d
Resolve merge with refactor
4adex Oct 14, 2025
799dc77
Fix merge residues
4adex Oct 14, 2025
d9e3e84
Merge branch 'refactor' into kd_tree_refactor
4adex Dec 17, 2025
84b3069
Update Cargo.lock
4adex Dec 17, 2025
fd02eb0
Debug without rebuild
4adex Dec 17, 2025
687e30c
Add rebuild
4adex Dec 18, 2025
496bfc2
Add tests
4adex Dec 18, 2025
250e11b
Resolve merge conflicts
4adex Dec 27, 2025
327bb03
Refactor code
4adex Dec 27, 2025
7661e7e
implement kdtree serialization
TanmayArya-1p Jan 1, 2026
c4590d7
implement serialization and deserialization logic for flat and kdtree
TanmayArya-1p Jan 1, 2026
6097b19
implement checkpointing for storage engine and add unit tests
TanmayArya-1p Jan 2, 2026
026df74
fix(rocksdb): restore from checkpoint function; unit test for the same
TanmayArya-1p Jan 2, 2026
9ce032c
refactor snapshots and checkpoints
TanmayArya-1p Jan 3, 2026
43cb396
implement repopulate vector on snapshot reload
TanmayArya-1p Jan 4, 2026
789ebf0
fix clippy warnings
TanmayArya-1p Jan 4, 2026
a1d6a51
format manifest
TanmayArya-1p Jan 4, 2026
ae8e4a4
implement basic local snapshot registry for snapshot engine
TanmayArya-1p Jan 4, 2026
809ec0c
implement lockfile for snapshot registry
TanmayArya-1p Jan 4, 2026
2dd4e0e
change order of metadata entries in filename
TanmayArya-1p Jan 4, 2026
8380b2b
implement snapshot engine worker thread; add dead/alive snapshots
TanmayArya-1p Jan 4, 2026
dc7f0fd
refactor registry api; bug fixes and dummy test for snapshot engine
TanmayArya-1p Jan 4, 2026
6ba7f97
remove todo comments
TanmayArya-1p Jan 4, 2026
47e9d84
separate worker and main thread snapshot apis; more comprehensive uni…
TanmayArya-1p Jan 5, 2026
3d66345
add comprehensive unit test for snapshot engine; fix a gnarly bug in …
TanmayArya-1p Jan 5, 2026
6789494
remove todo
TanmayArya-1p Jan 5, 2026
f3167f3
fix unit test for rocksdb
TanmayArya-1p Jan 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
351 changes: 327 additions & 24 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"crates/http",
"crates/tui",
"crates/grpc",
"crates/snapshot",
]

[workspace.package]
Expand Down Expand Up @@ -49,5 +50,6 @@ grpc = { path = "crates/grpc" }
http = { path = "crates/http" }
index = { path = "crates/index" }
server = { path = "crates/server" }
snapshot = { path = "crates/snapshot" }
storage = { path = "crates/storage" }
tui = { path = "crates/tui" }
1 change: 1 addition & 0 deletions crates/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license.workspace = true
[dependencies]
defs.workspace = true
index.workspace = true
snapshot.workspace = true
storage.workspace = true
tempfile.workspace = true
uuid.workspace = true
223 changes: 208 additions & 15 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use defs::{DbError, IndexedVector, Similarity};
use defs::{DbError, IndexedVector, Similarity, SnapshottableDb};

use defs::{DenseVector, Payload, Point, PointId};
use std::path::PathBuf;
use index::kd_tree::index::KDTree;
use std::path::{Path, PathBuf};
use tempfile::tempdir;
// use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use index::flat::FlatIndex;
use index::flat::index::FlatIndex;
use index::{IndexType, VectorIndex};
use snapshot::Snapshot;
use storage::rocks_db::RocksDbStorage;
use storage::{StorageEngine, StorageType, VectorPage};

Expand Down Expand Up @@ -131,6 +134,31 @@ impl VectorDb {
}
}

impl SnapshottableDb for VectorDb {
fn create_snapshot(&self, dir_path: &Path) -> Result<PathBuf, DbError> {
if !dir_path.is_dir() {
return Err(DbError::SnapshotError(format!(
"Invalid path: {}",
dir_path.display()
)));
}

let index_snapshot = self
.index
.read()
.map_err(|_| DbError::LockError)?
.snapshot()?;

let tempdir = tempdir().unwrap();
let storage_checkpoint = self.storage.checkpoint_at(tempdir.path())?;

let snapshot = Snapshot::new(index_snapshot, storage_checkpoint, self.dimension)?;
let snapshot_path = snapshot.save(dir_path)?;

Ok(snapshot_path)
}
}

#[derive(Debug)]
pub struct DbConfig {
pub storage_type: StorageType,
Expand All @@ -139,6 +167,28 @@ pub struct DbConfig {
pub dimension: usize,
}

#[derive(Debug)]
pub struct DbRestoreConfig {
pub data_path: PathBuf,
pub snapshot_path: PathBuf,
}

impl DbRestoreConfig {
pub fn new(data_path: PathBuf, snapshot_path: PathBuf) -> Self {
Self {
data_path,
snapshot_path,
}
}
}

pub fn restore_from_snapshot(config: &DbRestoreConfig) -> Result<VectorDb, DbError> {
// restore the index from the snapshot
let (storage_engine, index, dimensions) =
Snapshot::load(&config.snapshot_path, &config.data_path)?;
Ok(VectorDb::_new(storage_engine, index, dimensions))
}

pub fn init_api(config: DbConfig) -> Result<VectorDb, DbError> {
// Initialize the storage engine
let storage = match config.storage_type {
Expand All @@ -149,7 +199,8 @@ pub fn init_api(config: DbConfig) -> Result<VectorDb, DbError> {
// Initialize the vector index
let index: Arc<RwLock<dyn VectorIndex>> = match config.index_type {
IndexType::Flat => Arc::new(RwLock::new(FlatIndex::new())),
_ => Arc::new(RwLock::new(FlatIndex::new())),
IndexType::KDTree => Arc::new(RwLock::new(KDTree::build_empty(config.dimension))),
_ => Arc::new(RwLock::new(FlatIndex::new())), // TODO: add hnsw here
};

// Init the db
Expand All @@ -166,25 +217,28 @@ mod tests {

// TODO: Add more exhaustive tests

use std::sync::Mutex;

use super::*;
use defs::ContentType;
use tempfile::tempdir;
use snapshot::{engine::SnapshotEngine, registry::local::LocalRegistry};
use tempfile::{TempDir, tempdir};

// Helper function to create a test database
fn create_test_db() -> VectorDb {
fn create_test_db() -> (VectorDb, TempDir) {
let temp_dir = tempdir().unwrap();
let config = DbConfig {
storage_type: StorageType::RocksDb,
index_type: IndexType::Flat,
data_path: temp_dir.path().to_path_buf(),
dimension: 3,
};
init_api(config).unwrap()
(init_api(config).unwrap(), temp_dir)
}

#[test]
fn test_insert_and_get() {
let db = create_test_db();
let (db, _temp_dir) = create_test_db();
let vector = vec![1.0, 2.0, 3.0];
let payload = Payload {
content_type: ContentType::Text,
Expand All @@ -209,7 +263,7 @@ mod tests {

#[test]
fn test_dimension_mismatch() {
let db = create_test_db();
let (db, _temp_dir) = create_test_db();
let v1 = vec![1.0, 2.0, 3.0];
let v2 = vec![1.0, 2.0];
let payload = defs::Payload {
Expand All @@ -228,7 +282,7 @@ mod tests {

#[test]
fn test_delete() {
let db = create_test_db();
let (db, _temp_dir) = create_test_db();
let vector = vec![1.0, 2.0, 3.0];
let payload = Payload {
content_type: ContentType::Text,
Expand All @@ -251,7 +305,7 @@ mod tests {

#[test]
fn test_search() {
let db = create_test_db();
let (db, _temp_dir) = create_test_db();

// Insert some points
let vectors = vec![
Expand Down Expand Up @@ -280,7 +334,7 @@ mod tests {

#[test]
fn test_search_limit() {
let db = create_test_db();
let (db, _temp_dir) = create_test_db();

// Insert 5 points
let mut ids = Vec::new();
Expand All @@ -307,7 +361,7 @@ mod tests {

#[test]
fn test_empty_database() {
let db = create_test_db();
let (db, _temp_dir) = create_test_db();

// Get non-existent point
assert!(db.get(Uuid::new_v4()).unwrap().is_none());
Expand All @@ -319,7 +373,7 @@ mod tests {

#[test]
fn test_list_vectors() {
let db = create_test_db();
let (db, _temp_dir) = create_test_db();
// insert some points
let mut ids = Vec::new();
for i in 0..10 {
Expand Down Expand Up @@ -350,7 +404,7 @@ mod tests {

#[test]
fn test_build_index() {
let db = create_test_db();
let (db, _temp_dir) = create_test_db();

// insert some points
for i in 0..10 {
Expand All @@ -370,4 +424,143 @@ mod tests {
let inserted = db.build_index().unwrap();
assert_eq!(inserted, 10);
}

#[test]
fn test_create_and_load_snapshot() {
let (old_db, temp_dir) = create_test_db();

let v1 = vec![0.0, 1.0, 2.0];
let v2 = vec![3.0, 4.0, 5.0];
let v3 = vec![6.0, 7.0, 8.0];

let id1 = old_db
.insert(
v1.clone(),
Payload {
content_type: ContentType::Text,
content: "test".to_string(),
},
)
.unwrap();

let id2 = old_db
.insert(
v2.clone(),
Payload {
content_type: ContentType::Text,
content: "test".to_string(),
},
)
.unwrap();

let temp_snapshot_dir = tempdir().unwrap();
let snapshot_path = old_db.create_snapshot(temp_snapshot_dir.path()).unwrap();

// insert v3 after snapshot
let id3 = old_db
.insert(
v3.clone(),
Payload {
content_type: ContentType::Text,
content: "test".to_string(),
},
)
.unwrap();

let reload_config = DbRestoreConfig {
data_path: temp_dir.path().to_path_buf(),
snapshot_path,
};

std::mem::drop(old_db);
let loaded_db = restore_from_snapshot(&reload_config).unwrap();

assert!(loaded_db.get(id1).unwrap_or(None).is_some());
assert!(loaded_db.get(id2).unwrap_or(None).is_some());
assert!(loaded_db.get(id3).unwrap_or(None).is_none()); // v3 was inserted after snapshot was taken

// vector restore check
assert!(loaded_db.get(id1).unwrap().unwrap().vector.unwrap() == v1);
assert!(loaded_db.get(id2).unwrap().unwrap().vector.unwrap() == v2);
}

#[test]
fn test_snapshot_engine() {
let (_db, _temp_dir) = create_test_db();
let db = Arc::new(Mutex::new(_db));

let registry_tempdir = tempdir().unwrap();

let registry = Arc::new(Mutex::new(
LocalRegistry::new(registry_tempdir.path()).unwrap(),
));

let last_k = 4;
let mut se = SnapshotEngine::new(last_k, db.clone(), registry.clone());

let v1 = vec![0.0, 1.0, 2.0];
let v2 = vec![3.0, 4.0, 5.0];
let v3 = vec![6.0, 7.0, 8.0];

let test_vectors = vec![v1.clone(), v2.clone(), v3.clone()];
let mut inserted_ids = Vec::new();

for (i, vector) in test_vectors.clone().into_iter().enumerate() {
se.snapshot().unwrap();
let id = db
.lock()
.unwrap()
.insert(
vector.clone(),
Payload {
content_type: ContentType::Text,
content: format!("{}", i),
},
)
.unwrap();
inserted_ids.push(id);
}
se.snapshot().unwrap();
let snapshots = se.list_alive_snapshots().unwrap();

// asserting these cases:
// snapshot 0 : no vectors
// snapshot 1 : v1
// snapshot 2 : v1, v2
// snapshot 3 : v1, v2, v3

std::mem::drop(db);
std::mem::drop(se);

for (i, snapshot) in snapshots.iter().enumerate() {
let temp_dir = tempdir().unwrap();
let db = restore_from_snapshot(&DbRestoreConfig {
data_path: temp_dir.path().to_path_buf(),
snapshot_path: snapshot.path.clone(),
})
.unwrap();
for j in 0..i {
// test if point is present
assert!(db.get(inserted_ids[j]).unwrap_or(None).is_some());
// test vector restore
assert!(
db.get(inserted_ids[j]).unwrap().unwrap().vector.unwrap() == test_vectors[j]
);
// test payload restore
assert!(
db.get(inserted_ids[j])
.unwrap()
.unwrap()
.payload
.unwrap()
.content
== format!("{}", j)
);
}
for absent_id in inserted_ids.iter().skip(i) {
assert!(db.get(*absent_id).unwrap_or(None).is_none());
}
std::mem::drop(db);
}
}
}
9 changes: 9 additions & 0 deletions crates/defs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@ pub enum DbError {
DeserializationError,
IndexError(String),
LockError,
IndexInitError, //TODO: Change this
UnsupportedSimilarity,
DimensionMismatch,
SnapshotError(String),
StorageInitializationError,
StorageCheckpointError(String),
InvalidMagicBytes(String),
VectorNotFound(uuid::Uuid),
SnapshotRegistryError(String),
StorageEngineError(String),
}

#[derive(Debug)]
Expand Down
7 changes: 7 additions & 0 deletions crates/defs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
pub mod error;
pub mod types;

use std::path::{Path, PathBuf};

// Without re-exports, users would need to write defs::types::SomeType instead of just defs::SomeType. Re-exports simplify the API by flattening the module hierarchy. The * means "everything public" from that module.
pub use error::*;
pub use types::*;

// hoisted trait so it can be used by the snapshots crate
pub trait SnapshottableDb: Send + Sync {
fn create_snapshot(&self, dir_path: &Path) -> Result<PathBuf, DbError>;
}
2 changes: 2 additions & 0 deletions crates/defs/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub enum Similarity {
Cosine,
}

pub type Magic = [u8; 4];

// Struct which stores the distance between a vector and query vector and implements ordering traits
#[derive(Copy, Clone)]
pub struct DistanceOrderedVector<'q> {
Expand Down
Loading