@@ -3,7 +3,7 @@ use std::borrow::Cow;
33use std:: mem;
44use std:: path:: PathBuf ;
55
6- use heed:: types:: { Bytes , DecodeIgnore } ;
6+ use heed:: types:: { Bytes , DecodeIgnore , Unit } ;
77use heed:: { MdbError , PutFlags , RoTxn , RwTxn } ;
88use rand:: { Rng , SeedableRng } ;
99use rayon:: iter:: repeatn;
@@ -20,7 +20,6 @@ use crate::parallel::{
2020 TmpNodesReader ,
2121} ;
2222use crate :: reader:: item_leaf;
23- use crate :: roaring:: RoaringBitmapCodec ;
2423use crate :: unaligned_vector:: UnalignedVector ;
2524use crate :: {
2625 Database , Error , ItemId , Key , Metadata , MetadataCodec , Node , NodeCodec , NodeId , Prefix ,
@@ -224,8 +223,10 @@ impl<D: Distance> Writer<D> {
224223 pub fn need_build ( & self , rtxn : & RoTxn ) -> Result < bool > {
225224 Ok ( self
226225 . database
227- . remap_data_type :: < DecodeIgnore > ( )
228- . get ( rtxn, & Key :: updated ( self . index ) ) ?
226+ . remap_types :: < PrefixCodec , DecodeIgnore > ( )
227+ . prefix_iter ( rtxn, & Prefix :: updated ( self . index ) ) ?
228+ . remap_key_type :: < KeyCodec > ( )
229+ . next ( )
229230 . is_some ( )
230231 || self
231232 . database
@@ -266,17 +267,7 @@ impl<D: Distance> Writer<D> {
266267 let vector = UnalignedVector :: from_slice ( vector) ;
267268 let leaf = Leaf { header : D :: new_header ( & vector) , vector } ;
268269 self . database . put ( wtxn, & Key :: item ( self . index , item) , & Node :: Leaf ( leaf) ) ?;
269- let mut updated = self
270- . database
271- . remap_data_type :: < RoaringBitmapCodec > ( )
272- . get ( wtxn, & Key :: updated ( self . index ) ) ?
273- . unwrap_or_default ( ) ;
274- updated. insert ( item) ;
275- self . database . remap_data_type :: < RoaringBitmapCodec > ( ) . put (
276- wtxn,
277- & Key :: updated ( self . index ) ,
278- & updated,
279- ) ?;
270+ self . database . remap_data_type :: < Unit > ( ) . put ( wtxn, & Key :: updated ( self . index , item) , & ( ) ) ?;
280271
281272 Ok ( ( ) )
282273 }
@@ -302,35 +293,19 @@ impl<D: Distance> Writer<D> {
302293 Err ( heed:: Error :: Mdb ( MdbError :: KeyExist ) ) => return Err ( Error :: InvalidItemAppend ) ,
303294 Err ( e) => return Err ( e. into ( ) ) ,
304295 }
305- let mut updated = self
306- . database
307- . remap_data_type :: < RoaringBitmapCodec > ( )
308- . get ( wtxn, & Key :: updated ( self . index ) ) ?
309- . unwrap_or_default ( ) ;
310- // We cannot append here because we may have removed an item with a larger id before
311- updated. insert ( item) ;
312- self . database . remap_data_type :: < RoaringBitmapCodec > ( ) . put (
313- wtxn,
314- & Key :: updated ( self . index ) ,
315- & updated,
316- ) ?;
296+ // We cannot append here because the items appear after the updated keys
297+ self . database . remap_data_type :: < Unit > ( ) . put ( wtxn, & Key :: updated ( self . index , item) , & ( ) ) ?;
317298
318299 Ok ( ( ) )
319300 }
320301
321302 /// Deletes an item stored in this database and returns `true` if it existed.
322303 pub fn del_item ( & self , wtxn : & mut RwTxn , item : ItemId ) -> Result < bool > {
323304 if self . database . delete ( wtxn, & Key :: item ( self . index , item) ) ? {
324- let mut updated = self
325- . database
326- . remap_data_type :: < RoaringBitmapCodec > ( )
327- . get ( wtxn, & Key :: updated ( self . index ) ) ?
328- . unwrap_or_default ( ) ;
329- updated. insert ( item) ;
330- self . database . remap_data_type :: < RoaringBitmapCodec > ( ) . put (
305+ self . database . remap_data_type :: < Unit > ( ) . put (
331306 wtxn,
332- & Key :: updated ( self . index ) ,
333- & updated ,
307+ & Key :: updated ( self . index , item ) ,
308+ & ( ) ,
334309 ) ?;
335310
336311 Ok ( true )
@@ -430,7 +405,18 @@ impl<D: Distance> Writer<D> {
430405 }
431406
432407 log:: debug!( "reset the updated items..." ) ;
433- self . database . delete ( wtxn, & Key :: updated ( self . index ) ) ?;
408+ let mut updated_iter = self
409+ . database
410+ . remap_types :: < PrefixCodec , DecodeIgnore > ( )
411+ . prefix_iter_mut ( wtxn, & Prefix :: updated ( self . index ) ) ?
412+ . remap_key_type :: < KeyCodec > ( ) ;
413+ while updated_iter. next ( ) . transpose ( ) ?. is_some ( ) {
414+ // Safe because we don't hold any reference to the database currently
415+ unsafe {
416+ updated_iter. del_current ( ) ?;
417+ }
418+ }
419+ drop ( updated_iter) ;
434420
435421 log:: debug!( "write the metadata..." ) ;
436422 let metadata = Metadata {
@@ -448,11 +434,23 @@ impl<D: Distance> Writer<D> {
448434 return Ok ( ( ) ) ;
449435 }
450436
451- let updated_items = self
437+ log:: debug!( "reset and retrieve the updated items..." ) ;
438+ let mut updated_items = RoaringBitmap :: new ( ) ;
439+ let mut updated_iter = self
452440 . database
453- . remap_data_type :: < RoaringBitmapCodec > ( )
454- . get ( wtxn, & Key :: updated ( self . index ) ) ?
455- . unwrap_or_default ( ) ;
441+ . remap_types :: < PrefixCodec , DecodeIgnore > ( )
442+ . prefix_iter_mut ( wtxn, & Prefix :: updated ( self . index ) ) ?
443+ . remap_key_type :: < KeyCodec > ( ) ;
444+ while let Some ( ( key, _) ) = updated_iter. next ( ) . transpose ( ) ? {
445+ let inserted = updated_items. push ( key. node . item ) ;
446+ debug_assert ! ( inserted, "The keys should be sorted by LMDB" ) ;
447+ // Safe because we don't hold any reference to the database currently
448+ unsafe {
449+ updated_iter. del_current ( ) ?;
450+ }
451+ }
452+ drop ( updated_iter) ;
453+
456454 // while iterating on the nodes we want to delete all the modified element even if they are being inserted right after.
457455 let to_delete = & updated_items;
458456 let to_insert = & item_indices & & updated_items;
@@ -548,9 +546,6 @@ impl<D: Distance> Writer<D> {
548546 roots. append ( & mut thread_roots) ;
549547 }
550548
551- log:: debug!( "reset the updated items..." ) ;
552- self . database . delete ( wtxn, & Key :: updated ( self . index ) ) ?;
553-
554549 log:: debug!( "write the metadata..." ) ;
555550 let metadata = Metadata {
556551 dimensions : self . dimensions . try_into ( ) . unwrap ( ) ,
@@ -774,6 +769,7 @@ impl<D: Distance> Writer<D> {
774769 }
775770 }
776771 NodeMode :: Metadata => unreachable ! ( ) ,
772+ NodeMode :: Updated => todo ! ( ) ,
777773 }
778774 }
779775
0 commit comments