diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 997cb075e3..38bb48d9bb 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -106,7 +106,6 @@ The schema defines all of the fields that the indexes [`Document`](src/schema/do Depending on the type of the field, you can decide to -- put it in the docstore - store it as a fast field - index it @@ -135,29 +134,6 @@ This conversion is done by the serializer. Finally, the reader is in charge of offering an API to read on this on-disk read-only representation. In tantivy, readers are designed to require very little anonymous memory. The data is read straight from an mmapped file, and loading an index is as fast as mmapping its files. -## [store/](src/store): Here is my DocId, Gimme my document - -The docstore is a row-oriented storage that, for each document, stores a subset of the fields -that are marked as stored in the schema. The docstore is compressed using a general-purpose algorithm -like LZ4. - -**Useful for** - -In search engines, it is often used to display search results. -Once the top 10 documents have been identified, we fetch them from the store, and display them or their snippet on the search result page (aka SERP). - -**Not useful for** - -Fetching a document from the store is typically a "slow" operation. It usually consists in - -- searching into a compact tree-like data structure to find the position of the right block. -- decompressing a small block -- returning the document from this block. - -It is NOT meant to be called for every document matching a query. - -As a rule of thumb, if you hit the docstore more than 100 times per search query, you are probably misusing tantivy. - ## [fastfield/](src/fastfield): Here is my DocId, Gimme my value Fast fields are stored in a column-oriented storage that allows for random access. diff --git a/CHANGELOG.md b/CHANGELOG.md index a076520fbc..a7bc5844f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ have been removed to keep the changelog focused on Yeehaw's history. - handle unknown column codes gracefully in `ColumnarReader::iter_columns`. ## Features/Improvements +- drop docstore module and references in preparation for trible.space rewrite. - remove `quickwit` feature flag and related async code. - add docs/example and Vec values to sstable [#2660](https://github.com/quickwit-oss/yeehaw/pull/2660)(@PSeitz) - Add string fast field support to `TopDocs`. [#2642](https://github.com/quickwit-oss/yeehaw/pull/2642)(@stuhood) diff --git a/INVENTORY.md b/INVENTORY.md index d36e8c1576..7fa2432d48 100644 --- a/INVENTORY.md +++ b/INVENTORY.md @@ -20,23 +20,19 @@ This document outlines the long term plan to rewrite this project so that it rel - Replace the `Directory` abstraction with a backend that reads and writes blobs via the Trible Space `BlobStore`. - Index writers and readers operate on blob handles instead of filesystem paths. -3. **Drop the docstore module** - - Primary documents are kept in Trible Space; segments no longer store their own row oriented docstore. - - Search results fetch documents via blob handles. - -4. **Remove `Opstamp` and use commit handles** +3. **Remove `Opstamp` and use commit handles** - Commits record the segments they include. - Merges rely on commit ancestry instead of monotonic operation stamps. -5. **Introduce 128-bit IDs with `Universe` mapping** +4. **Introduce 128-bit IDs with `Universe` mapping** - Map external `u128` identifiers to compact `DocId` values. - Persist the mapping so search results can translate back. -6. 
**Typed DSL for fuzzy search** +5. **Typed DSL for fuzzy search** - Generate search filters from Trible namespaces. - Provide macros that participate in both `find!` queries and full text search. -7. **Index update merge workflow** +6. **Index update merge workflow** - Wrap indexing operations in workspace commits. - Use Trible's compare-and-swap push mechanism so multiple writers merge gracefully. diff --git a/src/index/index.rs b/src/index/index.rs index e1a508a797..dd9880add3 100644 --- a/src/index/index.rs +++ b/src/index/index.rs @@ -95,11 +95,7 @@ fn save_new_metas( /// ); /// /// let schema = schema_builder.build(); -/// let settings = IndexSettings{ -/// docstore_blocksize: 100_000, -/// ..Default::default() -/// }; -/// let index = Index::builder().schema(schema).settings(settings).create_in_ram(); +/// let index = Index::builder().schema(schema).create_in_ram(); /// ``` pub struct IndexBuilder { schema: Option, diff --git a/src/index/index_meta.rs b/src/index/index_meta.rs index 0962bd9bc3..7a64ba59a6 100644 --- a/src/index/index_meta.rs +++ b/src/index/index_meta.rs @@ -1,15 +1,12 @@ use std::collections::HashSet; use std::fmt; use std::path::PathBuf; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; use serde::{Deserialize, Serialize}; use super::SegmentComponent; use crate::index::SegmentId; use crate::schema::Schema; -use crate::store::Compressor; use crate::{Inventory, Opstamp, TrackedObject}; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -37,7 +34,6 @@ impl SegmentMetaInventory { let inner = InnerSegmentMeta { segment_id, max_doc, - include_temp_doc_store: Arc::new(AtomicBool::new(true)), deletes: None, }; SegmentMeta::from(self.inventory.track(inner)) @@ -85,15 +81,6 @@ impl SegmentMeta { self.tracked.segment_id } - /// Removes the Component::TempStore from the alive list and - /// therefore marks the temp docstore file to be deleted by - /// the garbage collection. - pub fn untrack_temp_docstore(&self) { - self.tracked - .include_temp_doc_store - .store(false, std::sync::atomic::Ordering::Relaxed); - } - /// Returns the number of deleted documents. pub fn num_deleted_docs(&self) -> u32 { self.tracked @@ -111,20 +98,9 @@ impl SegmentMeta { /// is by removing all files that have been created by tantivy /// and are not used by any segment anymore. pub fn list_files(&self) -> HashSet { - if self - .tracked - .include_temp_doc_store - .load(std::sync::atomic::Ordering::Relaxed) - { - SegmentComponent::iterator() - .map(|component| self.relative_path(*component)) - .collect::>() - } else { - SegmentComponent::iterator() - .filter(|comp| *comp != &SegmentComponent::TempStore) - .map(|component| self.relative_path(*component)) - .collect::>() - } + SegmentComponent::iterator() + .map(|component| self.relative_path(*component)) + .collect::>() } /// Returns the relative path of a component of our segment. 
@@ -137,8 +113,6 @@ impl SegmentMeta { SegmentComponent::Postings => ".idx".to_string(), SegmentComponent::Positions => ".pos".to_string(), SegmentComponent::Terms => ".term".to_string(), - SegmentComponent::Store => ".store".to_string(), - SegmentComponent::TempStore => ".store.temp".to_string(), SegmentComponent::FastFields => ".fast".to_string(), SegmentComponent::FieldNorms => ".fieldnorm".to_string(), SegmentComponent::Delete => format!(".{}.del", self.delete_opstamp().unwrap_or(0)), @@ -183,7 +157,6 @@ impl SegmentMeta { segment_id: inner_meta.segment_id, max_doc, deletes: None, - include_temp_doc_store: Arc::new(AtomicBool::new(true)), }); SegmentMeta { tracked } } @@ -202,7 +175,6 @@ impl SegmentMeta { let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta { segment_id: inner_meta.segment_id, max_doc: inner_meta.max_doc, - include_temp_doc_store: Arc::new(AtomicBool::new(true)), deletes: Some(delete_meta), }); SegmentMeta { tracked } @@ -214,14 +186,6 @@ struct InnerSegmentMeta { segment_id: SegmentId, max_doc: u32, deletes: Option, - /// If you want to avoid the SegmentComponent::TempStore file to be covered by - /// garbage collection and deleted, set this to true. This is used during merge. - #[serde(skip)] - #[serde(default = "default_temp_store")] - pub(crate) include_temp_doc_store: Arc, -} -fn default_temp_store() -> Arc { - Arc::new(AtomicBool::new(false)) } impl InnerSegmentMeta { @@ -232,48 +196,9 @@ impl InnerSegmentMeta { } } -fn return_true() -> bool { - true -} - -fn is_true(val: &bool) -> bool { - *val -} - /// Search Index Settings. -/// -/// Contains settings which are applied on the whole -/// index, like presort documents. -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct IndexSettings { - /// The `Compressor` used to compress the doc store. - #[serde(default)] - pub docstore_compression: Compressor, - /// If set to true, docstore compression will happen on a dedicated thread. - /// (defaults: true) - #[doc(hidden)] - #[serde(default = "return_true")] - #[serde(skip_serializing_if = "is_true")] - pub docstore_compress_dedicated_thread: bool, - #[serde(default = "default_docstore_blocksize")] - /// The size of each block that will be compressed and written to disk - pub docstore_blocksize: usize, -} - -/// Must be a function to be compatible with serde defaults -fn default_docstore_blocksize() -> usize { - 16_384 -} - -impl Default for IndexSettings { - fn default() -> Self { - Self { - docstore_compression: Compressor::default(), - docstore_blocksize: default_docstore_blocksize(), - docstore_compress_dedicated_thread: true, - } - } -} +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)] +pub struct IndexSettings {} /// The order to sort by #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 057a51b10c..2ba6b5258d 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -4,13 +4,11 @@ use crate::directory::WritePtr; use crate::fieldnorm::FieldNormsSerializer; use crate::index::{Segment, SegmentComponent}; use crate::postings::InvertedIndexSerializer; -use crate::store::StoreWriter; /// Segment serializer is in charge of laying out on disk /// the data accumulated and sorted by the `SegmentWriter`. 
pub struct SegmentSerializer { segment: Segment, - pub(crate) store_writer: StoreWriter, fast_field_write: WritePtr, fieldnorms_serializer: Option, postings_serializer: InvertedIndexSerializer, @@ -19,17 +17,6 @@ pub struct SegmentSerializer { impl SegmentSerializer { /// Creates a new `SegmentSerializer`. pub fn for_segment(mut segment: Segment) -> crate::Result { - let settings = segment.index().settings().clone(); - let store_writer = { - let store_write = segment.open_write(SegmentComponent::Store)?; - StoreWriter::new( - store_write, - settings.docstore_compression, - settings.docstore_blocksize, - settings.docstore_compress_dedicated_thread, - )? - }; - let fast_field_write = segment.open_write(SegmentComponent::FastFields)?; let fieldnorms_write = segment.open_write(SegmentComponent::FieldNorms)?; @@ -38,7 +25,6 @@ impl SegmentSerializer { let postings_serializer = InvertedIndexSerializer::open(&mut segment)?; Ok(SegmentSerializer { segment, - store_writer, fast_field_write, fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, @@ -47,7 +33,7 @@ impl SegmentSerializer { /// The memory used (inclusive childs) pub fn mem_usage(&self) -> usize { - self.store_writer.mem_usage() + 0 } pub fn segment(&self) -> &Segment { @@ -71,11 +57,6 @@ impl SegmentSerializer { self.fieldnorms_serializer.take() } - /// Accessor to the `StoreWriter`. - pub fn get_store_writer(&mut self) -> &mut StoreWriter { - &mut self.store_writer - } - /// Finalize the segment serialization. pub fn close(mut self) -> crate::Result<()> { if let Some(fieldnorms_serializer) = self.extract_fieldnorms_serializer() { @@ -83,7 +64,6 @@ impl SegmentSerializer { } self.fast_field_write.terminate()?; self.postings_serializer.close()?; - self.store_writer.close()?; Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index cf4308d557..575231c32a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -196,7 +196,6 @@ pub mod postings; pub mod query; pub mod schema; pub mod space_usage; -pub mod store; pub mod termdict; mod docset; @@ -267,13 +266,13 @@ impl fmt::Display for Version { static VERSION_STRING: Lazy = Lazy::new(|| VERSION.to_string()); /// Expose the current version of tantivy as found in Cargo.toml during compilation. -/// eg. "0.11.0" as well as the compression scheme used in the docstore. +/// eg. "0.11.0". pub fn version() -> &'static Version { &VERSION } /// Exposes the complete version of tantivy as found in Cargo.toml during compilation as a string. -/// eg. "tantivy v0.11.0, index_format v1, store_compression: lz4". +/// eg. "tantivy v0.11.0, index_format v1". 
pub fn version_string() -> &'static str { VERSION_STRING.as_str() } diff --git a/src/store/compression_lz4_block.rs b/src/store/compression_lz4_block.rs deleted file mode 100644 index ee904470c6..0000000000 --- a/src/store/compression_lz4_block.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::io::{self}; -use std::mem; - -use lz4_flex::{compress_into, decompress_into}; - -#[inline] -#[expect(clippy::uninit_vec)] -pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> { - compressed.clear(); - let maximum_output_size = - mem::size_of::() + lz4_flex::block::get_maximum_output_size(uncompressed.len()); - compressed.reserve(maximum_output_size); - unsafe { - compressed.set_len(maximum_output_size); - } - let bytes_written = compress_into(uncompressed, &mut compressed[4..]) - .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?; - let num_bytes = uncompressed.len() as u32; - compressed[0..4].copy_from_slice(&num_bytes.to_le_bytes()); - unsafe { - compressed.set_len(bytes_written + mem::size_of::()); - } - Ok(()) -} - -#[inline] -#[expect(clippy::uninit_vec)] -pub fn decompress(compressed: &[u8], decompressed: &mut Vec) -> io::Result<()> { - decompressed.clear(); - let uncompressed_size_bytes: &[u8; 4] = compressed - .get(..4) - .ok_or(io::ErrorKind::InvalidData)? - .try_into() - .unwrap(); - let uncompressed_size = u32::from_le_bytes(*uncompressed_size_bytes) as usize; - decompressed.reserve(uncompressed_size); - unsafe { - decompressed.set_len(uncompressed_size); - } - let bytes_written = decompress_into(&compressed[4..], decompressed) - .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?; - if bytes_written != uncompressed_size { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "doc store block not completely decompressed, data corruption".to_string(), - )); - } - Ok(()) -} diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs deleted file mode 100644 index 7ef0aa9df1..0000000000 --- a/src/store/compression_zstd_block.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::io; - -use zstd::bulk::{compress_to_buffer, decompress_to_buffer}; -use zstd::DEFAULT_COMPRESSION_LEVEL; - -#[inline] -pub fn compress( - uncompressed: &[u8], - compressed: &mut Vec, - compression_level: Option, -) -> io::Result<()> { - let count_size = std::mem::size_of::(); - let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size; - - compressed.clear(); - compressed.resize(max_size, 0); - - let compressed_size = compress_to_buffer( - uncompressed, - &mut compressed[count_size..], - compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL), - )?; - - compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes()); - compressed.resize(compressed_size + count_size, 0); - - Ok(()) -} - -#[inline] -pub fn decompress(compressed: &[u8], decompressed: &mut Vec) -> io::Result<()> { - let count_size = std::mem::size_of::(); - let uncompressed_size = u32::from_le_bytes( - compressed - .get(..count_size) - .ok_or(io::ErrorKind::InvalidData)? 
- .try_into() - .unwrap(), - ) as usize; - - decompressed.clear(); - decompressed.resize(uncompressed_size, 0); - - let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?; - - if decompressed_size != uncompressed_size { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "doc store block not completely decompressed, data corruption".to_string(), - )); - } - - Ok(()) -} diff --git a/src/store/compressors.rs b/src/store/compressors.rs deleted file mode 100644 index ed5c8b4a92..0000000000 --- a/src/store/compressors.rs +++ /dev/null @@ -1,224 +0,0 @@ -use std::io; - -use serde::{Deserialize, Deserializer, Serialize}; - -/// Compressor can be used on `IndexSettings` to choose -/// the compressor used to compress the doc store. -/// -/// The default is Lz4Block, but also depends on the enabled feature flags. -#[derive(Clone, Debug, Copy, PartialEq, Eq)] -pub enum Compressor { - /// No compression - None, - /// Use the lz4 compressor (block format) - #[cfg(feature = "lz4-compression")] - Lz4, - /// Use the zstd compressor - #[cfg(feature = "zstd-compression")] - Zstd(ZstdCompressor), -} - -impl Serialize for Compressor { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match *self { - Compressor::None => serializer.serialize_str("none"), - #[cfg(feature = "lz4-compression")] - Compressor::Lz4 => serializer.serialize_str("lz4"), - #[cfg(feature = "zstd-compression")] - Compressor::Zstd(zstd) => serializer.serialize_str(&zstd.ser_to_string()), - } - } -} - -impl<'de> Deserialize<'de> for Compressor { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let buf = String::deserialize(deserializer)?; - let compressor = - match buf.as_str() { - "none" => Compressor::None, - #[cfg(feature = "lz4-compression")] - "lz4" => Compressor::Lz4, - #[cfg(not(feature = "lz4-compression"))] - "lz4" => return Err(serde::de::Error::custom( - "unsupported variant `lz4`, please enable Tantivy's `lz4-compression` feature", - )), - #[cfg(feature = "zstd-compression")] - _ if buf.starts_with("zstd") => Compressor::Zstd( - ZstdCompressor::deser_from_str(&buf).map_err(serde::de::Error::custom)?, - ), - #[cfg(not(feature = "zstd-compression"))] - _ if buf.starts_with("zstd") => { - return Err(serde::de::Error::custom( - "unsupported variant `zstd`, please enable Tantivy's `zstd-compression` \ - feature", - )) - } - _ => { - return Err(serde::de::Error::unknown_variant( - &buf, - &[ - "none", - #[cfg(feature = "lz4-compression")] - "lz4", - #[cfg(feature = "zstd-compression")] - "zstd", - #[cfg(feature = "zstd-compression")] - "zstd(compression_level=5)", - ], - )); - } - }; - - Ok(compressor) - } -} - -#[derive(Clone, Default, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)] -/// The Zstd compressor, with optional compression level. 
-pub struct ZstdCompressor { - /// The compression level, if unset defaults to zstd::DEFAULT_COMPRESSION_LEVEL = 3 - pub compression_level: Option, -} - -#[cfg(feature = "zstd-compression")] -impl ZstdCompressor { - fn deser_from_str(val: &str) -> Result { - if !val.starts_with("zstd") { - return Err(format!("needs to start with zstd, but got {val}")); - } - if val == "zstd" { - return Ok(ZstdCompressor::default()); - } - let options = &val["zstd".len() + 1..val.len() - 1]; - - let mut compressor = ZstdCompressor::default(); - for option in options.split(',') { - let (opt_name, value) = options - .split_once('=') - .ok_or_else(|| format!("no '=' found in option {option:?}"))?; - - match opt_name { - "compression_level" => { - let value = value.parse::().map_err(|err| { - format!("Could not parse value {value} of option {opt_name}, e: {err}") - })?; - if value >= 15 { - warn!( - "High zstd compression level detected: {:?}. High compression levels \ - (>=15) are slow and will limit indexing speed.", - value - ) - } - compressor.compression_level = Some(value); - } - _ => { - return Err(format!("unknown zstd option {opt_name:?}")); - } - } - } - Ok(compressor) - } - fn ser_to_string(&self) -> String { - if let Some(compression_level) = self.compression_level { - format!("zstd(compression_level={compression_level})") - } else { - "zstd".to_string() - } - } -} - -impl Default for Compressor { - #[allow(unreachable_code)] - fn default() -> Self { - #[cfg(feature = "lz4-compression")] - return Compressor::Lz4; - - #[cfg(feature = "zstd-compression")] - return Compressor::Zstd(ZstdCompressor::default()); - - Compressor::None - } -} - -impl Compressor { - #[inline] - pub(crate) fn compress_into( - &self, - uncompressed: &[u8], - compressed: &mut Vec, - ) -> io::Result<()> { - match self { - Self::None => { - compressed.clear(); - compressed.extend_from_slice(uncompressed); - Ok(()) - } - #[cfg(feature = "lz4-compression")] - Self::Lz4 => super::compression_lz4_block::compress(uncompressed, compressed), - #[cfg(feature = "zstd-compression")] - Self::Zstd(_zstd_compressor) => super::compression_zstd_block::compress( - uncompressed, - compressed, - _zstd_compressor.compression_level, - ), - } - } -} - -#[cfg(all(feature = "zstd-compression", test))] -mod tests { - use super::*; - - #[test] - fn zstd_serde_roundtrip() { - let compressor = ZstdCompressor { - compression_level: Some(15), - }; - - assert_eq!( - ZstdCompressor::deser_from_str(&compressor.ser_to_string()).unwrap(), - compressor - ); - - assert_eq!( - ZstdCompressor::deser_from_str(&ZstdCompressor::default().ser_to_string()).unwrap(), - ZstdCompressor::default() - ); - } - - #[test] - fn deser_zstd_test() { - assert_eq!( - ZstdCompressor::deser_from_str("zstd").unwrap(), - ZstdCompressor::default() - ); - - assert!(ZstdCompressor::deser_from_str("zzstd").is_err()); - assert!(ZstdCompressor::deser_from_str("zzstd()").is_err()); - assert_eq!( - ZstdCompressor::deser_from_str("zstd(compression_level=15)").unwrap(), - ZstdCompressor { - compression_level: Some(15) - } - ); - assert_eq!( - ZstdCompressor::deser_from_str("zstd(compresion_level=15)").unwrap_err(), - "unknown zstd option \"compresion_level\"" - ); - assert_eq!( - ZstdCompressor::deser_from_str("zstd(compression_level->2)").unwrap_err(), - "no '=' found in option \"compression_level->2\"" - ); - assert_eq!( - ZstdCompressor::deser_from_str("zstd(compression_level=over9000)").unwrap_err(), - "Could not parse value over9000 of option compression_level, e: invalid digit found \ - in 
string" - ); - } -} diff --git a/src/store/decompressors.rs b/src/store/decompressors.rs deleted file mode 100644 index 4d0319aca8..0000000000 --- a/src/store/decompressors.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::io; - -use serde::{Deserialize, Serialize}; - -use super::Compressor; - -/// Decompressor is deserialized from the doc store footer, when opening an index. -#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub enum Decompressor { - /// No compression - None, - /// Use the lz4 decompressor (block format) - #[cfg(feature = "lz4-compression")] - Lz4, - /// Use the zstd decompressor - #[cfg(feature = "zstd-compression")] - Zstd, -} - -impl From for Decompressor { - fn from(compressor: Compressor) -> Self { - match compressor { - Compressor::None => Decompressor::None, - #[cfg(feature = "lz4-compression")] - Compressor::Lz4 => Decompressor::Lz4, - #[cfg(feature = "zstd-compression")] - Compressor::Zstd(_) => Decompressor::Zstd, - } - } -} - -impl Decompressor { - pub(crate) fn from_id(id: u8) -> Decompressor { - match id { - 0 => Decompressor::None, - #[cfg(feature = "lz4-compression")] - 1 => Decompressor::Lz4, - #[cfg(feature = "zstd-compression")] - 4 => Decompressor::Zstd, - _ => panic!("unknown compressor id {id:?}"), - } - } - - pub(crate) fn get_id(&self) -> u8 { - match self { - Self::None => 0, - #[cfg(feature = "lz4-compression")] - Self::Lz4 => 1, - #[cfg(feature = "zstd-compression")] - Self::Zstd => 4, - } - } - - pub(crate) fn decompress(&self, compressed_block: &[u8]) -> io::Result> { - let mut decompressed_block = vec![]; - self.decompress_into(compressed_block, &mut decompressed_block)?; - Ok(decompressed_block) - } - - #[inline] - pub(crate) fn decompress_into( - &self, - compressed: &[u8], - decompressed: &mut Vec, - ) -> io::Result<()> { - match self { - Self::None => { - decompressed.clear(); - decompressed.extend_from_slice(compressed); - Ok(()) - } - #[cfg(feature = "lz4-compression")] - Self::Lz4 => super::compression_lz4_block::decompress(compressed, decompressed), - #[cfg(feature = "zstd-compression")] - Self::Zstd => super::compression_zstd_block::decompress(compressed, decompressed), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn compressor_decompressor_id_test() { - assert_eq!(Decompressor::from(Compressor::None), Decompressor::None); - #[cfg(feature = "lz4-compression")] - assert_eq!(Decompressor::from(Compressor::Lz4), Decompressor::Lz4); - #[cfg(feature = "zstd-compression")] - assert_eq!( - Decompressor::from(Compressor::Zstd(Default::default())), - Decompressor::Zstd - ); - } -} diff --git a/src/store/footer.rs b/src/store/footer.rs deleted file mode 100644 index b4cc65a201..0000000000 --- a/src/store/footer.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::io; - -use common::{BinarySerializable, FixedSize, HasLen}; - -use super::{Decompressor, DocStoreVersion, DOC_STORE_VERSION}; -use crate::directory::FileSlice; - -#[derive(Debug, Clone, PartialEq)] -pub struct DocStoreFooter { - pub offset: u64, - pub doc_store_version: DocStoreVersion, - pub decompressor: Decompressor, -} - -/// Serialises the footer to a byte-array -/// - offset : 8 bytes -/// - compressor id: 1 byte -/// - reserved for future use: 15 bytes -impl BinarySerializable for DocStoreFooter { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - BinarySerializable::serialize(&DOC_STORE_VERSION, writer)?; - BinarySerializable::serialize(&self.offset, writer)?; - BinarySerializable::serialize(&self.decompressor.get_id(), writer)?; - 
writer.write_all(&[0; 15])?; - Ok(()) - } - - fn deserialize(reader: &mut R) -> io::Result { - let doc_store_version = DocStoreVersion::deserialize(reader)?; - if doc_store_version > DOC_STORE_VERSION { - panic!( - "actual doc store version: {doc_store_version}, max_supported: {DOC_STORE_VERSION}" - ); - } - let offset = u64::deserialize(reader)?; - let compressor_id = u8::deserialize(reader)?; - let mut skip_buf = [0; 15]; - reader.read_exact(&mut skip_buf)?; - Ok(DocStoreFooter { - offset, - doc_store_version, - decompressor: Decompressor::from_id(compressor_id), - }) - } -} - -impl FixedSize for DocStoreFooter { - const SIZE_IN_BYTES: usize = 28; -} - -impl DocStoreFooter { - pub fn new( - offset: u64, - decompressor: Decompressor, - doc_store_version: DocStoreVersion, - ) -> Self { - DocStoreFooter { - offset, - doc_store_version, - decompressor, - } - } - - pub fn extract_footer(file: FileSlice) -> io::Result<(DocStoreFooter, FileSlice)> { - if file.len() < DocStoreFooter::SIZE_IN_BYTES { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - format!( - "File corrupted. The file is smaller than Footer::SIZE_IN_BYTES (len={}).", - file.len() - ), - )); - } - let (body, footer_slice) = file.split_from_end(DocStoreFooter::SIZE_IN_BYTES); - let mut footer_bytes = footer_slice.read_bytes()?; - let footer = DocStoreFooter::deserialize(&mut footer_bytes)?; - Ok((footer, body)) - } -} - -#[test] -fn doc_store_footer_test() { - // This test is just to safe guard changes on the footer. - // When the doc store footer is updated, make sure to update also the serialize/deserialize - // methods - assert_eq!(core::mem::size_of::(), 16); -} diff --git a/src/store/index/block.rs b/src/store/index/block.rs deleted file mode 100644 index ae969110cf..0000000000 --- a/src/store/index/block.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::io; -use std::ops::Range; - -use common::{read_u32_vint, VInt}; - -use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD}; -use crate::DocId; - -/// Represents a block of checkpoints. -/// -/// The DocStore index checkpoints are organized into block -/// for code-readability and compression purpose. -/// -/// A block can be of any size. -pub struct CheckpointBlock { - pub checkpoints: Vec, -} - -impl Default for CheckpointBlock { - fn default() -> CheckpointBlock { - CheckpointBlock { - checkpoints: Vec::with_capacity(2 * CHECKPOINT_PERIOD), - } - } -} - -impl CheckpointBlock { - /// If non-empty returns [start_doc, end_doc) - /// for the overall block. - pub fn doc_interval(&self) -> Option> { - let start_doc_opt = self - .checkpoints - .first() - .cloned() - .map(|checkpoint| checkpoint.doc_range.start); - let end_doc_opt = self - .checkpoints - .last() - .cloned() - .map(|checkpoint| checkpoint.doc_range.end); - match (start_doc_opt, end_doc_opt) { - (Some(start_doc), Some(end_doc)) => Some(start_doc..end_doc), - _ => None, - } - } - - /// Adding another checkpoint in the block. - pub fn push(&mut self, checkpoint: Checkpoint) { - if let Some(prev_checkpoint) = self.checkpoints.last() { - assert!(checkpoint.follows(prev_checkpoint)); - } - self.checkpoints.push(checkpoint); - } - - /// Returns the number of checkpoints in the block. 
- pub fn len(&self) -> usize { - self.checkpoints.len() - } - - pub fn get(&self, idx: usize) -> Checkpoint { - self.checkpoints[idx].clone() - } - - pub fn clear(&mut self) { - self.checkpoints.clear(); - } - - pub fn serialize(&mut self, buffer: &mut Vec) { - VInt(self.checkpoints.len() as u64).serialize_into_vec(buffer); - if self.checkpoints.is_empty() { - return; - } - VInt(self.checkpoints[0].doc_range.start as u64).serialize_into_vec(buffer); - VInt(self.checkpoints[0].byte_range.start as u64).serialize_into_vec(buffer); - for checkpoint in &self.checkpoints { - let delta_doc = checkpoint.doc_range.end - checkpoint.doc_range.start; - VInt(delta_doc as u64).serialize_into_vec(buffer); - VInt((checkpoint.byte_range.end - checkpoint.byte_range.start) as u64) - .serialize_into_vec(buffer); - } - } - - pub fn deserialize(&mut self, data: &mut &[u8]) -> io::Result<()> { - if data.is_empty() { - return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "")); - } - self.checkpoints.clear(); - let len = read_u32_vint(data); - if len == 0 { - return Ok(()); - } - let mut doc = read_u32_vint(data); - let mut start_offset = VInt::deserialize_u64(data)? as usize; - for _ in 0..len { - let num_docs = read_u32_vint(data); - let block_num_bytes = read_u32_vint(data) as usize; - self.checkpoints.push(Checkpoint { - doc_range: doc..doc + num_docs, - byte_range: start_offset..start_offset + block_num_bytes, - }); - doc += num_docs; - start_offset += block_num_bytes; - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::io; - - use crate::store::index::block::CheckpointBlock; - use crate::store::index::Checkpoint; - use crate::DocId; - - fn test_aux_ser_deser(checkpoints: &[Checkpoint]) -> io::Result<()> { - let mut block = CheckpointBlock::default(); - for checkpoint in checkpoints { - block.push(checkpoint.clone()); - } - let mut buffer = Vec::new(); - block.serialize(&mut buffer); - let mut block_deser = CheckpointBlock::default(); - let checkpoint = Checkpoint { - doc_range: 0..1, - byte_range: 2..3, - }; - block_deser.push(checkpoint); // < check that value is erased before deser - let mut data = &buffer[..]; - block_deser.deserialize(&mut data)?; - assert!(data.is_empty()); - assert_eq!(checkpoints, &block_deser.checkpoints[..]); - Ok(()) - } - - #[test] - fn test_block_serialize_empty() -> io::Result<()> { - test_aux_ser_deser(&[]) - } - - #[test] - fn test_block_serialize_simple() -> io::Result<()> { - let checkpoints = vec![Checkpoint { - doc_range: 10..12, - byte_range: 100..120, - }]; - test_aux_ser_deser(&checkpoints) - } - - #[test] - fn test_block_serialize_large_byte_range() -> io::Result<()> { - let checkpoints = vec![Checkpoint { - doc_range: 10..12, - byte_range: 8_000_000_000..9_000_000_000, - }]; - test_aux_ser_deser(&checkpoints) - } - - #[test] - fn test_block_serialize() -> io::Result<()> { - let offsets: Vec = (0..11).map(|i| i * i * i).collect(); - let mut checkpoints = vec![]; - let mut start_doc = 0; - for i in 0..10 { - let end_doc = (i * i) as DocId; - checkpoints.push(Checkpoint { - doc_range: start_doc..end_doc, - byte_range: offsets[i]..offsets[i + 1], - }); - start_doc = end_doc; - } - test_aux_ser_deser(&checkpoints) - } -} diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs deleted file mode 100644 index 13c252e924..0000000000 --- a/src/store/index/mod.rs +++ /dev/null @@ -1,246 +0,0 @@ -const CHECKPOINT_PERIOD: usize = 8; - -use std::fmt; -use std::ops::Range; -mod block; -mod skip_index; -mod skip_index_builder; - -pub use self::skip_index::SkipIndex; 
-pub use self::skip_index_builder::SkipIndexBuilder; -use crate::DocId; - -/// A checkpoint contains meta-information about -/// a block. Either a block of documents, or another block -/// of checkpoints. -/// -/// All of the intervals here defined are semi-open. -/// The checkpoint describes that the block within the `byte_range` -/// and spans over the `doc_range`. -#[derive(Clone, Eq, PartialEq, Default)] -pub struct Checkpoint { - pub doc_range: Range, - pub byte_range: Range, -} - -impl Checkpoint { - pub(crate) fn follows(&self, other: &Checkpoint) -> bool { - (self.doc_range.start == other.doc_range.end) - && (self.byte_range.start == other.byte_range.end) - } -} - -impl fmt::Debug for Checkpoint { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "(doc={:?}, bytes={:?})", self.doc_range, self.byte_range) - } -} - -#[cfg(test)] -mod tests { - - use std::io; - - use proptest::prelude::*; - - use super::{SkipIndex, SkipIndexBuilder}; - use crate::directory::OwnedBytes; - use crate::indexer::NoMergePolicy; - use crate::schema::{SchemaBuilder, STORED, TEXT}; - use crate::store::index::Checkpoint; - use crate::{DocAddress, DocId, Index, IndexWriter, TantivyDocument, Term}; - - #[test] - fn test_skip_index_empty() -> io::Result<()> { - let mut output: Vec = Vec::new(); - let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); - skip_index_builder.serialize_into(&mut output)?; - let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); - let mut skip_cursor = skip_index.checkpoints(); - assert!(skip_cursor.next().is_none()); - Ok(()) - } - - #[test] - fn test_skip_index_single_el() -> io::Result<()> { - let mut output: Vec = Vec::new(); - let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); - let checkpoint = Checkpoint { - doc_range: 0..2, - byte_range: 0..3, - }; - skip_index_builder.insert(checkpoint.clone()); - skip_index_builder.serialize_into(&mut output)?; - let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); - let mut skip_cursor = skip_index.checkpoints(); - assert_eq!(skip_cursor.next(), Some(checkpoint)); - assert_eq!(skip_cursor.next(), None); - Ok(()) - } - - #[test] - fn test_skip_index() -> io::Result<()> { - let mut output: Vec = Vec::new(); - let checkpoints = vec![ - Checkpoint { - doc_range: 0..3, - byte_range: 0..9, - }, - Checkpoint { - doc_range: 3..4, - byte_range: 9..25, - }, - Checkpoint { - doc_range: 4..6, - byte_range: 25..49, - }, - Checkpoint { - doc_range: 6..8, - byte_range: 49..81, - }, - Checkpoint { - doc_range: 8..10, - byte_range: 81..100, - }, - ]; - - let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); - for checkpoint in &checkpoints { - skip_index_builder.insert(checkpoint.clone()); - } - skip_index_builder.serialize_into(&mut output)?; - - let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); - assert_eq!( - &skip_index.checkpoints().collect::>()[..], - &checkpoints[..] 
- ); - Ok(()) - } - - fn offset_test(doc: DocId) -> usize { - (doc as usize) * (doc as usize) - } - - #[test] - fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> { - let mut schema_builder = SchemaBuilder::default(); - let text = schema_builder.add_text_field("text", STORED | TEXT); - let body = schema_builder.add_text_field("body", STORED); - let schema = schema_builder.build(); - let index = Index::create_in_ram(schema); - let mut index_writer: IndexWriter = index.writer_for_tests()?; - index_writer.set_merge_policy(Box::new(NoMergePolicy)); - let long_text: String = "abcdefghijklmnopqrstuvwxyz".repeat(1_000); - for _ in 0..20 { - index_writer.add_document(doc!(body=>long_text.clone()))?; - } - index_writer.commit()?; - index_writer.add_document(doc!(text=>"testb"))?; - for _ in 0..10 { - index_writer.add_document(doc!(text=>"testd", body=>long_text.clone()))?; - } - index_writer.commit()?; - index_writer.delete_term(Term::from_field_text(text, "testb")); - index_writer.commit()?; - let segment_ids = index.searchable_segment_ids()?; - index_writer.merge(&segment_ids).wait().unwrap(); - let reader = index.reader()?; - let searcher = reader.searcher(); - assert_eq!(searcher.num_docs(), 30); - for i in 0..searcher.num_docs() as u32 { - let _doc = searcher.doc::(DocAddress::new(0u32, i))?; - } - Ok(()) - } - - #[test] - fn test_skip_index_long() -> io::Result<()> { - let mut output: Vec = Vec::new(); - let checkpoints: Vec = (0..1000) - .map(|i| Checkpoint { - doc_range: i..(i + 1), - byte_range: offset_test(i)..offset_test(i + 1), - }) - .collect(); - let mut skip_index_builder = SkipIndexBuilder::new(); - for checkpoint in &checkpoints { - skip_index_builder.insert(checkpoint.clone()); - } - skip_index_builder.serialize_into(&mut output)?; - assert_eq!(output.len(), 4035); - let resulting_checkpoints: Vec = SkipIndex::open(OwnedBytes::new(output)) - .checkpoints() - .collect(); - assert_eq!(&resulting_checkpoints, &checkpoints); - Ok(()) - } - - fn integrate_delta(vals: Vec) -> Vec { - let mut output = Vec::with_capacity(vals.len() + 1); - output.push(0); - let mut prev = 0; - for val in vals { - let new_val = val + prev; - prev = new_val; - output.push(new_val); - } - output - } - - // Generates a sequence of n valid checkpoints, with n < max_len. - fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy> { - (0..max_len) - .prop_flat_map(move |len: usize| { - ( - proptest::collection::vec(1usize..20, len).prop_map(integrate_delta), - proptest::collection::vec(1usize..26, len).prop_map(integrate_delta), - ) - .prop_map(|(docs, offsets)| { - (0..docs.len() - 1) - .map(move |i| Checkpoint { - doc_range: docs[i] as DocId..docs[i + 1] as DocId, - byte_range: offsets[i]..offsets[i + 1], - }) - .collect::>() - }) - }) - .boxed() - } - - fn seek_manual>( - checkpoints: I, - target: DocId, - ) -> Option { - checkpoints - .into_iter() - .find(|checkpoint| checkpoint.doc_range.end > target) - } - - fn test_skip_index_aux(skip_index: SkipIndex, checkpoints: &[Checkpoint]) { - if let Some(last_checkpoint) = checkpoints.last() { - for doc in 0u32..last_checkpoint.doc_range.end { - let expected = seek_manual(skip_index.checkpoints(), doc); - assert_eq!(expected, skip_index.seek(doc), "Doc {doc}"); - } - assert!(skip_index.seek(last_checkpoint.doc_range.end).is_none()); - } - } - - proptest! 
{ - #![proptest_config(ProptestConfig::with_cases(20))] - #[test] - fn test_proptest_skip(checkpoints in monotonic_checkpoints(100)) { - let mut skip_index_builder = SkipIndexBuilder::new(); - for checkpoint in checkpoints.iter().cloned() { - skip_index_builder.insert(checkpoint); - } - let mut buffer = Vec::new(); - skip_index_builder.serialize_into(&mut buffer).unwrap(); - let skip_index = SkipIndex::open(OwnedBytes::new(buffer)); - let iter_checkpoints: Vec = skip_index.checkpoints().collect(); - assert_eq!(&checkpoints[..], &iter_checkpoints[..]); - test_skip_index_aux(skip_index, &checkpoints[..]); - } - } -} diff --git a/src/store/index/skip_index.rs b/src/store/index/skip_index.rs deleted file mode 100644 index 46b30dae10..0000000000 --- a/src/store/index/skip_index.rs +++ /dev/null @@ -1,107 +0,0 @@ -use common::{BinarySerializable, VInt}; - -use crate::directory::OwnedBytes; -use crate::store::index::block::CheckpointBlock; -use crate::store::index::Checkpoint; -use crate::DocId; - -pub struct LayerCursor<'a> { - remaining: &'a [u8], - block: CheckpointBlock, - cursor: usize, -} - -impl Iterator for LayerCursor<'_> { - type Item = Checkpoint; - - fn next(&mut self) -> Option { - if self.cursor == self.block.len() { - if self.remaining.is_empty() { - return None; - } - let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining); - block_mut.deserialize(remaining_mut).ok()?; - self.cursor = 0; - } - let res = Some(self.block.get(self.cursor)); - self.cursor += 1; - res - } -} - -struct Layer { - data: OwnedBytes, -} - -impl Layer { - fn cursor(&self) -> impl Iterator + '_ { - self.cursor_at_offset(0) - } - - fn cursor_at_offset(&self, start_offset: usize) -> impl Iterator + '_ { - let data = &self.data.as_slice(); - LayerCursor { - remaining: &data[start_offset..], - block: CheckpointBlock::default(), - cursor: 0, - } - } - - fn seek_start_at_offset(&self, target: DocId, offset: usize) -> Option { - self.cursor_at_offset(offset) - .find(|checkpoint| checkpoint.doc_range.end > target) - } -} - -pub struct SkipIndex { - layers: Vec, -} - -impl SkipIndex { - pub fn open(mut data: OwnedBytes) -> SkipIndex { - let offsets: Vec = Vec::::deserialize(&mut data) - .unwrap() - .into_iter() - .map(|el| el.0) - .collect(); - let mut start_offset = 0; - let mut layers = Vec::new(); - for end_offset in offsets { - let layer = Layer { - data: data.slice(start_offset as usize..end_offset as usize), - }; - layers.push(layer); - start_offset = end_offset; - } - SkipIndex { layers } - } - - pub(crate) fn checkpoints(&self) -> impl Iterator + '_ { - self.layers - .last() - .into_iter() - .flat_map(|layer| layer.cursor()) - } - - pub fn seek(&self, target: DocId) -> Option { - let first_layer_len = self - .layers - .first() - .map(|layer| layer.data.len()) - .unwrap_or(0); - let mut cur_checkpoint = Checkpoint { - doc_range: 0u32..1u32, - byte_range: 0..first_layer_len, - }; - for layer in &self.layers { - if let Some(checkpoint) = - layer.seek_start_at_offset(target, cur_checkpoint.byte_range.start) - { - cur_checkpoint = checkpoint; - } else { - return None; - } - } - Some(cur_checkpoint) - } -} diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs deleted file mode 100644 index ffebf8e954..0000000000 --- a/src/store/index/skip_index_builder.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::io; -use std::io::Write; - -use common::{BinarySerializable, VInt}; - -use crate::store::index::block::CheckpointBlock; -use crate::store::index::{Checkpoint, 
CHECKPOINT_PERIOD}; - -// Each skip contains iterator over pairs (last doc in block, offset to start of block). - -struct LayerBuilder { - buffer: Vec, - pub block: CheckpointBlock, -} - -impl LayerBuilder { - fn finish(self) -> Vec { - self.buffer - } - - fn new() -> LayerBuilder { - LayerBuilder { - buffer: Vec::new(), - block: CheckpointBlock::default(), - } - } - - /// Serializes the block, and return a checkpoint representing - /// the entire block. - /// - /// If the block was empty to begin with, simply return `None`. - fn flush_block(&mut self) -> Option { - if let Some(doc_range) = self.block.doc_interval() { - let start_offset = self.buffer.len(); - self.block.serialize(&mut self.buffer); - let end_offset = self.buffer.len(); - self.block.clear(); - Some(Checkpoint { - doc_range, - byte_range: start_offset..end_offset, - }) - } else { - None - } - } - - fn push(&mut self, checkpoint: Checkpoint) { - self.block.push(checkpoint); - } - - fn insert(&mut self, checkpoint: Checkpoint) -> Option { - self.push(checkpoint); - let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD; - if emit_skip_info { - self.flush_block() - } else { - None - } - } -} - -pub struct SkipIndexBuilder { - layers: Vec, -} - -impl SkipIndexBuilder { - pub fn new() -> SkipIndexBuilder { - SkipIndexBuilder { layers: Vec::new() } - } - - fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder { - if layer_id == self.layers.len() { - let layer_builder = LayerBuilder::new(); - self.layers.push(layer_builder); - } - &mut self.layers[layer_id] - } - - pub fn insert(&mut self, checkpoint: Checkpoint) { - let mut skip_pointer = Some(checkpoint); - for layer_id in 0.. { - if let Some(checkpoint) = skip_pointer { - skip_pointer = self.get_layer(layer_id).insert(checkpoint); - } else { - break; - } - } - } - - pub fn serialize_into(mut self, output: &mut W) -> io::Result<()> { - let mut last_pointer = None; - for skip_layer in self.layers.iter_mut() { - if let Some(checkpoint) = last_pointer { - skip_layer.push(checkpoint); - } - last_pointer = skip_layer.flush_block(); - } - let layer_buffers: Vec> = self - .layers - .into_iter() - .rev() - .map(|layer| layer.finish()) - .collect(); - - let mut layer_offset = 0; - let mut layer_sizes = Vec::new(); - for layer_buffer in &layer_buffers { - layer_offset += layer_buffer.len() as u64; - layer_sizes.push(VInt(layer_offset)); - } - layer_sizes.serialize(output)?; - for layer_buffer in layer_buffers { - output.write_all(&layer_buffer[..])?; - } - Ok(()) - } -} diff --git a/src/store/mod.rs b/src/store/mod.rs deleted file mode 100644 index 5826435158..0000000000 --- a/src/store/mod.rs +++ /dev/null @@ -1,405 +0,0 @@ -//! Compressed/slow/row-oriented storage for documents. -//! -//! A field needs to be marked as stored in the schema in -//! order to be handled in the `Store`. -//! -//! Internally, documents (or rather their stored fields) are serialized to a buffer. -//! When the buffer exceeds `block_size` (defaults to 16K), the buffer is compressed -//! using LZ4 or Zstd and the resulting block is written to disk. -//! -//! One can then request for a specific `DocId`. -//! A skip list helps navigating to the right block, -//! decompresses it entirely and returns the document within it. -//! -//! If the last document requested was in the same block, -//! the reader is smart enough to avoid decompressing -//! the block a second time, but their is no real -//! uncompressed block* cache. -//! -//! A typical use case for the store is, once -//! 
the search result page has been computed, returning -//! the actual content of the 10 best document. -//! -//! # Usage -//! -//! Most users should not access the `StoreReader` directly -//! and should rely on either -//! -//! - at the segment level, the [`SegmentReader`'s `doc` -//! method](../struct.SegmentReader.html#method.doc) -//! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method - -mod compressors; -mod decompressors; -mod footer; -mod index; -mod reader; -mod writer; - -pub use self::compressors::{Compressor, ZstdCompressor}; -pub use self::decompressors::Decompressor; -pub use self::reader::{CacheStats, StoreReader}; -pub(crate) use self::reader::{DocStoreVersion, DOCSTORE_CACHE_CAPACITY}; -pub use self::writer::StoreWriter; -mod store_compressor; - -/// Doc store version in footer to handle format changes. -pub(crate) const DOC_STORE_VERSION: DocStoreVersion = DocStoreVersion::V2; - -#[cfg(feature = "lz4-compression")] -mod compression_lz4_block; - -#[cfg(feature = "zstd-compression")] -mod compression_zstd_block; - -#[cfg(test)] -pub(crate) mod tests { - - use std::path::Path; - - use super::*; - use crate::directory::{Directory, RamDirectory, WritePtr}; - use crate::fastfield::AliveBitSet; - use crate::schema::{ - self, Schema, TantivyDocument, TextFieldIndexing, TextOptions, Value, STORED, TEXT, - }; - use crate::{Index, IndexWriter, Term}; - - const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \ - eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad \ - minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip \ - ex ea commodo consequat. Duis aute irure dolor in reprehenderit in \ - voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur \ - sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \ - mollit anim id est laborum."; - - const BLOCK_SIZE: usize = 16_384; - - pub fn write_lorem_ipsum_store( - writer: WritePtr, - num_docs: usize, - compressor: Compressor, - blocksize: usize, - separate_thread: bool, - ) -> Schema { - let mut schema_builder = Schema::builder(); - let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored()); - let field_title = - schema_builder.add_text_field("title", TextOptions::default().set_stored()); - let schema = schema_builder.build(); - { - let mut store_writer = - StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap(); - for i in 0..num_docs { - let mut doc = TantivyDocument::default(); - doc.add_text(field_body, LOREM); - doc.add_text(field_title, format!("Doc {i}")); - store_writer.store(&doc, &schema).unwrap(); - } - store_writer.close().unwrap(); - } - schema - } - - const NUM_DOCS: usize = 1_000; - #[test] - fn test_doc_store_iter_with_delete_bug_1077() -> crate::Result<()> { - // this will cover deletion of the first element in a checkpoint - let deleted_doc_ids = (200..300).collect::>(); - let alive_bitset = - AliveBitSet::for_test_from_deleted_docs(&deleted_doc_ids, NUM_DOCS as u32); - - let path = Path::new("store"); - let directory = RamDirectory::create(); - let store_wrt = directory.open_write(path)?; - let schema = - write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true); - let field_title = schema.get_field("title").unwrap(); - let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file, 10)?; - for i in 0..NUM_DOCS as u32 { - assert_eq!( - store - .get::(i)? 
- .get_first(field_title) - .unwrap() - .as_value() - .as_str() - .unwrap(), - format!("Doc {i}") - ); - } - - for doc in store.iter::(Some(&alive_bitset)) { - let doc = doc?; - let title_content = doc - .get_first(field_title) - .unwrap() - .as_value() - .as_str() - .unwrap() - .to_string(); - if !title_content.starts_with("Doc ") { - panic!("unexpected title_content {title_content}"); - } - - let id = title_content - .strip_prefix("Doc ") - .unwrap() - .parse::() - .unwrap(); - if alive_bitset.is_deleted(id) { - panic!("unexpected deleted document {id}"); - } - } - - Ok(()) - } - - fn test_store( - compressor: Compressor, - blocksize: usize, - separate_thread: bool, - ) -> crate::Result<()> { - let path = Path::new("store"); - let directory = RamDirectory::create(); - let store_wrt = directory.open_write(path)?; - let schema = - write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread); - let field_title = schema.get_field("title").unwrap(); - let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file, 10)?; - for i in 0..NUM_DOCS as u32 { - assert_eq!( - *store - .get::(i)? - .get_first(field_title) - .unwrap() - .as_str() - .unwrap(), - format!("Doc {i}") - ); - } - for (i, doc) in store.iter::(None).enumerate() { - assert_eq!( - *doc?.get_first(field_title).unwrap().as_str().unwrap(), - format!("Doc {i}") - ); - } - Ok(()) - } - - #[test] - fn test_store_no_compression_same_thread() -> crate::Result<()> { - test_store(Compressor::None, BLOCK_SIZE, false) - } - - #[test] - fn test_store_no_compression() -> crate::Result<()> { - test_store(Compressor::None, BLOCK_SIZE, true) - } - - #[cfg(feature = "lz4-compression")] - #[test] - fn test_store_lz4_block() -> crate::Result<()> { - test_store(Compressor::Lz4, BLOCK_SIZE, true) - } - - #[cfg(feature = "zstd-compression")] - #[test] - fn test_store_zstd() -> crate::Result<()> { - test_store( - Compressor::Zstd(ZstdCompressor::default()), - BLOCK_SIZE, - true, - ) - } - - #[test] - fn test_store_with_delete() -> crate::Result<()> { - let mut schema_builder = schema::Schema::builder(); - - let text_field_options = TextOptions::default() - .set_indexing_options( - TextFieldIndexing::default() - .set_index_option(schema::IndexRecordOption::WithFreqsAndPositions), - ) - .set_stored(); - let text_field = schema_builder.add_text_field("text_field", text_field_options); - let schema = schema_builder.build(); - let index_builder = Index::builder().schema(schema); - - let index = index_builder.create_in_ram()?; - - { - let mut index_writer: IndexWriter = index.writer_for_tests().unwrap(); - index_writer.add_document(doc!(text_field=> "deleteme"))?; - index_writer.add_document(doc!(text_field=> "deletemenot"))?; - index_writer.add_document(doc!(text_field=> "deleteme"))?; - index_writer.add_document(doc!(text_field=> "deletemenot"))?; - index_writer.add_document(doc!(text_field=> "deleteme"))?; - - index_writer.delete_term(Term::from_field_text(text_field, "deleteme")); - index_writer.commit()?; - } - - let searcher = index.reader()?.searcher(); - let reader = searcher.segment_reader(0); - let store = reader.get_store_reader(10)?; - for doc in store.iter::(reader.alive_bitset()) { - assert_eq!( - *doc?.get_first(text_field).unwrap().as_str().unwrap(), - "deletemenot".to_string() - ); - } - Ok(()) - } - - #[cfg(feature = "lz4-compression")] - #[cfg(feature = "zstd-compression")] - #[test] - fn test_merge_with_changed_compressor() -> crate::Result<()> { - let mut schema_builder = 
schema::Schema::builder(); - - let text_field = schema_builder.add_text_field("text_field", TEXT | STORED); - let schema = schema_builder.build(); - let index_builder = Index::builder().schema(schema); - - let mut index = index_builder.create_in_ram().unwrap(); - index.settings_mut().docstore_compression = Compressor::Lz4; - { - let mut index_writer: IndexWriter = index.writer_for_tests().unwrap(); - // put enough data create enough blocks in the doc store to be considered for stacking - for _ in 0..200 { - index_writer.add_document(doc!(text_field=> LOREM))?; - } - assert!(index_writer.commit().is_ok()); - for _ in 0..200 { - index_writer.add_document(doc!(text_field=> LOREM))?; - } - assert!(index_writer.commit().is_ok()); - } - assert_eq!( - index.reader().unwrap().searcher().segment_readers()[0] - .get_store_reader(10) - .unwrap() - .decompressor(), - Decompressor::Lz4 - ); - // Change compressor, this disables stacking on merging - let index_settings = index.settings_mut(); - index_settings.docstore_compression = Compressor::Zstd(Default::default()); - // Merging the segments - { - let segment_ids = index - .searchable_segment_ids() - .expect("Searchable segments failed."); - let mut index_writer: IndexWriter = index.writer_for_tests().unwrap(); - assert!(index_writer.merge(&segment_ids).wait().is_ok()); - assert!(index_writer.wait_merging_threads().is_ok()); - } - - let searcher = index.reader().unwrap().searcher(); - assert_eq!(searcher.segment_readers().len(), 1); - let reader = searcher.segment_readers().iter().last().unwrap(); - let store = reader.get_store_reader(10).unwrap(); - - for doc in store - .iter::(reader.alive_bitset()) - .take(50) - { - assert_eq!( - *doc?.get_first(text_field).and_then(|v| v.as_str()).unwrap(), - LOREM.to_string() - ); - } - assert_eq!(store.decompressor(), Decompressor::Zstd); - - Ok(()) - } - - #[test] - fn test_merge_of_small_segments() -> crate::Result<()> { - let mut schema_builder = schema::Schema::builder(); - - let text_field = schema_builder.add_text_field("text_field", TEXT | STORED); - let schema = schema_builder.build(); - let index_builder = Index::builder().schema(schema); - - let index = index_builder.create_in_ram().unwrap(); - - { - let mut index_writer = index.writer_for_tests()?; - index_writer.add_document(doc!(text_field=> "1"))?; - index_writer.commit()?; - index_writer.add_document(doc!(text_field=> "2"))?; - index_writer.commit()?; - index_writer.add_document(doc!(text_field=> "3"))?; - index_writer.commit()?; - index_writer.add_document(doc!(text_field=> "4"))?; - index_writer.commit()?; - index_writer.add_document(doc!(text_field=> "5"))?; - index_writer.commit()?; - } - // Merging the segments - { - let segment_ids = index.searchable_segment_ids()?; - let mut index_writer: IndexWriter = index.writer_for_tests()?; - index_writer.merge(&segment_ids).wait()?; - index_writer.wait_merging_threads()?; - } - - let searcher = index.reader()?.searcher(); - assert_eq!(searcher.segment_readers().len(), 1); - let reader = searcher.segment_readers().iter().last().unwrap(); - let store = reader.get_store_reader(10)?; - assert_eq!(store.block_checkpoints().count(), 1); - Ok(()) - } -} - -#[cfg(all(test, feature = "unstable"))] -mod bench { - - use std::path::Path; - - use test::Bencher; - - use super::tests::write_lorem_ipsum_store; - use crate::directory::{Directory, RamDirectory}; - use crate::store::{Compressor, StoreReader}; - use crate::TantivyDocument; - - #[bench] - #[cfg(feature = "mmap")] - fn bench_store_encode(b: &mut Bencher) 
{ - let directory = RamDirectory::create(); - let path = Path::new("store"); - b.iter(|| { - write_lorem_ipsum_store( - directory.open_write(path).unwrap(), - 1_000, - Compressor::default(), - 16_384, - true, - ); - directory.delete(path).unwrap(); - }); - } - - #[bench] - fn bench_store_decode(b: &mut Bencher) { - let directory = RamDirectory::create(); - let path = Path::new("store"); - write_lorem_ipsum_store( - directory.open_write(path).unwrap(), - 1_000, - Compressor::default(), - 16_384, - true, - ); - let store_file = directory.open_read(path).unwrap(); - let store = StoreReader::open(store_file, 10).unwrap(); - b.iter(|| store.iter::(None).collect::>()); - } -} diff --git a/src/store/reader.rs b/src/store/reader.rs deleted file mode 100644 index 7aa0c4aa13..0000000000 --- a/src/store/reader.rs +++ /dev/null @@ -1,442 +0,0 @@ -use std::fmt::Display; -use std::io; -use std::iter::Sum; -use std::num::NonZeroUsize; -use std::ops::{AddAssign, Range}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; - -use common::{BinarySerializable, OwnedBytes}; -use lru::LruCache; - -use super::footer::DocStoreFooter; -use super::index::SkipIndex; -use super::Decompressor; -use crate::directory::FileSlice; -use crate::error::DataCorruption; -use crate::fastfield::AliveBitSet; -use crate::schema::document::{BinaryDocumentDeserializer, DocumentDeserialize}; -use crate::space_usage::StoreSpaceUsage; -use crate::store::index::Checkpoint; -use crate::DocId; - -pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100; - -type Block = OwnedBytes; - -/// The format version of the document store. -#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] -pub(crate) enum DocStoreVersion { - V1 = 1, - V2 = 2, -} -impl Display for DocStoreVersion { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - DocStoreVersion::V1 => write!(f, "V1"), - DocStoreVersion::V2 => write!(f, "V2"), - } - } -} -impl BinarySerializable for DocStoreVersion { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - (*self as u32).serialize(writer) - } - - fn deserialize(reader: &mut R) -> io::Result { - Ok(match u32::deserialize(reader)? { - 1 => DocStoreVersion::V1, - 2 => DocStoreVersion::V2, - v => { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("Invalid doc store version {v}"), - )) - } - }) - } -} - -/// Reads document off tantivy's [`Store`](./index.html) -pub struct StoreReader { - decompressor: Decompressor, - doc_store_version: DocStoreVersion, - data: FileSlice, - skip_index: Arc, - space_usage: StoreSpaceUsage, - cache: BlockCache, -} - -/// The cache for decompressed blocks. 
-struct BlockCache { - cache: Option>>, - cache_hits: AtomicUsize, - cache_misses: AtomicUsize, -} - -impl BlockCache { - fn get_from_cache(&self, pos: usize) -> Option { - if let Some(block) = self - .cache - .as_ref() - .and_then(|cache| cache.lock().unwrap().get(&pos).cloned()) - { - self.cache_hits.fetch_add(1, Ordering::SeqCst); - return Some(block); - } - self.cache_misses.fetch_add(1, Ordering::SeqCst); - None - } - - fn put_into_cache(&self, pos: usize, data: Block) { - if let Some(cache) = self.cache.as_ref() { - cache.lock().unwrap().put(pos, data); - } - } - - fn stats(&self) -> CacheStats { - CacheStats { - cache_hits: self.cache_hits.load(Ordering::Relaxed), - cache_misses: self.cache_misses.load(Ordering::Relaxed), - num_entries: self.len(), - } - } - - fn len(&self) -> usize { - self.cache - .as_ref() - .map_or(0, |cache| cache.lock().unwrap().len()) - } - - #[cfg(test)] - fn peek_lru(&self) -> Option { - self.cache - .as_ref() - .and_then(|cache| cache.lock().unwrap().peek_lru().map(|(&k, _)| k)) - } -} - -#[derive(Debug, Default)] -/// CacheStats for the `StoreReader`. -pub struct CacheStats { - /// The number of entries in the cache - pub num_entries: usize, - /// The number of cache hits. - pub cache_hits: usize, - /// The number of cache misses. - pub cache_misses: usize, -} - -impl AddAssign for CacheStats { - fn add_assign(&mut self, other: Self) { - *self = Self { - num_entries: self.num_entries + other.num_entries, - cache_hits: self.cache_hits + other.cache_hits, - cache_misses: self.cache_misses + other.cache_misses, - }; - } -} - -impl Sum for CacheStats { - fn sum>(mut iter: I) -> Self { - let mut first = iter.next().unwrap_or_default(); - for el in iter { - first += el; - } - first - } -} - -impl StoreReader { - /// Opens a store reader - /// - /// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU. - /// The size of blocks is configurable, this should be reflexted in the - pub fn open(store_file: FileSlice, cache_num_blocks: usize) -> io::Result { - let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?; - - let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize); - let index_data = offset_index_file.read_bytes()?; - let space_usage = - StoreSpaceUsage::new(data_file.num_bytes(), offset_index_file.num_bytes()); - let skip_index = SkipIndex::open(index_data); - Ok(StoreReader { - decompressor: footer.decompressor, - doc_store_version: footer.doc_store_version, - data: data_file, - cache: BlockCache { - cache: NonZeroUsize::new(cache_num_blocks) - .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))), - cache_hits: Default::default(), - cache_misses: Default::default(), - }, - skip_index: Arc::new(skip_index), - space_usage, - }) - } - - pub(crate) fn block_checkpoints(&self) -> impl Iterator + '_ { - self.skip_index.checkpoints() - } - - pub(crate) fn decompressor(&self) -> Decompressor { - self.decompressor - } - - /// Returns the cache hit and miss statistics of the store reader. - pub(crate) fn cache_stats(&self) -> CacheStats { - self.cache.stats() - } - - /// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the - /// document. - /// - /// Advanced API. In most cases use [`get`](Self::get). 
- fn block_checkpoint(&self, doc_id: DocId) -> crate::Result { - self.skip_index.seek(doc_id).ok_or_else(|| { - crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{doc_id}.")) - }) - } - - pub(crate) fn block_data(&self) -> io::Result { - self.data.read_bytes() - } - - fn get_compressed_block(&self, checkpoint: &Checkpoint) -> io::Result { - self.data.slice(checkpoint.byte_range.clone()).read_bytes() - } - - /// Loads and decompresses a block. - /// - /// Advanced API. In most cases use [`get`](Self::get). - fn read_block(&self, checkpoint: &Checkpoint) -> io::Result { - let cache_key = checkpoint.byte_range.start; - if let Some(block) = self.cache.get_from_cache(cache_key) { - return Ok(block); - } - - let compressed_block = self.get_compressed_block(checkpoint)?; - let decompressed_block = - OwnedBytes::new(self.decompressor.decompress(compressed_block.as_ref())?); - - self.cache - .put_into_cache(cache_key, decompressed_block.clone()); - - Ok(decompressed_block) - } - - /// Reads a given document. - /// - /// Calling `.get(doc)` is relatively costly as it requires - /// decompressing a compressed block. The store utilizes a LRU cache, - /// so accessing docs from the same compressed block should be faster. - /// For that reason a store reader should be kept and reused. - /// - /// It should not be called to score documents - /// for instance. - pub fn get(&self, doc_id: DocId) -> crate::Result { - let mut doc_bytes = self.get_document_bytes(doc_id)?; - - let deserializer = - BinaryDocumentDeserializer::from_reader(&mut doc_bytes, self.doc_store_version) - .map_err(crate::TantivyError::from)?; - D::deserialize(deserializer).map_err(crate::TantivyError::from) - } - - /// Returns raw bytes of a given document. - /// - /// Calling `.get(doc)` is relatively costly as it requires - /// decompressing a compressed block. The store utilizes a LRU cache, - /// so accessing docs from the same compressed block should be faster. - /// For that reason a store reader should be kept and reused. - pub fn get_document_bytes(&self, doc_id: DocId) -> crate::Result { - let checkpoint = self.block_checkpoint(doc_id)?; - let block = self.read_block(&checkpoint)?; - Self::get_document_bytes_from_block(block, doc_id, &checkpoint) - } - - /// Advanced API. - /// - /// In most cases use [`get_document_bytes`](Self::get_document_bytes). - fn get_document_bytes_from_block( - block: OwnedBytes, - doc_id: DocId, - checkpoint: &Checkpoint, - ) -> crate::Result { - let doc_pos = doc_id - checkpoint.doc_range.start; - - let range = block_read_index(&block, doc_pos)?; - Ok(block.slice(range)) - } - - /// Iterator over all Documents in their order as they are stored in the doc store. - /// Use this, if you want to extract all Documents from the doc store. - /// The `alive_bitset` has to be forwarded from the `SegmentReader` or the results may be wrong. - pub fn iter<'a: 'b, 'b, D: DocumentDeserialize>( - &'b self, - alive_bitset: Option<&'a AliveBitSet>, - ) -> impl Iterator> + 'b { - self.iter_raw(alive_bitset).map(|doc_bytes_res| { - let mut doc_bytes = doc_bytes_res?; - - let deserializer = - BinaryDocumentDeserializer::from_reader(&mut doc_bytes, self.doc_store_version) - .map_err(crate::TantivyError::from)?; - D::deserialize(deserializer).map_err(crate::TantivyError::from) - }) - } - - /// Iterator over all raw Documents in their order as they are stored in the doc store. - /// Use this, if you want to extract all Documents from the doc store. 
- /// The `alive_bitset` has to be forwarded from the `SegmentReader` or the results may be wrong. - pub(crate) fn iter_raw<'a: 'b, 'b>( - &'b self, - alive_bitset: Option<&'a AliveBitSet>, - ) -> impl Iterator> + 'b { - let last_doc_id = self - .block_checkpoints() - .last() - .map(|checkpoint| checkpoint.doc_range.end) - .unwrap_or(0); - let mut checkpoint_block_iter = self.block_checkpoints(); - let mut curr_checkpoint = checkpoint_block_iter.next(); - let mut curr_block = curr_checkpoint - .as_ref() - .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning - let mut doc_pos = 0; - (0..last_doc_id) - .filter_map(move |doc_id| { - // filter_map is only used to resolve lifetime issues between the two closures on - // the outer variables - - // check move to next checkpoint - if doc_id >= curr_checkpoint.as_ref().unwrap().doc_range.end { - curr_checkpoint = checkpoint_block_iter.next(); - curr_block = curr_checkpoint - .as_ref() - .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); - doc_pos = 0; - } - - let alive = alive_bitset - .map(|bitset| bitset.is_alive(doc_id)) - .unwrap_or(true); - let res = if alive { - Some((curr_block.clone(), doc_pos)) - } else { - None - }; - doc_pos += 1; - res - }) - .map(move |(block, doc_pos)| { - let block = block - .ok_or_else(|| { - DataCorruption::comment_only( - "the current checkpoint in the doc store iterator is none, this \ - should never happen", - ) - })? - .map_err(|error_kind| { - std::io::Error::new(error_kind, "error when reading block in doc store") - })?; - - let range = block_read_index(&block, doc_pos)?; - Ok(block.slice(range)) - }) - } - - /// Summarize total space usage of this store reader. - pub fn space_usage(&self) -> StoreSpaceUsage { - self.space_usage.clone() - } -} - -fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result> { - let doc_pos = doc_pos as usize; - let size_of_u32 = std::mem::size_of::(); - - let index_len_pos = block.len() - size_of_u32; - let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize; - - if doc_pos > index_len { - return Err(crate::TantivyError::InternalError( - "Attempted to read doc from wrong block".to_owned(), - )); - } - - let index_start = block.len() - (index_len + 1) * size_of_u32; - let index = &block[index_start..index_start + index_len * size_of_u32]; - - let start_offset = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? 
as usize; - let end_offset = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..]) - .unwrap_or(index_start as u32) as usize; - Ok(start_offset..end_offset) -} - -#[cfg(test)] -mod tests { - use std::path::Path; - - use super::*; - use crate::directory::RamDirectory; - use crate::schema::{Field, TantivyDocument, Value}; - use crate::store::tests::write_lorem_ipsum_store; - use crate::store::Compressor; - use crate::Directory; - - const BLOCK_SIZE: usize = 16_384; - - fn get_text_field<'a>(doc: &'a TantivyDocument, field: &'a Field) -> Option<&'a str> { - doc.get_first(*field).and_then(|f| f.as_value().as_str()) - } - - #[test] - fn test_doc_store_version_ord() { - assert!(DocStoreVersion::V1 < DocStoreVersion::V2); - } - - #[test] - fn test_store_lru_cache() -> crate::Result<()> { - let directory = RamDirectory::create(); - let path = Path::new("store"); - let writer = directory.open_write(path)?; - let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE, true); - let title = schema.get_field("title").unwrap(); - let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?; - - assert_eq!(store.cache.len(), 0); - assert_eq!(store.cache_stats().cache_hits, 0); - assert_eq!(store.cache_stats().cache_misses, 0); - - let doc = store.get(0)?; - assert_eq!(get_text_field(&doc, &title), Some("Doc 0")); - - assert_eq!(store.cache.len(), 1); - assert_eq!(store.cache_stats().cache_hits, 0); - assert_eq!(store.cache_stats().cache_misses, 1); - - assert_eq!(store.cache.peek_lru(), Some(0)); - - let doc = store.get(499)?; - assert_eq!(get_text_field(&doc, &title), Some("Doc 499")); - - assert_eq!(store.cache.len(), 2); - assert_eq!(store.cache_stats().cache_hits, 0); - assert_eq!(store.cache_stats().cache_misses, 2); - - assert_eq!(store.cache.peek_lru(), Some(0)); - - let doc = store.get(0)?; - assert_eq!(get_text_field(&doc, &title), Some("Doc 0")); - - assert_eq!(store.cache.len(), 2); - assert_eq!(store.cache_stats().cache_hits, 1); - assert_eq!(store.cache_stats().cache_misses, 2); - - assert_eq!(store.cache.peek_lru(), Some(11207)); - - Ok(()) - } -} diff --git a/src/store/store_compressor.rs b/src/store/store_compressor.rs deleted file mode 100644 index 20211b25a6..0000000000 --- a/src/store/store_compressor.rs +++ /dev/null @@ -1,272 +0,0 @@ -use std::io::Write; -use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; -use std::thread::JoinHandle; -use std::{io, thread}; - -use common::{BinarySerializable, CountingWriter, TerminatingWrite}; - -use super::DOC_STORE_VERSION; -use crate::directory::WritePtr; -use crate::store::footer::DocStoreFooter; -use crate::store::index::{Checkpoint, SkipIndexBuilder}; -use crate::store::{Compressor, Decompressor, StoreReader}; -use crate::DocId; - -pub struct BlockCompressor(BlockCompressorVariants); - -// The struct wrapping an enum is just here to keep the -// impls private. 
-enum BlockCompressorVariants { - SameThread(BlockCompressorImpl), - DedicatedThread(DedicatedThreadBlockCompressorImpl), -} - -impl BlockCompressor { - pub fn new(compressor: Compressor, wrt: WritePtr, dedicated_thread: bool) -> io::Result { - let block_compressor_impl = BlockCompressorImpl::new(compressor, wrt); - if dedicated_thread { - let dedicated_thread_compressor = - DedicatedThreadBlockCompressorImpl::new(block_compressor_impl)?; - Ok(BlockCompressor(BlockCompressorVariants::DedicatedThread( - dedicated_thread_compressor, - ))) - } else { - Ok(BlockCompressor(BlockCompressorVariants::SameThread( - block_compressor_impl, - ))) - } - } - - pub fn compress_block_and_write( - &mut self, - bytes: &[u8], - num_docs_in_block: u32, - ) -> io::Result<()> { - match &mut self.0 { - BlockCompressorVariants::SameThread(block_compressor) => { - block_compressor.compress_block_and_write(bytes, num_docs_in_block)?; - } - BlockCompressorVariants::DedicatedThread(different_thread_block_compressor) => { - different_thread_block_compressor - .compress_block_and_write(bytes, num_docs_in_block)?; - } - } - Ok(()) - } - - pub fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> { - match &mut self.0 { - BlockCompressorVariants::SameThread(block_compressor) => { - block_compressor.stack(store_reader)?; - } - BlockCompressorVariants::DedicatedThread(different_thread_block_compressor) => { - different_thread_block_compressor.stack_reader(store_reader)?; - } - } - Ok(()) - } - - pub fn close(self) -> io::Result<()> { - let imp = self.0; - match imp { - BlockCompressorVariants::SameThread(block_compressor) => block_compressor.close(), - BlockCompressorVariants::DedicatedThread(different_thread_block_compressor) => { - different_thread_block_compressor.close() - } - } - } -} - -struct BlockCompressorImpl { - compressor: Compressor, - first_doc_in_block: DocId, - offset_index_writer: SkipIndexBuilder, - intermediary_buffer: Vec, - writer: CountingWriter, -} - -impl BlockCompressorImpl { - fn new(compressor: Compressor, writer: WritePtr) -> Self { - Self { - compressor, - first_doc_in_block: 0, - offset_index_writer: SkipIndexBuilder::new(), - intermediary_buffer: Vec::new(), - writer: CountingWriter::wrap(writer), - } - } - - fn compress_block_and_write(&mut self, data: &[u8], num_docs_in_block: u32) -> io::Result<()> { - assert!(num_docs_in_block > 0); - self.intermediary_buffer.clear(); - self.compressor - .compress_into(data, &mut self.intermediary_buffer)?; - - let start_offset = self.writer.written_bytes() as usize; - self.writer.write_all(&self.intermediary_buffer)?; - let end_offset = self.writer.written_bytes() as usize; - - self.register_checkpoint(Checkpoint { - doc_range: self.first_doc_in_block..self.first_doc_in_block + num_docs_in_block, - byte_range: start_offset..end_offset, - }); - Ok(()) - } - - fn register_checkpoint(&mut self, checkpoint: Checkpoint) { - self.offset_index_writer.insert(checkpoint.clone()); - self.first_doc_in_block = checkpoint.doc_range.end; - } - - /// Stacks a store reader on top of the documents written so far. - /// This method is an optimization compared to iterating over the documents - /// in the store and adding them one by one, as the store's data will - /// not be decompressed and then recompressed. - fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { - let doc_shift = self.first_doc_in_block; - let start_shift = self.writer.written_bytes() as usize; - - // just bulk write all of the block of the given reader. 
- self.writer - .write_all(store_reader.block_data()?.as_slice())?; - - // concatenate the index of the `store_reader`, after translating - // its start doc id and its start file offset. - for mut checkpoint in store_reader.block_checkpoints() { - checkpoint.doc_range.start += doc_shift; - checkpoint.doc_range.end += doc_shift; - checkpoint.byte_range.start += start_shift; - checkpoint.byte_range.end += start_shift; - self.register_checkpoint(checkpoint); - } - Ok(()) - } - - fn close(mut self) -> io::Result<()> { - let header_offset: u64 = self.writer.written_bytes(); - let docstore_footer = DocStoreFooter::new( - header_offset, - Decompressor::from(self.compressor), - DOC_STORE_VERSION, - ); - self.offset_index_writer.serialize_into(&mut self.writer)?; - docstore_footer.serialize(&mut self.writer)?; - self.writer.terminate() - } -} - -// --------------------------------- -enum BlockCompressorMessage { - CompressBlockAndWrite { - block_data: Vec, - num_docs_in_block: u32, - }, - Stack(StoreReader), -} - -struct DedicatedThreadBlockCompressorImpl { - join_handle: Option>>, - tx: SyncSender, -} - -impl DedicatedThreadBlockCompressorImpl { - fn new(mut block_compressor: BlockCompressorImpl) -> io::Result { - let (tx, rx): ( - SyncSender, - Receiver, - ) = sync_channel(3); - let join_handle = thread::Builder::new() - .name("docstore-compressor-thread".to_string()) - .spawn(move || { - while let Ok(packet) = rx.recv() { - match packet { - BlockCompressorMessage::CompressBlockAndWrite { - block_data, - num_docs_in_block, - } => { - block_compressor - .compress_block_and_write(&block_data[..], num_docs_in_block)?; - } - BlockCompressorMessage::Stack(store_reader) => { - block_compressor.stack(store_reader)?; - } - } - } - block_compressor.close()?; - Ok(()) - })?; - Ok(DedicatedThreadBlockCompressorImpl { - join_handle: Some(join_handle), - tx, - }) - } - - fn compress_block_and_write(&mut self, bytes: &[u8], num_docs_in_block: u32) -> io::Result<()> { - self.send(BlockCompressorMessage::CompressBlockAndWrite { - block_data: bytes.to_vec(), - num_docs_in_block, - }) - } - - fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> { - self.send(BlockCompressorMessage::Stack(store_reader)) - } - - fn send(&mut self, msg: BlockCompressorMessage) -> io::Result<()> { - if self.tx.send(msg).is_err() { - harvest_thread_result(self.join_handle.take())?; - return Err(io::Error::other("Unidentified error.")); - } - Ok(()) - } - - fn close(self) -> io::Result<()> { - drop(self.tx); - harvest_thread_result(self.join_handle) - } -} - -/// Wait for the thread result to terminate and returns its result. -/// -/// If the thread panicked, or if the result has already been harvested, -/// returns an explicit error. -fn harvest_thread_result(join_handle_opt: Option>>) -> io::Result<()> { - let join_handle = join_handle_opt.ok_or_else(|| io::Error::other("Thread already joined."))?; - join_handle - .join() - .map_err(|_err| io::Error::other("Compressing thread panicked."))? 
-} - -#[cfg(test)] -mod tests { - use std::io; - use std::path::Path; - - use crate::directory::RamDirectory; - use crate::store::store_compressor::BlockCompressor; - use crate::store::Compressor; - use crate::Directory; - - fn populate_block_compressor(mut block_compressor: BlockCompressor) -> io::Result<()> { - block_compressor.compress_block_and_write(b"hello", 1)?; - block_compressor.compress_block_and_write(b"happy", 1)?; - block_compressor.close()?; - Ok(()) - } - - #[test] - fn test_block_store_compressor_impls_yield_the_same_result() { - let ram_directory = RamDirectory::default(); - let path1 = Path::new("path1"); - let path2 = Path::new("path2"); - let wrt1 = ram_directory.open_write(path1).unwrap(); - let wrt2 = ram_directory.open_write(path2).unwrap(); - let block_compressor1 = BlockCompressor::new(Compressor::None, wrt1, true).unwrap(); - let block_compressor2 = BlockCompressor::new(Compressor::None, wrt2, false).unwrap(); - populate_block_compressor(block_compressor1).unwrap(); - populate_block_compressor(block_compressor2).unwrap(); - let data1 = ram_directory.open_read(path1).unwrap(); - let data2 = ram_directory.open_read(path2).unwrap(); - assert_eq!(data1.read_bytes().unwrap(), data2.read_bytes().unwrap()); - } -} diff --git a/src/store/writer.rs b/src/store/writer.rs deleted file mode 100644 index ef514accc4..0000000000 --- a/src/store/writer.rs +++ /dev/null @@ -1,142 +0,0 @@ -use std::io; - -use common::BinarySerializable; - -use super::compressors::Compressor; -use super::StoreReader; -use crate::directory::WritePtr; -use crate::schema::document::{BinaryDocumentSerializer, Document}; -use crate::schema::Schema; -use crate::store::store_compressor::BlockCompressor; -use crate::DocId; - -/// Write tantivy's [`Store`](./index.html) -/// -/// Contrary to the other components of `tantivy`, -/// the store is written to disc as document as being added, -/// as opposed to when the segment is getting finalized. -/// -/// The skip list index on the other hand, is built in memory. -pub struct StoreWriter { - compressor: Compressor, - block_size: usize, - num_docs_in_current_block: DocId, - current_block: Vec, - doc_pos: Vec, - block_compressor: BlockCompressor, -} - -impl StoreWriter { - /// Create a store writer. - /// - /// The store writer will writes blocks on disc as - /// document are added. - pub fn new( - writer: WritePtr, - compressor: Compressor, - block_size: usize, - dedicated_thread: bool, - ) -> io::Result { - let block_compressor = BlockCompressor::new(compressor, writer, dedicated_thread)?; - Ok(StoreWriter { - compressor, - block_size, - num_docs_in_current_block: 0, - doc_pos: Vec::new(), - current_block: Vec::new(), - block_compressor, - }) - } - - pub(crate) fn compressor(&self) -> Compressor { - self.compressor - } - - /// The memory used (inclusive childs) - pub fn mem_usage(&self) -> usize { - self.current_block.capacity() + self.doc_pos.capacity() * std::mem::size_of::() - } - - /// Checks if the current block is full, and if so, compresses and flushes it. - fn check_flush_block(&mut self) -> io::Result<()> { - // this does not count the VInt storing the index length itself, but it is negligible in - // front of everything else. - let index_len = self.doc_pos.len() * std::mem::size_of::(); - if self.current_block.len() + index_len > self.block_size { - self.send_current_block_to_compressor()?; - } - Ok(()) - } - - /// Flushes current uncompressed block and sends to compressor. 
- fn send_current_block_to_compressor(&mut self) -> io::Result<()> { - // We don't do anything if the current block is empty to begin with. - if self.current_block.is_empty() { - return Ok(()); - } - - let size_of_u32 = std::mem::size_of::(); - self.current_block - .reserve((self.doc_pos.len() + 1) * size_of_u32); - - for pos in self.doc_pos.iter() { - pos.serialize(&mut self.current_block)?; - } - (self.doc_pos.len() as u32).serialize(&mut self.current_block)?; - - self.block_compressor - .compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?; - self.doc_pos.clear(); - self.current_block.clear(); - self.num_docs_in_current_block = 0; - Ok(()) - } - - /// Store a new document. - /// - /// The document id is implicitly the current number - /// of documents. - pub fn store(&mut self, document: &D, schema: &Schema) -> io::Result<()> { - self.doc_pos.push(self.current_block.len() as u32); - - let mut serializer = BinaryDocumentSerializer::new(&mut self.current_block, schema); - serializer.serialize_doc(document)?; - - self.num_docs_in_current_block += 1; - self.check_flush_block()?; - Ok(()) - } - - /// Store bytes of a serialized document. - /// - /// The document id is implicitly the current number - /// of documents. - pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { - self.doc_pos.push(self.current_block.len() as u32); - self.current_block.extend_from_slice(serialized_document); - self.num_docs_in_current_block += 1; - self.check_flush_block()?; - Ok(()) - } - - /// Stacks a store reader on top of the documents written so far. - /// This method is an optimization compared to iterating over the documents - /// in the store and adding them one by one, as the store's data will - /// not be decompressed and then recompressed. - pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { - // We flush the current block first before stacking - self.send_current_block_to_compressor()?; - self.block_compressor.stack_reader(store_reader)?; - Ok(()) - } - - /// Finalized the store writer. - /// - /// Compress the last unfinished block if any, - /// and serializes the skip list index on disc. - pub fn close(mut self) -> io::Result<()> { - self.send_current_block_to_compressor()?; - self.block_compressor.close()?; - Ok(()) - } -}
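The deletions above remove the whole block-oriented store format. As a reference for readers of this patch, here is a minimal, standalone sketch (not taken from the deleted sources and not part of the diff itself) of the per-block trailer that the removed StoreWriter::store / block_read_index pair relied on: each block is the concatenation of serialized documents, followed by one u32 start offset per document and a final u32 entry count. The helper names and the explicit little-endian encoding are assumptions made for illustration; the deleted code serialized these integers through the BinarySerializable trait instead.

    // Sketch of the per-block document index layout used by the removed docstore.
    // Assumptions: little-endian u32s, hypothetical helper names.
    use std::ops::Range;

    /// Append the offset index: one u32 start offset per document, then the count.
    fn append_block_index(block: &mut Vec<u8>, doc_starts: &[u32]) {
        for start in doc_starts {
            block.extend_from_slice(&start.to_le_bytes());
        }
        block.extend_from_slice(&(doc_starts.len() as u32).to_le_bytes());
    }

    /// Recover the byte range of the `doc_pos`-th document inside a decompressed block.
    fn doc_range_in_block(block: &[u8], doc_pos: usize) -> Option<Range<usize>> {
        let u32_len = std::mem::size_of::<u32>();
        // The last u32 of the block is the number of documents it contains.
        let count_pos = block.len().checked_sub(u32_len)?;
        let count = u32::from_le_bytes(block[count_pos..].try_into().ok()?) as usize;
        if doc_pos >= count {
            return None;
        }
        // The offset index sits just before the trailing count.
        let index_start = block.len().checked_sub((count + 1) * u32_len)?;
        let read_u32 = |i: usize| {
            let at = index_start + i * u32_len;
            u32::from_le_bytes(block[at..at + u32_len].try_into().unwrap()) as usize
        };
        let start = read_u32(doc_pos);
        // A document ends where the next one starts; the last document ends at the
        // beginning of the offset index.
        let end = if doc_pos + 1 < count {
            read_u32(doc_pos + 1)
        } else {
            index_start
        };
        Some(start..end)
    }

    fn main() {
        // Two "documents" packed into one uncompressed block.
        let mut block = b"hellohappy".to_vec();
        append_block_index(&mut block, &[0, 5]);
        assert_eq!(doc_range_in_block(&block, 0), Some(0..5));
        assert_eq!(doc_range_in_block(&block, 1), Some(5..10));
        assert_eq!(doc_range_in_block(&block, 2), None);
        println!("second doc: {:?}", std::str::from_utf8(&block[5..10]).unwrap());
    }

Note that end offsets are never stored: the deleted reader resolved a document's end as the next document's start offset, falling back to the start of the offset index for the last document in the block, which is exactly what the sketch mirrors.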