Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ad16bd8
Remove a first batch of useless writes to tmp files and LMDB
irevoire May 28, 2025
e9d2c64
update the tmp_nodes to be readable
irevoire May 28, 2025
20b891e
Parallelize the split of large descendants
irevoire Jun 10, 2025
fe56192
randomize the item selection
irevoire Jun 10, 2025
ae9c4c2
get rids of the remap method on the tmp files
irevoire Jun 11, 2025
c118447
makes the clippy gods happy
irevoire Jun 11, 2025
dc14f4b
fmt
irevoire Jun 11, 2025
9c70222
Make the deletion of items faster
irevoire Jun 12, 2025
5a6dcb3
Merge with main, it was too complex to rebase
irevoire Jun 12, 2025
ae21f60
fmt
irevoire Jun 12, 2025
9c29c04
make clippy happy
irevoire Jun 12, 2025
a16fbdd
share the code between the writer and the incremental indexing. That …
irevoire Jun 16, 2025
d4d646d
handle error properly in the rayon pool
irevoire Jun 16, 2025
c1874ff
use try_send instead on send in case two errors happens at the same m…
irevoire Jun 16, 2025
2face0e
fmt+lints
irevoire Jun 16, 2025
5b8efc0
remove debug prints
irevoire Jun 17, 2025
e9209c2
remove new_fit_in_memory
irevoire Jun 17, 2025
af2cdca
fix panic when we have low memory
irevoire Jun 17, 2025
ecf0df4
check for error when inserting elements in a tree
irevoire Jun 17, 2025
5262b08
fix multiple bugs
irevoire Jun 17, 2025
fa62650
fix a new bug and add unit tests on fit_in_memory
irevoire Jun 17, 2025
3a94056
Add a large test with no memory to force the full indexing process to…
irevoire Jun 17, 2025
ccdafc2
fmt
irevoire Jun 17, 2025
29bc623
Update src/writer.rs
irevoire Jun 18, 2025
e0d18a0
fmt
irevoire Jun 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ thiserror = "2.0.9"
nohash = "0.2.0"
page_size = "0.6.0"
enum-iterator = "2.1.0"
thread_local = "1.1.8"
crossbeam = "0.8.4"

[dev-dependencies]
anyhow = "1.0.95"
Expand Down
2 changes: 1 addition & 1 deletion src/distance/cosine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum Cosine {}
#[repr(C)]
#[derive(Pod, Zeroable, Clone, Copy)]
pub struct NodeHeaderCosine {
norm: f32,
pub(crate) norm: f32,
}
impl fmt::Debug for NodeHeaderCosine {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
4 changes: 4 additions & 0 deletions src/distance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ pub trait Distance: Send + Sync + Sized + Clone + fmt::Debug + 'static {
) -> heed::Result<()> {
Ok(())
}

/// Returns the number of bytes a single item occupies on disk for the given
/// number of dimensions: the distance-specific header followed by the encoded vector.
fn size_of_item(dimensions: usize) -> usize {
    std::mem::size_of::<Self::Header>() + Self::VectorCodec::size_of_item(dimensions)
}
}

fn two_means<D: Distance, R: Rng>(
Expand Down
2 changes: 1 addition & 1 deletion src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl<'a> heed::BytesEncode<'a> for MetadataCodec {

fn bytes_encode(item: &'a Self::EItem) -> Result<Cow<'a, [u8]>, BoxedError> {
let Metadata { dimensions, items, roots, distance } = item;
debug_assert!(!distance.as_bytes().iter().any(|&b| b == 0));
debug_assert!(!distance.as_bytes().contains(&0));

let mut output = Vec::with_capacity(
size_of::<u32>()
Expand Down
36 changes: 35 additions & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ pub enum Node<'a, D: Distance> {
SplitPlaneNormal(SplitPlaneNormal<'a, D>),
}

impl<'a, D: Distance> Node<'a, D> {
    /// Converts the node into a `'static` version that owns all of its data,
    /// cloning any bytes that were borrowed.
    pub fn into_owned(self) -> Node<'static, D> {
        match self {
            Node::Leaf(leaf) => Node::Leaf(leaf.into_owned()),
            Node::Descendants(Descendants { descendants }) => {
                let descendants = Cow::Owned(descendants.into_owned());
                Node::Descendants(Descendants { descendants })
            }
            Node::SplitPlaneNormal(split) => Node::SplitPlaneNormal(split.into_owned()),
        }
    }
}

/// A node generic over the version of the database.
/// Should only be used while reading from the database.
#[derive(Clone, Debug)]
Expand All @@ -40,8 +54,15 @@ impl<'a, D: Distance> Node<'a, D> {
None
}
}
}

/// Returns the inner [`Descendants`] when this node is of that kind,
/// and `None` for any other node type.
pub fn descendants(self) -> Option<Descendants<'a>> {
    match self {
        Node::Descendants(descendants) => Some(descendants),
        _ => None,
    }
}
}
/// A leaf node which corresponds to the vector inputed
/// by the user and the distance header.
pub struct Leaf<'a, D: Distance> {
Expand Down Expand Up @@ -142,6 +163,19 @@ impl<D: Distance> fmt::Debug for SplitPlaneNormal<'_, D> {
}
}

