diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f8af889..19b10c3 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -23,7 +23,7 @@ jobs: - beta steps: - uses: actions/checkout@v1 - - uses: dtolnay/rust-toolchain@1.85 + - uses: dtolnay/rust-toolchain@1.89 - uses: actions-rs/cargo@v1 with: command: build @@ -39,7 +39,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v1 - - uses: dtolnay/rust-toolchain@1.85 + - uses: dtolnay/rust-toolchain@1.89 with: profile: minimal components: clippy, rustfmt @@ -61,7 +61,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v1 - - uses: dtolnay/rust-toolchain@1.85 + - uses: dtolnay/rust-toolchain@1.89 - name: Run fuzzer env: HANNOY_FUZZ_DURATION_SEC: 1800 diff --git a/Cargo.toml b/Cargo.toml index bc8dd7d..bc15b30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "hannoy" description = "HNSW Approximate Nearest Neighbors in Rust, based on LMDB and optimized for memory usage" -version = "0.1.2" +version = "0.1.7-nested-rtxns" repository = "https://github.com/nnethercott/hannoy" keywords = [ "HNSW", @@ -27,7 +27,7 @@ crate-type = ["cdylib", "rlib"] bytemuck = { version = "1.21.0", features = ["derive", "extern_crate_alloc"] } byteorder = "1.5.0" hashbrown = "0.15.4" -heed = { version = "0.22.0", default-features = false } +heed = { version = "0.22.1-nested-rtxns", default-features = false } min-max-heap = "1.3.0" page_size = "0.6.0" papaya = "0.2.3" @@ -44,7 +44,8 @@ pyo3-stub-gen = { version = "0.13.1", optional = true } once_cell = { version = "1.21.3", optional = true } tempfile = { version = "3.21.0", optional = true } parking_lot = { version = "0.12.4", optional = true } - +thread_local = "1.1.9" +crossbeam-channel = "0.5.15" [target.'cfg(not(windows))'.dependencies] madvise = "0.1.0" @@ -53,7 +54,7 @@ madvise = "0.1.0" anyhow = "1.0.95" approx = "0.5.1" arbitrary = { version = "1.4.1", features = ["derive"] } -arroy = "0.6.1" 
+arroy = { version = "0.6.4-nested-rtxns", git = "https://github.com/meilisearch/arroy", "tag" = "v0.6.4-nested-rtxns", default-features = false } clap = { version = "4.5.24", features = ["derive"] } divan = { version = "3.0.5", package = "codspeed-divan-compat" } hnsw_rs = "0.3.2" diff --git a/benches/benchmark.rs b/benches/benchmark.rs index 8bcb65f..8dfe9a7 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -1,6 +1,8 @@ -use hannoy::{distances::Cosine, Database, Writer}; +use hannoy::distances::Cosine; +use hannoy::{Database, Writer}; use heed::{Env, EnvOpenOptions, RwTxn}; -use rand::{rngs::StdRng, Rng, SeedableRng}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; use tempfile::tempdir; static M: usize = 16; diff --git a/benches/speed.rs b/benches/speed.rs index 2841a10..6549327 100644 --- a/benches/speed.rs +++ b/benches/speed.rs @@ -3,14 +3,14 @@ use std::fs::OpenOptions; use std::hint::black_box; use std::io::Write; -use hannoy::Reader; -use hannoy::{distances::Cosine, Database, Writer}; +use hannoy::distances::Cosine; +use hannoy::{Database, Reader, Writer}; use heed::{Env, EnvOpenOptions, RwTxn}; use hnsw_rs; use hnsw_rs::hnsw::Hnsw; use hnsw_rs::prelude::DistCosine; -use rand::thread_rng; -use rand::{rngs::StdRng, Rng, SeedableRng}; +use rand::rngs::StdRng; +use rand::{thread_rng, Rng, SeedableRng}; use tempfile::tempdir; static M: usize = 16; diff --git a/src/distance/hamming.rs b/src/distance/hamming.rs index 82e76e3..a5c7fee 100644 --- a/src/distance/hamming.rs +++ b/src/distance/hamming.rs @@ -1,9 +1,10 @@ use std::fmt; +use bytemuck::{Pod, Zeroable}; + use crate::distance::Distance; use crate::node::Item; use crate::unaligned_vector::{Binary, UnalignedVector}; -use bytemuck::{Pod, Zeroable}; /// The Hamming distance between two vectors is the number of positions at /// which the corresponding symbols are different. 
@@ -70,7 +71,7 @@ pub fn hamming_bitwise_fast(u: &[u8], v: &[u8]) -> f32 { }) .sum::(); - if u.len() % CHUNK_SIZE != 0 { + if !u.len().is_multiple_of(CHUNK_SIZE) { distance += u .chunks_exact(CHUNK_SIZE) .remainder() diff --git a/src/error.rs b/src/error.rs index 8b8f934..4da3271 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,9 @@ use std::io; -use crate::{key::Key, node_id::NodeMode, version::Version, ItemId, LayerId}; +use crate::key::Key; +use crate::node_id::NodeMode; +use crate::version::Version; +use crate::{ItemId, LayerId}; /// The different set of errors that hannoy can encounter. #[derive(Debug, thiserror::Error)] diff --git a/src/hnsw.rs b/src/hnsw.rs index 3a2e159..b5c1e77 100644 --- a/src/hnsw.rs +++ b/src/hnsw.rs @@ -21,10 +21,10 @@ use tracing::{debug, instrument}; use crate::key::Key; use crate::node::{Item, Links, Node}; use crate::ordered_float::OrderedFloat; -use crate::parallel::{ImmutableItems, ImmutableLinks}; +use crate::parallel::FrozenReader; use crate::progress::{AtomicInsertItemsStep, HannoyBuild}; use crate::stats::BuildStats; -use crate::writer::{BuildOption, FrozenReader}; +use crate::writer::BuildOption; use crate::{Database, Distance, Error, ItemId, Result, CANCELLATION_PROBING}; pub(crate) type ScoredLink = (OrderedFloat, ItemId); @@ -135,9 +135,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> { let mut build_stats = BuildStats::new(); - let items = ImmutableItems::new(wtxn, database, index, options)?; - let links = ImmutableLinks::new(wtxn, database, index, database.len(wtxn)?, options)?; - let lmdb = FrozenReader { index, items: &items, links: &links }; + let lmdb = FrozenReader::new(wtxn, index, database)?; // Generate a random level for each point let mut cur_max_level = usize::MIN; @@ -173,7 +171,8 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> level_groups.into_iter().try_for_each(|grp| { grp.into_par_iter().try_for_each(|&(item_id, lvl)| { - if 
cancel_index.fetch_add(1, Relaxed) % CANCELLATION_PROBING == 0 && (self.cancel)() + if cancel_index.fetch_add(1, Relaxed).is_multiple_of(CANCELLATION_PROBING) + && (self.cancel)() { Err(Error::BuildCancelled) } else { @@ -187,6 +186,8 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> self.fill_gaps_from_deleted(&lmdb, to_delete, options)?; + drop(lmdb); + // Single-threaded write to lmdb options.progress.update(HannoyBuild::WritingTheItems); let mut cancellation_index = 0; @@ -211,14 +212,13 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> } } - build_stats.compute_mean_degree(wtxn, &database, index)?; Ok(build_stats) } /// This function resolves several nasty edge cases that can occur, namely : deleted /// or partially deleted entrypoints, new indexed points assigned to higher layers, ensuring /// entry points are present on all layers before build - #[instrument(skip(self, options, lmdb, levels))] + #[instrument(level = "trace", skip(self, options, lmdb, levels))] fn prepare_levels_and_entry_points

( &mut self, levels: &mut Vec<(u32, usize)>, @@ -242,7 +242,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> let mut l = self.max_level; for _ in del_eps.iter() { loop { - for result in lmdb.links.iter_layer(l as u8) { + for result in lmdb.iter_layer_links(l as u8)? { let ((item_id, _), _) = result?; if !to_delete.contains(item_id) && new_eps.insert(item_id) { @@ -297,7 +297,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> ) -> Result<()> { let mut eps = Vec::from_iter(self.entry_points.clone()); - let q = lmdb.get_item(query)?; + let q = lmdb.item(query)?; // Greedy search with: ef = 1 for lvl in (level + 1..=self.max_level).rev() { @@ -333,7 +333,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> /// Algorithm 4 from FreshDiskANN paper. fn fill_gaps_from_deleted

( &mut self, - lmdb: &FrozenReader, + lmdb: &FrozenReader<'_, D>, to_delete: &RoaringBitmap, options: &BuildOption

, ) -> Result<()> @@ -344,8 +344,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> options.progress.update(HannoyBuild::PatchOldNewDeletedLinks); let links_in_db: Vec<_> = lmdb - .links - .iter() + .iter_links()? .map(|result| { result.map(|((id, lvl), v)| { // Resize the layers if necessary. We must do this to accomodate links from @@ -361,7 +360,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> let cancel_index = AtomicUsize::new(0); links_in_db.into_par_iter().try_for_each(|result| { - if cancel_index.fetch_add(1, Ordering::Relaxed) % CANCELLATION_PROBING == 0 + if cancel_index.fetch_add(1, Ordering::Relaxed).is_multiple_of(CANCELLATION_PROBING) && (self.cancel)() { return Err(Error::BuildCancelled); @@ -382,7 +381,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> let mut bitmap = RoaringBitmap::new(); for item_id in del_subset.iter() { - bitmap.extend(lmdb.get_links(item_id, lvl).unwrap_or_default().iter()); + bitmap.extend(lmdb.links(item_id, lvl).unwrap_or_default().iter()); } bitmap |= links; bitmap -= to_delete; @@ -401,10 +400,10 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> } // Case 2: Some old links may be popped to fill gaps from deleted nodes - let curr = &lmdb.get_item(id)?; + let curr = &lmdb.item(id)?; for other in bitmap { - let dist = D::distance(curr, &lmdb.get_item(other)?); + let dist = D::distance(curr, &lmdb.item(other)?); new_links.push((OrderedFloat(dist), other)); } let pruned = self.robust_prune(new_links, lvl, self.alpha, lmdb)?; @@ -436,7 +435,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> let mut res = Vec::new(); // O(1) from frozzenreader - if let Ok(Links { links }) = lmdb.get_links(item_id, level) { + if let Ok(Links { links }) = lmdb.links(item_id, level) { build_stats.incr_lmdb_hits(); res.extend(links.iter()); } @@ -457,7 +456,7 @@ impl<'a, D: 
Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> } #[allow(clippy::too_many_arguments)] - #[instrument(name = "walk_layer", skip(self, lmdb, query))] + #[instrument(level = "trace", name = "walk_layer", skip(self, lmdb, query))] fn walk_layer( &self, query: &Item, @@ -473,7 +472,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> // Register all entry points as visited and populate candidates for &ep in eps { - let ve = lmdb.get_item(ep)?; + let ve = lmdb.item(ep)?; let dist = D::distance(query, &ve); candidates.push((Reverse(OrderedFloat(dist)), ep)); @@ -496,7 +495,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> } // If the item isn't in the frozzen reader it must have been deleted from the index, // in which case its OK not to explore it - let item = match lmdb.get_item(point) { + let item = match lmdb.item(point) { Ok(item) => item, Err(Error::MissingKey { .. }) => continue, Err(e) => return Err(e), @@ -582,7 +581,7 @@ impl<'a, D: Distance, const M: usize, const M0: usize> HnswBuilder<'a, D, M, M0> // ensure we're closer to the query than we are to other candidates let mut ok_to_add = true; for i in selected.iter().map(|(_, i)| *i) { - let d = D::distance(&lmdb.get_item(c)?, &lmdb.get_item(i)?); + let d = D::distance(&lmdb.item(c)?, &lmdb.item(i)?); if OrderedFloat(d * alpha) < dist_to_query { ok_to_add = false; break; diff --git a/src/lib.rs b/src/lib.rs index 6c1f1c2..7ffcce6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ mod reader; mod roaring; mod spaces; mod stats; +mod update_status; mod version; mod writer; diff --git a/src/metadata.rs b/src/metadata.rs index a5c0d96..c7a3309 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -57,8 +57,16 @@ impl<'a> heed::BytesDecode<'a> for MetadataCodec { let bytes = &bytes[size_of::()..]; let items = RoaringBitmap::deserialize_from(&bytes[..items_size])?; let bytes = &bytes[items_size..]; - let entry_points = 
ItemIds::from_bytes(&bytes[..bytes.len() - 1]); - let max_level = bytes[bytes.len() - 1]; + + let entry_points; + let max_level; + if bytes.is_empty() { + entry_points = ItemIds::from_slice(&[]); + max_level = 0; + } else { + entry_points = ItemIds::from_bytes(&bytes[..bytes.len() - 1]); + max_level = bytes[bytes.len() - 1]; + }; Ok(Metadata { dimensions, items, distance, entry_points, max_level }) } diff --git a/src/node.rs b/src/node.rs index 99e3359..09ddc50 100644 --- a/src/node.rs +++ b/src/node.rs @@ -189,11 +189,15 @@ impl fmt::Display for InvalidNodeDecoding { #[cfg(test)] mod tests { - use super::{Item, Links, Node, NodeCodec}; - use crate::{distance::Cosine, internals::UnalignedVector, Distance}; + use std::borrow::Cow; + use heed::{BytesDecode, BytesEncode}; use roaring::RoaringBitmap; - use std::borrow::Cow; + + use super::{Item, Links, Node, NodeCodec}; + use crate::distance::Cosine; + use crate::internals::UnalignedVector; + use crate::Distance; #[test] fn check_bytes_encode_decode() { diff --git a/src/parallel.rs b/src/parallel.rs index 6a35178..57b69b7 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -1,186 +1,112 @@ -use core::slice; use std::borrow::Cow; -use std::marker; -use hashbrown::HashMap; -use heed::types::Bytes; -use heed::{BytesDecode, RoTxn}; +use heed::{RoTxn, RwTxn, WithoutTls}; use roaring::RoaringBitmap; -use rustc_hash::FxBuildHasher; -use tracing::debug; - -use crate::internals::{Item, KeyCodec}; -use crate::key::{Prefix, PrefixCodec}; -use crate::node::{Links, Node, NodeCodec}; -use crate::progress::HannoyBuild; -use crate::writer::BuildOption; -use crate::{Database, Distance, ItemId, LayerId}; - -/// A struture used to keep a list of the item nodes in the graph. -/// -/// It is safe to share between threads as the pointer are pointing -/// in the mmapped file and the transaction is kept here and therefore -/// no longer touches the database. 
-pub struct ImmutableItems<'t, D> { - items: HashMap, - constant_length: Option, - _marker: marker::PhantomData<(&'t (), D)>, -} - -// NOTE: this previously took an arg `items: &RoaringBitmap` which corresponded to the `to_insert`. -// When building the hnsw in multiple dumps we need vecs from previous dumps in order to "glue" -// things together. -// To accomodate this we use a cursor over ALL Key::items in the db. -impl<'t, D: Distance> ImmutableItems<'t, D> { - /// Creates the structure by fetching all the item vector pointers - /// and keeping the transaction making the pointers valid. - /// Do not take more items than memory allows. - /// Remove from the list of candidates all the items that were selected and return them. - pub fn new

( - rtxn: &'t RoTxn, - database: Database, - index: u16, - options: &BuildOption

, - ) -> heed::Result - where - P: steppe::Progress, - { - debug!("fetching the pointers to the items from lmdb"); - options.progress.update(HannoyBuild::FetchItemPointers); - - let mut map = - HashMap::with_capacity_and_hasher(database.len(rtxn)? as usize, FxBuildHasher); - let mut constant_length = None; - - let cursor = database - .remap_types::() - .prefix_iter(rtxn, &Prefix::item(index))? - .remap_key_type::(); - - for res in cursor { - let (item_id, bytes) = res?; - assert_eq!(*constant_length.get_or_insert(bytes.len()), bytes.len()); - let ptr = bytes.as_ptr(); - map.insert(item_id.node.item, ptr); - } - - Ok(ImmutableItems { items: map, constant_length, _marker: marker::PhantomData }) - } - /// Returns the items identified by the given ID. - pub fn get(&self, item_id: ItemId) -> heed::Result>> { - let len = match self.constant_length { - Some(len) => len, - None => return Ok(None), - }; - let ptr = match self.items.get(&item_id) { - Some(ptr) => *ptr, - None => return Ok(None), - }; - - // safety: - // - ptr: The pointer comes from LMDB. Since the database cannot be written to, it is still valid. - // - len: All the items share the same dimensions and are the same size - let bytes = unsafe { slice::from_raw_parts(ptr, len) }; - NodeCodec::bytes_decode(bytes).map_err(heed::Error::Decoding).map(|node| node.item()) - } -} - -unsafe impl Sync for ImmutableItems<'_, D> {} +use crate::key::{Key, KeyCodec, Prefix, PrefixCodec}; +use crate::node::{Item, Links, Node}; +use crate::node_id::NodeId; +use crate::{Database, Distance, Error, ItemId, Result}; -/// A struture used to keep a list of all the links. -/// It is safe to share between threads as the pointers are pointing -/// in the mmapped file and the transaction is kept here and therefore -/// no longer touches the database. 
-pub struct ImmutableLinks<'t, D> { - links: HashMap<(u32, u8), (usize, *const u8), FxBuildHasher>, - _marker: marker::PhantomData<(&'t (), D)>, +pub(crate) struct FrozenReader<'t, D> { + rtxns_pool: crossbeam_channel::Receiver>, + rtxns: thread_local::ThreadLocal>, + index: u16, + database: Database, } -impl<'t, D: Distance> ImmutableLinks<'t, D> { - /// Creates the structure by fetching all the root pointers - /// and keeping the transaction making the pointers valid. - pub fn new

( - rtxn: &'t RoTxn, - database: Database, - index: u16, - nb_links: u64, - options: &BuildOption

, - ) -> heed::Result - where - P: steppe::Progress, - { - debug!("fetching the pointers to the links from lmdb"); - options.progress.update(HannoyBuild::FetchLinksPointers); - - let mut links = HashMap::with_capacity_and_hasher(nb_links as usize, FxBuildHasher); - - let iter = database - .remap_types::() - .prefix_iter(rtxn, &Prefix::links(index))? - .remap_key_type::(); +impl<'t, D: Distance> FrozenReader<'t, D> { + pub fn new(wtxn: &'t mut RwTxn<'_>, index: u16, database: Database) -> Result { + // We make sure to have one more thread so the current/main thread has a nested rtxn. + let num_threads = rayon::current_num_threads() + 1; + let (sender, rtxns_pool) = crossbeam_channel::bounded(num_threads); - for result in iter { - let (key, bytes) = result?; - let links_id = key.node.unwrap_node(); - links.insert(links_id, (bytes.len(), bytes.as_ptr())); + // Sequentially generate read transactions from the writer transaction + for _ in 0..num_threads { + let rtxn = wtxn.nested_read_txn()?; + sender.try_send(rtxn).unwrap(); } - Ok(ImmutableLinks { links, _marker: marker::PhantomData }) + Ok(Self { rtxns_pool, rtxns: thread_local::ThreadLocal::new(), index, database }) } - /// Returns the node identified by the given ID. - pub fn get(&self, item_id: ItemId, level: LayerId) -> heed::Result>> { - let key = (item_id, level); - let (ptr, len) = match self.links.get(&key) { - Some((len, ptr)) => (*ptr, *len), - None => return Ok(None), - }; - - // safety: - // - ptr: The pointer comes from LMDB. Since the database cannot be written to, it is still valid. 
- // - len: The len cannot change either - let bytes = unsafe { slice::from_raw_parts(ptr, len) }; - NodeCodec::bytes_decode(bytes) - .map_err(heed::Error::Decoding) - .map(|node: Node<'t, D>| node.links()) + pub fn item<'a>(&'a self, item_id: ItemId) -> Result> { + let rtxn = self.rtxns.get_or(|| self.rtxns_pool.try_recv().unwrap()); + let key = Key::item(self.index, item_id); + // key is a `Key::item` so returned result must be a Node::Item + self.database.get(rtxn, &key)?.and_then(|node| node.item()).ok_or(Error::missing_key(key)) } - pub fn iter( - &self, - ) -> impl Iterator)>> { - self.links.keys().map(|&k| { - let (item_id, level) = k; - match self.get(item_id, level) { - Ok(Some(Links { links })) => Ok((k, links)), - Ok(None) => { - unreachable!("link at level {level} with item_id {item_id} not found") - } - Err(e) => Err(e), - } - }) + pub fn links<'a>(&'a self, item_id: ItemId, level: usize) -> Result> { + let rtxn = self.rtxns.get_or(|| self.rtxns_pool.try_recv().unwrap()); + let key = Key::links(self.index, item_id, level as u8); + // key is a `Key::links` so returned result must be a Node::Links + self.database.get(rtxn, &key)?.and_then(|node| node.links()).ok_or(Error::missing_key(key)) } /// `Iter`s only over links in a given level - pub fn iter_layer( + pub fn iter_layer_links( &self, layer: u8, - ) -> impl Iterator)>> { - self.links.keys().filter_map(move |&k| { - let (item_id, level) = k; - if level != layer { - return None; - } - - match self.get(item_id, level) { - Ok(Some(Links { links })) => Some(Ok((k, links))), - Ok(None) => { - unreachable!("link at level {level} with item_id {item_id} not found") + ) -> heed::Result)>>> + { + let rtxn = self.rtxns.get_or(|| self.rtxns_pool.try_recv().unwrap()); + let prefix_key = Prefix::links(self.index); + + Ok(self + .database + .remap_key_type::() + .prefix_iter(rtxn, &prefix_key)? 
+ .remap_key_type::() + .lazily_decode_data() + .filter_map(move |result| { + let (key, value) = match result { + Ok(value) => value, + Err(e) => return Some(Err(e)), + }; + + let Key { node: NodeId { item: item_id, layer: level, .. }, .. } = key; + + if level != layer { + return None; } - Err(e) => Some(Err(e)), - } - }) + + match value.decode() { + Ok(Node::Links(Links { links })) => Some(Ok(((item_id, level), links))), + Ok(Node::Item(_)) => { + unreachable!("link at level {level} with item_id {item_id} not found") + } + Err(e) => Some(Err(heed::Error::Decoding(e))), + } + })) } -} -unsafe impl Sync for ImmutableLinks<'_, D> {} + pub fn iter_links( + &self, + ) -> heed::Result)>>> + { + let rtxn = self.rtxns.get_or(|| self.rtxns_pool.try_recv().unwrap()); + let prefix_key = Prefix::links(self.index); + + Ok(self + .database + .remap_key_type::() + .prefix_iter(rtxn, &prefix_key)? + .remap_key_type::() + .map(move |result| { + let (key, value) = match result { + Ok(value) => value, + Err(e) => return Err(e), + }; + + let Key { node: NodeId { item: item_id, layer: level, .. }, .. } = key; + + match value { + Node::Links(Links { links }) => Ok(((item_id, level), links)), + Node::Item(_) => { + unreachable!("link at level {level} with item_id {item_id} not found") + } + } + })) + } +} diff --git a/src/progress.rs b/src/progress.rs index f05d9ca..2a843d4 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -2,11 +2,8 @@ use steppe::{make_atomic_progress, make_enum_progress}; make_enum_progress! { pub enum HannoyBuild { - RetrievingTheItemsIds, DeletingTheLinks, RetrieveTheUpdatedItems, - FetchItemPointers, - FetchLinksPointers, ResolveGraphEntryPoints, BuildingTheGraph, PatchOldNewDeletedLinks, diff --git a/src/python.rs b/src/python.rs index 03cef6e..fa3d172 100644 --- a/src/python.rs +++ b/src/python.rs @@ -1,17 +1,15 @@ //! Python bindings for hannoy. 
+use std::path::PathBuf; +use std::sync::LazyLock; + use heed::{RoTxn, RwTxn, WithoutTls}; use once_cell::sync::OnceCell; use parking_lot::{MappedMutexGuard, Mutex, MutexGuard}; -use pyo3::{ - exceptions::{PyIOError, PyRuntimeError}, - prelude::*, - types::PyType, -}; -use pyo3_stub_gen::{ - define_stub_info_gatherer, - derive::{gen_stub_pyclass, gen_stub_pyclass_enum, gen_stub_pymethods}, -}; -use std::{path::PathBuf, sync::LazyLock}; +use pyo3::exceptions::{PyIOError, PyRuntimeError}; +use pyo3::prelude::*; +use pyo3::types::PyType; +use pyo3_stub_gen::define_stub_info_gatherer; +use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_enum, gen_stub_pymethods}; use crate::{distance, Database, ItemId, Reader, Writer}; static DEFAULT_ENV_SIZE: usize = 1024 * 1024 * 1024; // 1GiB @@ -257,7 +255,8 @@ pub(super) struct PyWriter { impl PyWriter { fn build(&self) -> PyResult<()> { - use rand::{rngs::StdRng, SeedableRng}; + use rand::rngs::StdRng; + use rand::SeedableRng; let mut rng = StdRng::seed_from_u64(42); let mut wtxn = get_rw_txn()?; diff --git a/src/reader.rs b/src/reader.rs index 969b295..dfae889 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -25,13 +25,11 @@ const DEFAULT_EF_SEARCH: usize = 100; #[cfg(not(windows))] const READER_AVAILABLE_MEMORY: &str = "HANNOY_READER_PREFETCH_MEMORY"; -#[cfg(not(test))] -/// The threshold at which linear search is used instead of the HNSW algorithm. -const LINEAR_SEARCH_THRESHOLD: u64 = 1000; -#[cfg(test)] -/// Note that for tests purposes, we use set this threshold -/// to zero to make sure we test the HNSW algorithm. -const LINEAR_SEARCH_THRESHOLD: u64 = 0; +/// The default threshold at which linear search is used instead of the HNSW algorithm. +const DEFAULT_LINEAR_SCAN_THRESHOLD: usize = 1000; + +/// The default threshold ratio at which linear search is used instead of the HNSW algorithm. 
+const DEFAULT_LINEAR_SCAN_THRESHOLD_RATIO: f32 = 1.00; /// Container storing nearest neighbour search result #[derive(Debug)] @@ -64,6 +62,8 @@ pub struct QueryBuilder<'a, D: Distance> { candidates: Option<&'a RoaringBitmap>, count: usize, ef: usize, + linear_below: usize, + linear_below_ratio: f32, } impl<'a, D: Distance> QueryBuilder<'a, D> { @@ -218,6 +218,46 @@ impl<'a, D: Distance> QueryBuilder<'a, D> { self.ef = ef.max(self.count); self } + + /// Specify a threshold for the number of candidates below which a linear scan is used instead + /// of the HNSW algorithm. This can improve performance for small candidate sets. + /// + /// The default value is 1000. + /// + /// # Examples + /// + /// ```no_run + /// # use hannoy::{Reader, distances::Euclidean}; + /// # let (reader, rtxn): (Reader, heed::RoTxn) = todo!(); + /// reader.nns(20).linear_below(500).by_item(&rtxn, 6); + /// ``` + pub fn linear_below(&mut self, threshold: usize) -> &mut Self { + self.linear_below = threshold; + self + } + + /// Specify a threshold ratio for the number of candidates below which a linear scan is used + /// instead of the HNSW algorithm. This can improve performance for small candidate sets. + /// + /// The threshold ratio must be between 0.0 (inclusive) and 1.0 (inclusive). + /// The default value is 1.0.
+ /// + /// # Examples + /// + /// ```no_run + /// # use hannoy::{Reader, distances::Euclidean}; + /// # let (reader, rtxn): (Reader, heed::RoTxn) = todo!(); + /// reader.nns(20).linear_below_ratio(0.1).by_item(&rtxn, 6); + /// ``` + pub fn linear_below_ratio(&mut self, ratio: f32) -> &mut Self { + assert!( + (0.0..=1.0).contains(&ratio), + "linear scan threshold ratio must be between 0.0 and 1.0" + ); + + self.linear_below_ratio = ratio; + self + } } enum Completion { @@ -410,14 +450,15 @@ impl Reader { index: u16, metadata: &Metadata, ) -> Result<()> { - use crate::unaligned_vector::UnalignedVectorCodec; + use std::collections::VecDeque; + use std::sync::atomic::{AtomicUsize, Ordering}; use heed::types::Bytes; use madvise::AccessPattern; - use std::collections::VecDeque; - use std::sync::atomic::{AtomicUsize, Ordering}; use tracing::warn; + use crate::unaligned_vector::UnalignedVectorCodec; + let page_size = page_size::get(); let mut available_memory: usize = std::env::var(READER_AVAILABLE_MEMORY) .ok() @@ -568,7 +609,34 @@ impl Reader { /// /// You must provide the number of items you want to receive. 
pub fn nns(&self, count: usize) -> QueryBuilder<'_, D> { - QueryBuilder { reader: self, candidates: None, count, ef: DEFAULT_EF_SEARCH } + QueryBuilder { + reader: self, + candidates: None, + count, + ef: DEFAULT_EF_SEARCH, + linear_below: DEFAULT_LINEAR_SCAN_THRESHOLD, + linear_below_ratio: DEFAULT_LINEAR_SCAN_THRESHOLD_RATIO, + } + } + + fn should_linear_scan(&self, opt: &QueryBuilder) -> bool { + let all_ids = self.item_ids(); + if all_ids.is_empty() { + return false; + } + + let candidates = match opt.candidates { + Some(candidates) => candidates, + None => return false, + }; + + // We retrieve the subset of candidates that are actually + // part of the items in the database + let candidates_len = all_ids.intersection_len(candidates); + let is_below_threshold = candidates_len < opt.linear_below as u64; + let is_below_ratio = candidates_len as f32 / all_ids.len() as f32 <= opt.linear_below_ratio; + + is_below_threshold && is_below_ratio } fn nns_by_vec( @@ -580,13 +648,15 @@ impl Reader { ) -> Result>> { use Completion::*; + let item_ids = self.item_ids(); + // If we will never find any candidates, return an empty vector - if opt.candidates.is_some_and(|c| self.item_ids().is_disjoint(c)) { + if item_ids.is_empty() || opt.candidates.is_some_and(|c| item_ids.is_disjoint(c)) { return Ok(Done(Vec::new())); } // If the number of candidates is less than a given threshold, perform linear search - if let Some(candidates) = opt.candidates.filter(|c| c.len() < LINEAR_SEARCH_THRESHOLD) { + if let Some(candidates) = opt.candidates.filter(|_| self.should_linear_scan(opt)) { return self.brute_force_search(query, rtxn, candidates, opt.count, cancel_fn); } @@ -605,23 +675,39 @@ impl Reader { ) -> Result>> { use Completion::*; - let mut item_distances = Vec::with_capacity(candidates.len() as usize); + // We set the capacity to the maximum number of + // candidates we can return as it should be small enough. 
+ let mut item_distances = BinaryHeap::<(OrderedFloat, _)>::with_capacity(count); + let mut cancelled = false; for item_id in candidates { if cancel_fn() { - return Ok(Cancelled(item_distances)); + cancelled = true; + break; } let Some(vector) = self.item_vector(rtxn, item_id)? else { continue }; let vector = UnalignedVector::from_vec(vector); let item = Item { header: D::new_header(&vector), vector }; let distance = D::distance(&item, query); - item_distances.push((item_id, distance)); + + // We make sure we maintain the number of items + // in the heap at a maximum of count elements. + if item_distances.len() >= count { + if let Some(mut peek) = item_distances.peek_mut() { + if peek.0 > OrderedFloat(distance) { + *peek = (OrderedFloat(distance), item_id); + } + } + } else { + item_distances.push((OrderedFloat(distance), item_id)); + } } - item_distances.sort_by_key(|(_, dist)| OrderedFloat(*dist)); - item_distances.truncate(count); - Ok(Done(item_distances)) + let item_distances = item_distances.into_sorted_vec(); + let output = item_distances.into_iter().map(|(OrderedFloat(d), i)| (i, d)).collect(); + + Ok(if cancelled { Cancelled(output) } else { Done(output) }) } /// Hnsw search according to arXiv:1603.09320. 
@@ -696,13 +782,13 @@ impl Reader { } visitor.eps = vec![id]; - visitor.ef = opt.count - neighbours.len(); + visitor.ef = opt.ef.saturating_sub(neighbours.len()); let more_nns = return_if_cancelled!(visitor.visit(query, self, rtxn, &mut path, cancel_fn)?); neighbours.extend(more_nns.into_iter()); - if neighbours.len() >= opt.count { + if neighbours.len() >= opt.ef { break; } } @@ -730,8 +816,10 @@ impl Reader { use Completion::*; let cancel_fn = &cancel_fn; + let item_ids = self.item_ids(); + // If we will never find any candidates, return none - if opt.candidates.is_some_and(|c| self.item_ids().is_disjoint(c)) { + if item_ids.is_empty() || opt.candidates.is_some_and(|c| item_ids.is_disjoint(c)) { return Ok(None); } @@ -740,7 +828,7 @@ impl Reader { let query = Item { header: D::new_header(&vector), vector }; // If the number of candidates is less than a given threshold, perform linear search - if let Some(candidates) = opt.candidates.filter(|c| c.len() < LINEAR_SEARCH_THRESHOLD) { + if let Some(candidates) = opt.candidates.filter(|_| self.should_linear_scan(opt)) { let nns = self.brute_force_search(&query, rtxn, candidates, opt.count, cancel_fn)?; return Ok(Some(nns)); } diff --git a/src/spaces/simple.rs b/src/spaces/simple.rs index fbf4fc2..dac121f 100644 --- a/src/spaces/simple.rs +++ b/src/spaces/simple.rs @@ -102,8 +102,8 @@ pub fn dot_product_non_optimized(u: &UnalignedVector, v: &UnalignedVector -2 /// 01 => 0 diff --git a/src/spaces/simple_neon.rs b/src/spaces/simple_neon.rs index d295555..9ed944f 100644 --- a/src/spaces/simple_neon.rs +++ b/src/spaces/simple_neon.rs @@ -1,8 +1,9 @@ -#[cfg(target_feature = "neon")] -use crate::unaligned_vector::UnalignedVector; use std::arch::aarch64::*; use std::ptr::read_unaligned; +#[cfg(target_feature = "neon")] +use crate::unaligned_vector::UnalignedVector; + #[cfg(target_feature = "neon")] pub(crate) unsafe fn euclid_similarity_neon( v1: &UnalignedVector, diff --git a/src/stats.rs b/src/stats.rs index 9f3a42e..e5a6bbb 
100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -2,11 +2,8 @@ use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; use hashbrown::HashMap; -use heed::{Result, RoTxn}; -use crate::key::{KeyCodec, Prefix, PrefixCodec}; -use crate::node::{Links, Node}; -use crate::{Database, Distance}; +use crate::Distance; // TODO: ignore the phantom #[derive(Debug)] @@ -15,8 +12,6 @@ pub(crate) struct BuildStats { pub n_links_added: AtomicUsize, /// a counter tracking how many times we hit lmdb pub lmdb_hits: AtomicUsize, - /// average rank of a node, calculated at the end of build - pub mean_degree: f32, /// number of elements per layer pub layer_dist: HashMap, @@ -28,7 +23,6 @@ impl BuildStats { BuildStats { n_links_added: AtomicUsize::new(0), lmdb_hits: AtomicUsize::new(0), - mean_degree: 0.0, layer_dist: HashMap::default(), _phantom: PhantomData, } @@ -41,36 +35,4 @@ impl BuildStats { pub fn incr_lmdb_hits(&self) { self.lmdb_hits.fetch_add(1, Ordering::Relaxed); } - - /// iterate over all links in db and average out node rank - pub fn compute_mean_degree( - &mut self, - rtxn: &RoTxn, - db: &Database, - index: u16, - ) -> Result<()> { - let iter = db - .remap_key_type::() - .prefix_iter(rtxn, &Prefix::links(index))? 
- .remap_key_type::(); - - let mut n_links = 0; - let mut total_links = 0; - - for res in iter { - let (_key, node) = res?; - - let links = match node { - Node::Links(Links { links }) => links, - Node::Item(_) => unreachable!("Node must not be an item"), - }; - - total_links += links.len(); - n_links += 1; - } - - self.mean_degree = (total_links as f32) / (n_links as f32); - - Ok(()) - } } diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 150c09c..14ebb10 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -7,7 +7,9 @@ use rand::distributions::Uniform; use rand::rngs::StdRng; use rand::{thread_rng, Rng, SeedableRng}; use tempfile::TempDir; -use tracing_subscriber::{fmt::layer, prelude::*, EnvFilter}; +use tracing_subscriber::fmt::layer; +use tracing_subscriber::prelude::*; +use tracing_subscriber::EnvFilter; use crate::version::VersionCodec; use crate::{Database, Distance, MetadataCodec, NodeCodec, NodeMode, Reader, Writer}; diff --git a/src/tests/reader.rs b/src/tests/reader.rs index 7b387e8..7bb5fa5 100644 --- a/src/tests/reader.rs +++ b/src/tests/reader.rs @@ -1,13 +1,13 @@ #[cfg(not(windows))] use proptest::prelude::*; -use rand::{rngs::StdRng, seq::SliceRandom, thread_rng, Rng, SeedableRng}; +use rand::rngs::StdRng; +use rand::seq::SliceRandom; +use rand::{thread_rng, Rng, SeedableRng}; use roaring::RoaringBitmap; -use crate::{ - distance::{BinaryQuantizedCosine, Cosine}, - tests::{create_database, create_database_indices_with_items, rng, DatabaseHandle}, - Reader, Writer, -}; +use crate::distance::{BinaryQuantizedCosine, Cosine}; +use crate::tests::{create_database, create_database_indices_with_items, rng, DatabaseHandle}; +use crate::{Reader, Writer}; const M: usize = 16; const M0: usize = 32; diff --git a/src/tests/snapshots/hannoy__tests__writer__write_and_update_lot_of_random_points_with_snapshot-2.snap b/src/tests/snapshots/hannoy__tests__writer__write_and_update_lot_of_random_points_with_snapshot-2.snap index 3da7532..d15cf2f 100644 --- 
a/src/tests/snapshots/hannoy__tests__writer__write_and_update_lot_of_random_points_with_snapshot-2.snap +++ b/src/tests/snapshots/hannoy__tests__writer__write_and_update_lot_of_random_points_with_snapshot-2.snap @@ -5,7 +5,7 @@ expression: handle ================== Dumping index 0 Root: Metadata { dimensions: 30, items: RoaringBitmap<100 values between 0 and 99>, distance: "euclidean", entry_points: [65], max_level: 6 } -Version: Version { major: 0, minor: 1, patch: 2 } +Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[34, 79, 92]> }) Links 1: Links(Links { links: RoaringBitmap<[45, 62, 75]> }) Links 2: Links(Links { links: RoaringBitmap<[3, 7, 45]> }) diff --git a/src/tests/snapshots/hannoy__tests__writer__write_and_update_lot_of_random_points_with_snapshot.snap b/src/tests/snapshots/hannoy__tests__writer__write_and_update_lot_of_random_points_with_snapshot.snap index fa0cbab..def494d 100644 --- a/src/tests/snapshots/hannoy__tests__writer__write_and_update_lot_of_random_points_with_snapshot.snap +++ b/src/tests/snapshots/hannoy__tests__writer__write_and_update_lot_of_random_points_with_snapshot.snap @@ -5,7 +5,7 @@ expression: handle ================== Dumping index 0 Root: Metadata { dimensions: 30, items: RoaringBitmap<100 values between 0 and 99>, distance: "euclidean", entry_points: [65], max_level: 6 } -Version: Version { major: 0, minor: 1, patch: 2 } +Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[7]> }) Links 1: Links(Links { links: RoaringBitmap<[45, 62, 75]> }) Links 2: Links(Links { links: RoaringBitmap<[7, 62, 98]> }) diff --git a/src/tests/writer.rs b/src/tests/writer.rs index 169e196..ea1916f 100644 --- a/src/tests/writer.rs +++ b/src/tests/writer.rs @@ -74,15 +74,15 @@ fn use_u32_max_minus_one_for_a_vec() { writer.builder(&mut rng()).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, 
@r###" ================== Dumping index 0 Root: Metadata { dimensions: 3, items: RoaringBitmap<[4294967294]>, distance: "euclidean", entry_points: [4294967294], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 4294967294: Links(Links { links: RoaringBitmap<[]> }) Links 4294967294: Links(Links { links: RoaringBitmap<[]> }) Item 4294967294: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 1.0000, 2.0000] }) - "#); + "###); } #[test] @@ -95,15 +95,15 @@ fn use_u32_max_for_a_vec() { writer.builder(&mut rng()).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 3, items: RoaringBitmap<[4294967295]>, distance: "euclidean", entry_points: [4294967295], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 4294967295: Links(Links { links: RoaringBitmap<[]> }) Links 4294967295: Links(Links { links: RoaringBitmap<[]> }) Item 4294967295: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 1.0000, 2.0000] }) - "#); + "###); } #[test] @@ -116,15 +116,15 @@ fn write_one_vector() { writer.builder(&mut rng()).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 3, items: RoaringBitmap<[0]>, distance: "euclidean", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 1.0000, 2.0000] }) - "#); + "###); } #[test] @@ -166,43 +166,43 @@ fn 
write_multiple_indexes() { } wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 3, items: RoaringBitmap<[0]>, distance: "euclidean", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 1.0000, 2.0000] }) ================== Dumping index 1 Root: Metadata { dimensions: 3, items: RoaringBitmap<[0]>, distance: "euclidean", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 1.0000, 2.0000] }) ================== Dumping index 2 Root: Metadata { dimensions: 3, items: RoaringBitmap<[0]>, distance: "euclidean", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 1.0000, 2.0000] }) ================== Dumping index 3 Root: Metadata { dimensions: 3, items: RoaringBitmap<[0]>, distance: "euclidean", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 1.0000, 2.0000] }) ================== 
Dumping index 4 Root: Metadata { dimensions: 3, items: RoaringBitmap<[0]>, distance: "euclidean", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 1.0000, 2.0000] }) - "#); + "###); } #[test] @@ -385,11 +385,11 @@ fn overwrite_one_item_incremental() { writer.builder(&mut rng).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[0, 1, 2, 3, 4, 5]>, distance: "euclidean", entry_points: [0, 2, 3], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[1, 2]> }) Links 0: Links(Links { links: RoaringBitmap<[2]> }) Links 1: Links(Links { links: RoaringBitmap<[0, 2]> }) @@ -405,7 +405,7 @@ fn overwrite_one_item_incremental() { Item 3: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [3.0000, 0.0000] }) Item 4: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [4.0000, 0.0000] }) Item 5: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [5.0000, 0.0000] }) - "#); + "###); let mut wtxn = handle.env.write_txn().unwrap(); let writer = Writer::new(handle.database, 0, 2); @@ -414,11 +414,11 @@ fn overwrite_one_item_incremental() { writer.builder(&mut rng).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[0, 1, 2, 3, 4, 5]>, distance: "euclidean", entry_points: [0, 2, 3], max_level: 1 } - Version: Version { 
major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[1, 2]> }) Links 0: Links(Links { links: RoaringBitmap<[2]> }) Links 1: Links(Links { links: RoaringBitmap<[0, 2]> }) @@ -434,7 +434,7 @@ fn overwrite_one_item_incremental() { Item 3: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [6.0000, 0.0000] }) Item 4: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [4.0000, 0.0000] }) Item 5: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [5.0000, 0.0000] }) - "#); + "###); } // NOTE: this will fail while our deletions aren't properly handled @@ -449,15 +449,15 @@ fn delete_one_item_in_a_one_item_db() { writer.builder(&mut rng).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[0]>, distance: "euclidean", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 0.0000] }) - "#); + "###); // new transaction for the delete let mut wtxn = handle.env.write_txn().unwrap(); @@ -467,12 +467,12 @@ fn delete_one_item_in_a_one_item_db() { writer.builder(&mut rng).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[]>, distance: "euclidean", entry_points: [], max_level: 0 } - Version: Version { major: 0, minor: 1, patch: 2 } - "#); + Version: Version { major: 0, minor: 1, patch: 6 } + "###); let rtxn = handle.env.read_txn().unwrap(); let one_reader = 
Reader::open(&rtxn, 0, handle.database).unwrap(); @@ -493,15 +493,15 @@ fn delete_document_in_an_empty_index_74() { wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[0]>, distance: "euclidean", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [0.0000, 0.0000] }) - "#); + "###); let mut wtxn = handle.env.write_txn().unwrap(); @@ -525,16 +525,16 @@ fn delete_document_in_an_empty_index_74() { wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[]>, distance: "euclidean", entry_points: [], max_level: 0 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } ================== Dumping index 1 Root: Metadata { dimensions: 2, items: RoaringBitmap<[]>, distance: "euclidean", entry_points: [], max_level: 0 } - Version: Version { major: 0, minor: 1, patch: 2 } - "#); + Version: Version { major: 0, minor: 1, patch: 6 } + "###); let rtxn = handle.env.read_txn().unwrap(); let reader = Reader::open(&rtxn, 1, handle.database).unwrap(); @@ -559,15 +559,15 @@ fn delete_one_item_in_a_single_document_database() { writer.builder(&mut rng).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[0]>, distance: "cosine", entry_points: [0], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, 
minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[]> }) Links 0: Links(Links { links: RoaringBitmap<[]> }) Item 0: Item(Item { header: NodeHeaderCosine { norm: "0.0000" }, vector: [0.0000, 0.0000] }) - "#); + "###); let mut wtxn = handle.env.write_txn().unwrap(); let writer = Writer::new(handle.database, 0, 2); @@ -577,12 +577,12 @@ fn delete_one_item_in_a_single_document_database() { writer.builder(&mut rng).build::(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[]>, distance: "cosine", entry_points: [], max_level: 0 } - Version: Version { major: 0, minor: 1, patch: 2 } - "#); + Version: Version { major: 0, minor: 1, patch: 6 } + "###); } #[test] @@ -599,11 +599,11 @@ fn delete_one_item() { writer.builder(&mut rng).build::<3, 3>(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[0, 1, 2, 3, 4, 5]>, distance: "euclidean", entry_points: [0, 2, 3], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[1, 2]> }) Links 0: Links(Links { links: RoaringBitmap<[2]> }) Links 1: Links(Links { links: RoaringBitmap<[0, 2]> }) @@ -619,7 +619,7 @@ fn delete_one_item() { Item 3: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [3.0000, 0.0000] }) Item 4: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [4.0000, 0.0000] }) Item 5: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [5.0000, 0.0000] }) - "#); + "###); let mut wtxn = handle.env.write_txn().unwrap(); let writer = Writer::new(handle.database, 0, 2); @@ -629,11 +629,11 @@ fn delete_one_item() { writer.builder(&mut 
rng).build::<3, 3>(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[0, 1, 2, 4, 5]>, distance: "euclidean", entry_points: [0, 1, 2], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[1]> }) Links 0: Links(Links { links: RoaringBitmap<[1, 2]> }) Links 1: Links(Links { links: RoaringBitmap<[0, 2]> }) @@ -647,7 +647,7 @@ fn delete_one_item() { Item 2: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [2.0000, 0.0000] }) Item 4: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [4.0000, 0.0000] }) Item 5: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [5.0000, 0.0000] }) - "#); + "###); // delete another one let mut wtxn = handle.env.write_txn().unwrap(); @@ -658,11 +658,11 @@ fn delete_one_item() { writer.builder(&mut rng).build::<3, 3>(&mut wtxn).unwrap(); wtxn.commit().unwrap(); - insta::assert_snapshot!(handle, @r#" + insta::assert_snapshot!(handle, @r###" ================== Dumping index 0 Root: Metadata { dimensions: 2, items: RoaringBitmap<[0, 2, 4, 5]>, distance: "euclidean", entry_points: [0, 2, 4], max_level: 1 } - Version: Version { major: 0, minor: 1, patch: 2 } + Version: Version { major: 0, minor: 1, patch: 6 } Links 0: Links(Links { links: RoaringBitmap<[0, 2]> }) Links 0: Links(Links { links: RoaringBitmap<[0, 2]> }) Links 2: Links(Links { links: RoaringBitmap<[0, 2, 4]> }) @@ -674,7 +674,7 @@ fn delete_one_item() { Item 2: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [2.0000, 0.0000] }) Item 4: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [4.0000, 0.0000] }) Item 5: Item(Item { header: NodeHeaderEuclidean { bias: "0.0000" }, vector: [5.0000, 0.0000] }) - "#); + "###); } #[test] 
diff --git a/src/unaligned_vector/binary.rs b/src/unaligned_vector/binary.rs index 417a1cb..a0eeb53 100644 --- a/src/unaligned_vector/binary.rs +++ b/src/unaligned_vector/binary.rs @@ -101,7 +101,7 @@ unsafe fn from_slice_neon(slice: &[f32]) -> Vec { // The size of the returned vector must be a multiple of a word let remaining = slice.len() % PACKED_WORD_BYTES; let mut len = iterations; - if len % PACKED_WORD_BYTES != 0 { + if !len.is_multiple_of(PACKED_WORD_BYTES) { len += PACKED_WORD_BYTES - len % PACKED_WORD_BYTES; } else if remaining != 0 { // if we generated a valid number of Word but we're missing a few bits diff --git a/src/unaligned_vector/binary_quantized.rs b/src/unaligned_vector/binary_quantized.rs index 66e1e40..af66e13 100644 --- a/src/unaligned_vector/binary_quantized.rs +++ b/src/unaligned_vector/binary_quantized.rs @@ -98,7 +98,7 @@ unsafe fn from_slice_neon(slice: &[f32]) -> Vec { // The size of the returned vector must be a multiple of a word let remaining = slice.len() % QUANTIZED_WORD_BYTES; let mut len = iterations; - if len % QUANTIZED_WORD_BYTES != 0 { + if !len.is_multiple_of(QUANTIZED_WORD_BYTES) { len += QUANTIZED_WORD_BYTES - len % QUANTIZED_WORD_BYTES; } else if remaining != 0 { // if we generated a valid number of Word but we're missing a few bits diff --git a/src/unaligned_vector/f32.rs b/src/unaligned_vector/f32.rs index 60a2f6d..78a4537 100644 --- a/src/unaligned_vector/f32.rs +++ b/src/unaligned_vector/f32.rs @@ -1,7 +1,5 @@ -use std::{ - borrow::Cow, - mem::{size_of, transmute}, -}; +use std::borrow::Cow; +use std::mem::{size_of, transmute}; use bytemuck::cast_slice; use byteorder::{ByteOrder, NativeEndian}; diff --git a/src/unaligned_vector/mod.rs b/src/unaligned_vector/mod.rs index 1b6ec22..e0d3d7c 100644 --- a/src/unaligned_vector/mod.rs +++ b/src/unaligned_vector/mod.rs @@ -1,13 +1,10 @@ -use std::{ - borrow::{Borrow, Cow}, - fmt, - marker::PhantomData, - mem::transmute, -}; +use std::borrow::{Borrow, Cow}; +use std::fmt; +use 
std::marker::PhantomData; +use std::mem::transmute; pub use binary::Binary; pub use binary_quantized::BinaryQuantized; - use bytemuck::pod_collect_to_vec; mod binary; diff --git a/src/update_status.rs b/src/update_status.rs new file mode 100644 index 0000000..1961e6b --- /dev/null +++ b/src/update_status.rs @@ -0,0 +1,47 @@ +use std::borrow::Cow; +use std::fmt; + +use heed::BoxedError; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[repr(u8)] +pub enum UpdateStatus { + Updated = 0, + Removed = 1, +} + +pub enum UpdateStatusCodec {} + +impl heed::BytesEncode<'_> for UpdateStatusCodec { + type EItem = UpdateStatus; + + fn bytes_encode(item: &'_ Self::EItem) -> Result, BoxedError> { + Ok(Cow::Owned(vec![*item as u8])) + } +} + +impl heed::BytesDecode<'_> for UpdateStatusCodec { + type DItem = UpdateStatus; + + fn bytes_decode(bytes: &'_ [u8]) -> Result { + match bytes { + [0] => Ok(UpdateStatus::Updated), + [1] => Ok(UpdateStatus::Removed), + _ => Err(Box::new(InvalidUpdateStatusDecoding { unknown_tag: bytes.to_vec() })), + } + } +} + +#[derive(Debug, thiserror::Error)] +struct InvalidUpdateStatusDecoding { + unknown_tag: Vec, +} + +impl fmt::Display for InvalidUpdateStatusDecoding { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.unknown_tag[..] { + bytes @ [_, ..] 
=> write!(f, "Invalid update status decoding: unknown tag {bytes:?}"), + [] => write!(f, "Invalid update status decoding: empty array of bytes"), + } + } +} diff --git a/src/version.rs b/src/version.rs index 97a759a..2c37af6 100644 --- a/src/version.rs +++ b/src/version.rs @@ -1,5 +1,6 @@ +use std::borrow::Cow; +use std::fmt; use std::mem::size_of; -use std::{borrow::Cow, fmt}; use byteorder::{BigEndian, ByteOrder}; use heed::BoxedError; diff --git a/src/writer.rs b/src/writer.rs index 95be453..e08a3ab 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -1,22 +1,22 @@ use std::any::TypeId; use std::path::PathBuf; -use heed::types::{DecodeIgnore, Unit}; +use heed::types::DecodeIgnore; use heed::{PutFlags, RoTxn, RwTxn}; use rand::{Rng, SeedableRng}; use roaring::RoaringBitmap; use steppe::NoProgress; -use tracing::{debug, error, info}; +use tracing::{debug, error}; use crate::distance::Distance; use crate::hnsw::HnswBuilder; use crate::internals::KeyCodec; use crate::item_iter::ItemIter; -use crate::node::{Item, ItemIds, Links, NodeCodec}; -use crate::parallel::{ImmutableItems, ImmutableLinks}; +use crate::node::{Item, ItemIds, NodeCodec}; use crate::progress::HannoyBuild; use crate::reader::get_item; use crate::unaligned_vector::UnalignedVector; +use crate::update_status::{UpdateStatus, UpdateStatusCodec}; use crate::version::{Version, VersionCodec}; use crate::{ Database, Error, ItemId, Key, Metadata, MetadataCodec, Node, Prefix, PrefixCodec, Result, @@ -343,10 +343,10 @@ impl Writer { // We mark all the items as updated so // the Writer::build method can handle them. 
for item in new_items { - self.database.remap_data_type::().put( + self.database.remap_data_type::().put( wtxn, &Key::updated(self.index, item), - &(), + &UpdateStatus::Updated, )?; } @@ -397,8 +397,11 @@ impl Writer { drop(cursor); for item in updated_items { - let key = Key::updated(self.index, item); - self.database.remap_types::().put(wtxn, &key, &())?; + self.database.remap_types::().put( + wtxn, + &Key::updated(self.index, item), + &UpdateStatus::Updated, + )?; } } @@ -467,7 +470,11 @@ impl Writer { let vector = UnalignedVector::from_slice(vector); let db_item = Item { header: D::new_header(&vector), vector }; self.database.put(wtxn, &Key::item(self.index, item), &Node::Item(db_item))?; - self.database.remap_data_type::().put(wtxn, &Key::updated(self.index, item), &())?; + self.database.remap_data_type::().put( + wtxn, + &Key::updated(self.index, item), + &UpdateStatus::Updated, + )?; Ok(()) } @@ -475,10 +482,10 @@ impl Writer { /// Deletes an item stored in this database and returns `true` if it existed. pub fn del_item(&self, wtxn: &mut RwTxn, item: ItemId) -> Result { if self.database.delete(wtxn, &Key::item(self.index, item))? { - self.database.remap_data_type::().put( + self.database.remap_data_type::().put( wtxn, &Key::updated(self.index, item), - &(), + &UpdateStatus::Removed, )?; Ok(true) @@ -521,17 +528,29 @@ impl Writer { R: Rng + SeedableRng, P: steppe::Progress, { - let item_indices = self.item_indices(wtxn, options)?; + // Get the list of items we already registered in the metadata + let indexed_items = self + .database + .remap_data_type::() + .get(wtxn, &Key::metadata(self.index))? + .map_or_else(RoaringBitmap::default, |m| m.items); // In case we have to rebuild all links we can skip the deletion step. 
- let (to_delete, to_insert) = if options.relink_all_items { - (RoaringBitmap::new(), item_indices.clone()) + let (item_indices, to_delete, to_insert) = if options.relink_all_items { + (indexed_items.clone(), RoaringBitmap::new(), indexed_items) } else { // updated items can be an update, an addition or a removed item - let updated_items = self.reset_and_retrieve_updated_items(wtxn, options)?; - let to_delete = updated_items.clone() - &item_indices; - let to_insert = &item_indices & &updated_items; - (to_delete, to_insert) + // they are identified by an "updated" stone key + let (all_updated_items, deleted_items) = + self.reset_and_retrieve_updated_items(wtxn, options)?; + + // Item indices correspond to all items, known ones and updated ones + let updated_items = &all_updated_items - &deleted_items; + let item_indices = (&updated_items | indexed_items) - &deleted_items; + + let to_delete = all_updated_items.clone() - &item_indices; + let to_insert = &item_indices & all_updated_items; + (item_indices, to_delete, to_insert) }; let metadata = self @@ -553,7 +572,7 @@ impl Writer { let stats = hnsw.build(to_insert, &to_delete, self.database, self.index, wtxn, rng, options)?; - info!("{stats:?}"); + debug!("{stats:?}"); // Remove deleted links from lmdb AFTER build; in DiskANN we use a deleted item's // neighbours when filling in the "gaps" left in the graph from deletions.
See @@ -563,17 +582,16 @@ impl Writer { debug!("write the metadata..."); options.progress.update(HannoyBuild::WriteTheMetadata); - let metadata = Metadata { - dimensions: self.dimensions.try_into().unwrap(), - items: item_indices, - entry_points: ItemIds::from_slice(&hnsw.entry_points), - max_level: hnsw.max_level as u8, - distance: D::name(), - }; self.database.remap_data_type::().put( wtxn, &Key::metadata(self.index), - &metadata, + &Metadata { + dimensions: self.dimensions.try_into().unwrap(), + items: item_indices, + entry_points: ItemIds::from_slice(&hnsw.entry_points), + max_level: hnsw.max_level as u8, + distance: D::name(), + }, )?; self.database.remap_data_type::().put( wtxn, @@ -612,24 +630,23 @@ impl Writer { .get(wtxn, &Key::metadata(self.index))? .expect("The metadata must be there"); - // 3. delete metadata - self.database.delete(wtxn, &Key::metadata(self.index))?; - - // 4. delete version - self.database.delete(wtxn, &Key::version(self.index))?; - - // 5. delete all links + // 3. delete all links self.delete_links_from_db(&item_ids, wtxn, options)?; - // 5. trigger build + // 4. trigger build self.build::(wtxn, rng, options) } + /// Removes all the "updated" stones from the database + /// and returns the list of updated and deleted items. + /// + /// The updated items correspond to all the items modified, inserted or deleted. + /// The deleted items correspond to all the items deleted. fn reset_and_retrieve_updated_items

( &self, wtxn: &mut RwTxn, options: &BuildOption

, - ) -> Result + ) -> Result<(RoaringBitmap, RoaringBitmap), Error> where P: steppe::Progress, { @@ -637,14 +654,15 @@ impl Writer { options.progress.update(HannoyBuild::RetrieveTheUpdatedItems); let mut updated_items = RoaringBitmap::new(); + let mut deleted_items = RoaringBitmap::new(); let mut updated_iter = self .database - .remap_types::() + .remap_types::() .prefix_iter_mut(wtxn, &Prefix::updated(self.index))? .remap_key_type::(); let mut index = 0; - while let Some((key, _)) = updated_iter.next().transpose()? { + while let Some((key, update_status)) = updated_iter.next().transpose()? { if index % CANCELLATION_PROBING == 0 && (options.cancel)() { return Err(Error::BuildCancelled); } @@ -652,6 +670,11 @@ impl Writer { let inserted = updated_items.insert(key.node.item); debug_assert!(inserted, "The keys should be sorted by LMDB"); + if update_status == UpdateStatus::Removed { + let inserted = deleted_items.insert(key.node.item); + debug_assert!(inserted, "The keys should be sorted by LMDB"); + } + // SAFETY: Safe because we don't hold any reference to the database currently let did_delete = unsafe { updated_iter.del_current()? }; if !did_delete { @@ -660,34 +683,8 @@ impl Writer { index += 1; } - Ok(updated_items) - } - - // Fetches the item's ids, not the links. - fn item_indices

(&self, wtxn: &mut RwTxn, options: &BuildOption

) -> Result - where - P: steppe::Progress, - { - debug!("started retrieving all the items ids..."); - options.progress.update(HannoyBuild::RetrievingTheItemsIds); - - let mut indices = RoaringBitmap::new(); - for (index, result) in self - .database - .remap_types::() - .prefix_iter(wtxn, &Prefix::item(self.index))? - .remap_key_type::() - .enumerate() - { - if index % CANCELLATION_PROBING == 0 && (options.cancel)() { - return Err(Error::BuildCancelled); - } - - let (i, _) = result?; - indices.insert(i.node.unwrap_item()); - } - Ok(indices) + Ok((updated_items, deleted_items)) } // Iterates over links in lmdb and deletes those in `to_delete`. There can be several links @@ -721,27 +718,6 @@ impl Writer { } } -#[derive(Clone)] -pub(crate) struct FrozenReader<'a, D: Distance> { - pub index: u16, - pub items: &'a ImmutableItems<'a, D>, - pub links: &'a ImmutableLinks<'a, D>, -} - -impl<'a, D: Distance> FrozenReader<'a, D> { - pub fn get_item(&self, item_id: ItemId) -> Result> { - let key = Key::item(self.index, item_id); - // key is a `Key::item` so returned result must be a Node::Item - self.items.get(item_id)?.ok_or(Error::missing_key(key)) - } - - pub fn get_links(&self, item_id: ItemId, level: usize) -> Result> { - let key = Key::links(self.index, item_id, level as u8); - // key is a `Key::item` so returned result must be a Node::Item - self.links.get(item_id, level as u8)?.ok_or(Error::missing_key(key)) - } -} - /// Clears all the links. Starts from the last node and stops at the first item. fn clear_links(wtxn: &mut RwTxn, database: Database, index: u16) -> Result<()> { let mut cursor = database