impl<D: Distance> SplitPlaneNormal<'_, D> {
    /// Converts the split node into a `'static` version that owns its normal
    /// vector, cloning the bytes when they were borrowed.
    pub fn into_owned(self) -> SplitPlaneNormal<'static, D> {
        let SplitPlaneNormal { left, right, normal } = self;
        let normal = normal.map(|leaf| {
            let vector = Cow::Owned(leaf.vector.into_owned());
            Leaf { header: leaf.header, vector }
        });
        SplitPlaneNormal { left, right, normal }
    }
}

impl<D: Distance> Clone for SplitPlaneNormal<'_, D> {
fn clone(&self) -> Self {
Self { left: self.left, right: self.right, normal: self.normal.clone() }
Expand Down
191 changes: 122 additions & 69 deletions src/parallel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::slice;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::marker;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
Expand All @@ -15,40 +15,109 @@ use roaring::{RoaringBitmap, RoaringTreemap};

use crate::internals::{KeyCodec, Leaf, NodeCodec};
use crate::key::{Key, Prefix, PrefixCodec};
use crate::node::{Node, SplitPlaneNormal};
use crate::node::Node;
use crate::{Database, Distance, Error, ItemId, Result};

/// The backing temporary file, wrapped in whichever buffered adaptor matches
/// the last operation performed on it (appending or reading back).
#[derive(Default, Debug)]
enum TmpNodesState {
    /// The file is currently being appended to through a write buffer.
    Writing(BufWriter<File>),
    /// The file is currently being read back through a read buffer.
    Reading(BufReader<File>),
    // Placeholder so `std::mem::take` can move the other variants out of a
    // `&mut self` during the Writing<->Reading transitions. The enum must
    // never be observed in this state outside of those transitions.
    #[default]
    Invalid,
}

impl TmpNodesState {
    /// Appends `bytes` at the end of the file, switching back to the
    /// `Writing` state first if the previous operation was a read.
    pub fn write_all(&mut self, bytes: &[u8]) -> Result<()> {
        let this = match std::mem::take(self) {
            TmpNodesState::Writing(mut writer) => {
                writer.write_all(bytes)?;
                TmpNodesState::Writing(writer)
            }
            TmpNodesState::Reading(reader) => {
                // `BufReader::into_inner` leaves the underlying file cursor
                // wherever the buffered read-ahead advanced it, which is not
                // necessarily the end of the file. Every offset recorded in
                // `TmpNodes::bounds` assumes writes are appends, so seek back
                // to the end before writing anything.
                let mut file = reader.into_inner();
                file.seek(SeekFrom::End(0))?;
                let mut writer = BufWriter::new(file);
                writer.write_all(bytes)?;
                TmpNodesState::Writing(writer)
            }
            TmpNodesState::Invalid => unreachable!(),
        };

        debug_assert!(!matches!(this, TmpNodesState::Invalid));

        *self = this;
        Ok(())
    }

    /// Reads back the bytes stored between the `start` and `end` byte offsets,
    /// flushing the write buffer first if the previous operation was a write.
    pub fn read_all(&mut self, (start, end): (usize, usize)) -> Result<Vec<u8>> {
        debug_assert!(start < end);
        let mut buffer = vec![0; end - start];

        let this = match std::mem::take(self) {
            TmpNodesState::Writing(mut writer) => {
                writer.flush()?;
                let mut reader = BufReader::new(writer.into_inner().expect(
                    "Could not convert the writer to a file even though it was flushed right before",
                ));
                reader.seek(SeekFrom::Start(start as u64))?;
                reader.read_exact(&mut buffer)?;
                TmpNodesState::Reading(reader)
            }
            TmpNodesState::Reading(mut reader) => {
                reader.seek(SeekFrom::Start(start as u64))?;
                reader.read_exact(&mut buffer)?;
                TmpNodesState::Reading(reader)
            }
            TmpNodesState::Invalid => unreachable!(),
        };

        debug_assert!(!matches!(this, TmpNodesState::Invalid));

        *self = this;
        Ok(buffer)
    }

    /// Consumes the state and returns the underlying file, fully flushed.
    pub fn into_inner(self) -> Result<File> {
        let file = match self {
            TmpNodesState::Writing(mut writer) => {
                writer.flush()?;
                writer.into_inner().expect(
                    "Could not convert the writer to a file even though it was flushed right before",
                )
            }
            TmpNodesState::Reading(reader) => reader.into_inner(),
            TmpNodesState::Invalid => unreachable!(),
        };
        Ok(file)
    }
}

/// A structure to store the tree nodes out of the heed database.
/// You should avoid alternating between writing and reading as it flushes the file on each operation and lose the read buffer.
/// The structure is optimized for reading the last written nodes.
pub struct TmpNodes<DE> {
file: BufWriter<File>,
file: TmpNodesState,
ids: Vec<ItemId>,
bounds: Vec<usize>,
deleted: RoaringBitmap,
remap_ids: IntMap<ItemId, ItemId>,
_marker: marker::PhantomData<DE>,
}

impl<'a, DE: BytesEncode<'a>> TmpNodes<DE> {
impl<'a, D: Distance> TmpNodes<D> {
/// Creates an empty `TmpNodes`.
pub fn new() -> heed::Result<TmpNodes<DE>> {
pub fn new() -> heed::Result<TmpNodes<D>> {
Ok(TmpNodes {
file: tempfile::tempfile().map(BufWriter::new)?,
file: TmpNodesState::Writing(tempfile::tempfile().map(BufWriter::new)?),
ids: Vec::new(),
bounds: vec![0],
deleted: RoaringBitmap::new(),
remap_ids: IntMap::default(),
_marker: marker::PhantomData,
})
}

/// Creates an empty `TmpNodes` in the defined folder.
pub fn new_in(path: &Path) -> heed::Result<TmpNodes<DE>> {
pub fn new_in(path: &Path) -> heed::Result<TmpNodes<D>> {
Ok(TmpNodes {
file: tempfile::tempfile_in(path).map(BufWriter::new)?,
file: TmpNodesState::Writing(tempfile::tempfile_in(path).map(BufWriter::new)?),
ids: Vec::new(),
bounds: vec![0],
deleted: RoaringBitmap::new(),
remap_ids: IntMap::default(),
_marker: marker::PhantomData,
})
}
Expand All @@ -59,10 +128,10 @@ impl<'a, DE: BytesEncode<'a>> TmpNodes<DE> {
// TODO move that in the type
&mut self,
item: ItemId,
data: &'a DE::EItem,
) -> heed::Result<()> {
data: &'a Node<D>,
) -> Result<()> {
assert!(item != ItemId::MAX);
let bytes = DE::bytes_encode(data).map_err(heed::Error::Encoding)?;
let bytes = NodeCodec::bytes_encode(data).map_err(heed::Error::Encoding)?;
self.file.write_all(&bytes)?;
let last_bound = self.bounds.last().unwrap();
self.bounds.push(last_bound + bytes.len());
Expand All @@ -74,13 +143,18 @@ impl<'a, DE: BytesEncode<'a>> TmpNodes<DE> {
Ok(())
}

/// Remap the item id of an already inserted node to another node.
///
/// Only applies to the nodes to insert. It won't interact with the to_delete nodes.
pub fn remap(&mut self, current: ItemId, new: ItemId) {
if current != new {
self.remap_ids.insert(current, new);
}
/// Get the node at the given item id.
///
/// Ignores the remapped ids and deletions, so it is only suitable while
/// appending to the file. A flush is executed on the file if the previous
/// operation was a write.
pub fn get(&mut self, item: ItemId) -> Result<Option<Node<'static, D>>> {
    // In our current implementation, when we start retrieving nodes it's always
    // the nodes of the last tree, so it makes sense to search in reverse order.
    let Some(position) = self.ids.iter().rev().position(|id| *id == item) else {
        return Ok(None);
    };
    // `position` counts from the end: the forward index is `ids.len() - 1 - position`,
    // and since `bounds` always has one more entry than `ids`, the node's byte
    // range is delimited by `bounds[len - position - 2]` and `bounds[len - position - 1]`.
    let bounds = &self.bounds[self.bounds.len() - position - 2..self.bounds.len() - position];
    let bytes = self.file.read_all((bounds[0], bounds[1]))?;
    Ok(Some(NodeCodec::bytes_decode(&bytes).map_err(heed::Error::Decoding)?.into_owned()))
}

/// Delete the tmp_nodes and the node in the database.
Expand All @@ -91,18 +165,12 @@ impl<'a, DE: BytesEncode<'a>> TmpNodes<DE> {

/// Converts it into a readers to read the nodes.
pub fn into_bytes_reader(self) -> Result<TmpNodesReader> {
let file = self.file.into_inner().map_err(|iie| iie.into_error())?;
let file = self.file.into_inner()?;
// safety: No one should move our files around
let mmap = unsafe { Mmap::map(&file)? };
#[cfg(unix)]
mmap.advise(memmap2::Advice::Sequential)?;
Ok(TmpNodesReader {
mmap,
ids: self.ids,
bounds: self.bounds,
deleted: self.deleted,
remap_ids: self.remap_ids,
})
Ok(TmpNodesReader { mmap, ids: self.ids, bounds: self.bounds, deleted: self.deleted })
}
}

Expand All @@ -112,7 +180,6 @@ pub struct TmpNodesReader {
ids: Vec<ItemId>,
bounds: Vec<usize>,
deleted: RoaringBitmap,
remap_ids: IntMap<ItemId, ItemId>,
}

impl TmpNodesReader {
Expand All @@ -126,10 +193,6 @@ impl TmpNodesReader {
.iter()
.zip(self.bounds.windows(2))
.filter(|(&id, _)| !self.deleted.contains(id))
.map(|(id, bounds)| match self.remap_ids.get(id) {
Some(new_id) => (new_id, bounds),
None => (id, bounds),
})
.map(|(id, bounds)| {
let [start, end] = [bounds[0], bounds[1]];
(*id, &self.mmap[start..end])
Expand Down Expand Up @@ -200,11 +263,35 @@ pub struct ImmutableLeafs<'t, D> {
}

impl<'t, D: Distance> ImmutableLeafs<'t, D> {
/// Creates the structure by fetching all the leaf pointers
/// and keeping the transaction making the pointers valid.
pub fn new(
    rtxn: &'t RoTxn,
    database: Database<D>,
    items: &RoaringBitmap,
    index: u16,
) -> heed::Result<Self> {
    // Pre-size the map: exactly one raw pointer will be stored per requested item.
    let mut leafs =
        IntMap::with_capacity_and_hasher(items.len() as usize, BuildNoHashHasher::default());
    let mut constant_length = None;

    for item_id in items {
        // NOTE(review): the `unwrap` assumes every id in `items` is present in
        // the database — confirm callers uphold this invariant.
        let bytes =
            database.remap_data_type::<Bytes>().get(rtxn, &Key::item(index, item_id))?.unwrap();
        // All leafs must have the same serialized length; the first item seen
        // fixes that length and the others are asserted against it.
        assert_eq!(*constant_length.get_or_insert(bytes.len()), bytes.len());

        // Storing a raw pointer into the LMDB page is only sound while `rtxn`
        // is alive, which the `'t` lifetime on the struct guarantees.
        let ptr = bytes.as_ptr();
        leafs.insert(item_id, ptr);
    }

    Ok(ImmutableLeafs { leafs, constant_length, _marker: marker::PhantomData })
}

/// Creates the structure by fetching all the leaf pointers
/// and keeping the transaction making the pointers valid.
/// Do not take more items than memory allows.
/// Remove from the list of candidates all the items that were selected and return them.
pub fn new(
pub fn new_fits_in_memory(
rtxn: &'t RoTxn,
database: Database<D>,
index: u16,
Expand Down Expand Up @@ -467,40 +554,6 @@ impl<'t, D: Distance> ImmutableTrees<'t, D> {
Ok(ImmutableTrees { trees, _marker: marker::PhantomData })
}

/// Creates the structure by fetching all the children of the `start`ing tree nodes specified.
/// Keeps a reference to the transaction to ensure the pointers stays valid.
pub fn sub_tree_from_id(
rtxn: &'t RoTxn,
database: Database<D>,
index: u16,
start: ItemId,
) -> Result<Self> {
let mut trees = IntMap::default();
let mut explore = vec![start];
while let Some(current) = explore.pop() {
let bytes =
database.remap_data_type::<Bytes>().get(rtxn, &Key::tree(index, current))?.unwrap();
let node: Node<'_, D> = NodeCodec::bytes_decode(bytes).unwrap();
match node {
Node::Leaf(_leaf) => unreachable!(),
Node::Descendants(_descendants) => {
trees.insert(current, (bytes.len(), bytes.as_ptr()));
}
Node::SplitPlaneNormal(SplitPlaneNormal { left, right, normal: _ }) => {
trees.insert(current, (bytes.len(), bytes.as_ptr()));
explore.push(left);
explore.push(right);
}
}
}

Ok(Self { trees, _marker: marker::PhantomData })
}

pub fn empty() -> Self {
Self { trees: IntMap::default(), _marker: marker::PhantomData }
}

/// Returns the tree node identified by the given ID.
pub fn get(&self, item_id: ItemId) -> heed::Result<Option<Node<'t, D>>> {
let (ptr, len) = match self.trees.get(&item_id) {
Expand Down
1 change: 1 addition & 0 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{Database, Distance, MetadataCodec, NodeCodec, NodeMode, Reader};

mod binary_quantized;
mod reader;
mod tmp_nodes;
mod upgrade;
mod writer;

Expand Down
Loading
Loading