diff --git a/CHANGELOG.md b/CHANGELOG.md
index c5f65aebbc..b9c1fbe8e0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,3 +30,4 @@ have been removed to keep the changelog focused on Yeehaw's history.
 - remove unused `std::iter` imports from test modules.
 - expand documentation for document deserialization traits.
 - reorder inventory tasks to prioritize fixing doctest regressions.
+- remove `quickwit` feature and associated asynchronous APIs.
diff --git a/Cargo.toml b/Cargo.toml
index 4d72152bab..8a994844b0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,7 +58,6 @@ arc-swap = "1.5.0"
 bon = "3.3.1"
 columnar = { version = "0.5", path = "./columnar", package = "tantivy-columnar" }
-sstable = { version = "0.5", path = "./sstable", package = "tantivy-sstable", optional = true }
 stacker = { version = "0.5", path = "./stacker", package = "tantivy-stacker" }
 query-grammar = { version = "0.1.0", path = "./query-grammar", package = "tantivy-query-grammar" }
 tantivy-bitpacker = { version = "0.8", path = "./bitpacker" }
 
@@ -66,8 +65,6 @@ common = { version = "0.9", path = "./common/", package = "tantivy-common" }
 tokenizer-api = { version = "0.5", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
 sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
 hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
-futures-util = { version = "0.3.28", optional = true }
-futures-channel = { version = "0.3.28", optional = true }
 fnv = "1.0.7"
 
 [target.'cfg(windows)'.dependencies]
@@ -125,8 +122,6 @@ columnar-zstd-compression = ["columnar/zstd-compression"]
 failpoints = ["fail", "fail/failpoints"]
 unstable = [] # useful for benches.
 
-quickwit = ["sstable", "futures-util", "futures-channel"]
-
 # Compares only the hash of a string when indexing data.
 # Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
 # Uses 64bit ahash.
diff --git a/src/compat_tests.rs b/src/compat_tests.rs
index ac1d00a45d..ec6604c073 100644
--- a/src/compat_tests.rs
+++ b/src/compat_tests.rs
@@ -35,7 +35,6 @@ fn path_for_version(version: &str) -> String {
 
 /// feature flag quickwit uses a different dictionary type
 #[test]
-#[cfg(not(feature = "quickwit"))]
 fn test_format_6() {
     let path = path_for_version("6");
 
@@ -46,7 +45,6 @@
 
 /// feature flag quickwit uses a different dictionary type
 #[test]
-#[cfg(not(feature = "quickwit"))]
 fn test_format_7() {
     let path = path_for_version("7");
 
@@ -55,7 +53,6 @@
     assert_date_time_precision(&index, DateTimePrecision::Nanoseconds);
 }
 
-#[cfg(not(feature = "quickwit"))]
 fn assert_date_time_precision(index: &Index, doc_store_precision: DateTimePrecision) {
     use collector::TopDocs;
     let reader = index.reader().expect("Failed to create reader");
diff --git a/src/core/executor.rs b/src/core/executor.rs
index 8cc7e0026f..7bfca2808a 100644
--- a/src/core/executor.rs
+++ b/src/core/executor.rs
@@ -1,8 +1,5 @@
 use std::sync::Arc;
 
-#[cfg(feature = "quickwit")]
-use futures_util::{future::Either, FutureExt};
-
 use crate::TantivyError;
 
 /// Executor makes it possible to run tasks in single thread or
@@ -15,13 +12,6 @@ pub enum Executor {
     ThreadPool(Arc<ThreadPool>),
 }
 
-#[cfg(feature = "quickwit")]
-impl From<Arc<ThreadPool>> for Executor {
-    fn from(thread_pool: Arc<ThreadPool>) -> Self {
-        Executor::ThreadPool(thread_pool)
-    }
-}
-
 impl Executor {
     /// Creates an Executor that performs all task in the caller thread.
     pub fn single_thread() -> Executor {
@@ -92,32 +82,6 @@ impl Executor {
             }
         }
     }
-
-    /// Spawn a task on the pool, returning a future completing on task success.
-    ///
-    /// If the task panics, returns `Err(())`.
-    #[cfg(feature = "quickwit")]
-    pub fn spawn_blocking<T: Send + 'static>(
-        &self,
-        cpu_intensive_task: impl FnOnce() -> T + Send + 'static,
-    ) -> impl std::future::Future<Output = Result<T, ()>> {
-        match self {
-            Executor::SingleThread => Either::Left(std::future::ready(Ok(cpu_intensive_task()))),
-            Executor::ThreadPool(pool) => {
-                let (sender, receiver) = oneshot::channel();
-                pool.spawn(|| {
-                    if sender.is_closed() {
-                        return;
-                    }
-                    let task_result = cpu_intensive_task();
-                    let _ = sender.send(task_result);
-                });
-
-                let res = receiver.map(|res| res.map_err(|_| ()));
-                Either::Right(res)
-            }
-        }
-    }
 }
 
 #[cfg(test)]
@@ -173,62 +137,4 @@ mod tests {
             assert_eq!(result[i], i * 2);
         }
     }
-
-    #[cfg(feature = "quickwit")]
-    #[test]
-    fn test_cancel_cpu_intensive_tasks() {
-        use std::sync::atomic::{AtomicU64, Ordering};
-        use std::sync::Arc;
-
-        let counter: Arc<AtomicU64> = Default::default();
-
-        let other_counter: Arc<AtomicU64> = Default::default();
-
-        let mut futures = Vec::new();
-        let mut other_futures = Vec::new();
-
-        let (tx, rx) = crossbeam_channel::bounded::<()>(0);
-        let rx = Arc::new(rx);
-        let executor = Executor::multi_thread(3, "search-test").unwrap();
-        for _ in 0..1000 {
-            let counter_clone: Arc<AtomicU64> = counter.clone();
-            let other_counter_clone: Arc<AtomicU64> = other_counter.clone();
-
-            let rx_clone = rx.clone();
-            let rx_clone2 = rx.clone();
-            let fut = executor.spawn_blocking(move || {
-                counter_clone.fetch_add(1, Ordering::SeqCst);
-                let _ = rx_clone.recv();
-            });
-            futures.push(fut);
-            let other_fut = executor.spawn_blocking(move || {
-                other_counter_clone.fetch_add(1, Ordering::SeqCst);
-                let _ = rx_clone2.recv();
-            });
-            other_futures.push(other_fut);
-        }
-
-        // We execute 100 futures.
-        for _ in 0..100 {
-            tx.send(()).unwrap();
-        }
-
-        let counter_val = counter.load(Ordering::SeqCst);
-        let other_counter_val = other_counter.load(Ordering::SeqCst);
-        assert!(counter_val >= 30);
-        assert!(other_counter_val >= 30);
-
-        drop(other_futures);
-
-        // We execute 100 futures.
-        for _ in 0..100 {
-            tx.send(()).unwrap();
-        }
-
-        let counter_val2 = counter.load(Ordering::SeqCst);
-        assert!(counter_val2 >= counter_val + 100 - 6);
-
-        let other_counter_val2 = other_counter.load(Ordering::SeqCst);
-        assert!(other_counter_val2 <= other_counter_val + 6);
-    }
 }
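The removed `Executor::spawn_blocking` was the only future-returning entry point on `Executor`. Downstream code that still wants its contract — run a CPU-bound closure off-thread, observing a panic as `Err(())` — can recreate it without tantivy's help. A minimal std-only sketch (the name `run_blocking` is illustrative, and it blocks rather than returning a future):

```rust
use std::thread;

// Sketch of the removed spawn_blocking contract using only std: run a
// CPU-bound closure on its own thread and map a panic to Err(()).
fn run_blocking<T: Send + 'static>(
    cpu_intensive_task: impl FnOnce() -> T + Send + 'static,
) -> Result<T, ()> {
    // join() returns Err when the closure panicked, mirroring the Err(())
    // the removed method produced for panicked pool tasks.
    thread::spawn(cpu_intensive_task).join().map_err(|_| ())
}

fn main() {
    assert_eq!(run_blocking(|| 6 * 7), Ok(42));
}
```

Callers that need an actual future can wrap an equivalent closure in their async runtime's blocking facility instead, as sketched after the next file.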
diff --git a/src/core/searcher.rs b/src/core/searcher.rs
index 51db7311f4..6d9e0cf74f 100644
--- a/src/core/searcher.rs
+++ b/src/core/searcher.rs
@@ -103,17 +103,6 @@ impl Searcher {
         cache_stats
     }
 
-    /// Fetches a document in an asynchronous manner.
-    #[cfg(feature = "quickwit")]
-    pub async fn doc_async<D: DocumentDeserialize>(
-        &self,
-        doc_address: DocAddress,
-    ) -> crate::Result<D> {
-        let executor = self.inner.index.search_executor();
-        let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
-        store_reader.get_async(doc_address.doc_id, executor).await
-    }
-
     /// Access the schema associated with the index of this searcher.
     pub fn schema(&self) -> &Schema {
         &self.inner.schema
@@ -140,19 +129,6 @@
         Ok(total_doc_freq)
     }
 
-    /// Return the overall number of documents containing
-    /// the given term in an asynchronous manner.
-    #[cfg(feature = "quickwit")]
-    pub async fn doc_freq_async(&self, term: &Term) -> crate::Result<u64> {
-        let mut total_doc_freq = 0;
-        for segment_reader in &self.inner.segment_readers {
-            let inverted_index = segment_reader.inverted_index(term.field())?;
-            let doc_freq = inverted_index.doc_freq_async(term).await?;
-            total_doc_freq += u64::from(doc_freq);
-        }
-        Ok(total_doc_freq)
-    }
-
     /// Return the list of segment readers
    pub fn segment_readers(&self) -> &[SegmentReader] {
         &self.inner.segment_readers
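With `doc_async` and `doc_freq_async` gone, async callers use the synchronous `Searcher::doc` / `Searcher::doc_freq` and do their own offloading. A hypothetical sketch for a tokio-based application (tokio is the caller's dependency here, not tantivy's; `fetch_doc` is an illustrative name):

```rust
use tantivy::{DocAddress, Searcher, TantivyDocument};

// Replacement sketch for the removed Searcher::doc_async: Searcher is cheaply
// clonable, so move a clone into a blocking task and await its JoinHandle.
async fn fetch_doc(
    searcher: Searcher,
    doc_address: DocAddress,
) -> tantivy::Result<TantivyDocument> {
    tokio::task::spawn_blocking(move || searcher.doc::<TantivyDocument>(doc_address))
        .await
        .expect("blocking doc fetch panicked")
}
```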
diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs
index 56bc87778c..c39baa1d2e 100644
--- a/src/fastfield/readers.rs
+++ b/src/fastfield/readers.rs
@@ -31,11 +31,7 @@ impl FastFieldReaders {
     }
 
     fn resolve_field(&self, column_name: &str) -> crate::Result<Option<Field>> {
-        let default_field_opt: Option<Field> = if cfg!(feature = "quickwit") {
-            self.schema.get_field("_dynamic").ok()
-        } else {
-            None
-        };
+        let default_field_opt: Option<Field> = None;
         self.resolve_column_name_given_default_field(column_name, default_field_opt)
     }
 
diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index 4be9d9c51f..f9548672d9 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -3,12 +3,6 @@ use std::io;
 use common::json_path_writer::JSON_END_OF_PATH;
 use common::BinarySerializable;
 use fnv::FnvHashSet;
-#[cfg(feature = "quickwit")]
-use futures_util::{FutureExt, StreamExt, TryStreamExt};
-#[cfg(feature = "quickwit")]
-use itertools::Itertools;
-#[cfg(feature = "quickwit")]
-use tantivy_fst::automaton::{AlwaysMatch, Automaton};
 
 use crate::directory::FileSlice;
 use crate::positions::PositionReader;
@@ -218,215 +212,3 @@ impl InvertedIndexReader {
             .unwrap_or(0u32))
     }
 }
-
-#[cfg(feature = "quickwit")]
-impl InvertedIndexReader {
-    pub(crate) async fn get_term_info_async(&self, term: &Term) -> io::Result<Option<TermInfo>> {
-        self.termdict.get_async(term.serialized_value_bytes()).await
-    }
-
-    async fn get_term_range_async<'a, A: Automaton + 'a>(
-        &'a self,
-        terms: impl std::ops::RangeBounds<Term>,
-        automaton: A,
-        limit: Option<u64>,
-        merge_holes_under_bytes: usize,
-    ) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
-    where
-        A::State: Clone,
-    {
-        use std::ops::Bound;
-        let range_builder = self.termdict.search(automaton);
-        let range_builder = match terms.start_bound() {
-            Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
-            Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
-            Bound::Unbounded => range_builder,
-        };
-        let range_builder = match terms.end_bound() {
-            Bound::Included(bound) => range_builder.le(bound.serialized_value_bytes()),
-            Bound::Excluded(bound) => range_builder.lt(bound.serialized_value_bytes()),
-            Bound::Unbounded => range_builder,
-        };
-        let range_builder = if let Some(limit) = limit {
-            range_builder.limit(limit)
-        } else {
-            range_builder
-        };
-
-        let mut stream = range_builder
-            .into_stream_async_merging_holes(merge_holes_under_bytes)
-            .await?;
-
-        let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
-
-        // limit on stream is only an optimization to load less data, the stream may still return
-        // more than limit elements.
-        let limit = limit.map(|limit| limit as usize).unwrap_or(usize::MAX);
-        let iter = iter.take(limit);
-
-        Ok(iter)
-    }
-
-    /// Warmup a block postings given a `Term`.
-    /// This method is for an advanced usage only.
-    ///
-    /// returns a boolean, whether the term was found in the dictionary
-    pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<bool> {
-        let term_info_opt: Option<TermInfo> = self.get_term_info_async(term).await?;
-        if let Some(term_info) = term_info_opt {
-            let postings = self
-                .postings_file_slice
-                .read_bytes_slice_async(term_info.postings_range.clone());
-            if with_positions {
-                let positions = self
-                    .positions_file_slice
-                    .read_bytes_slice_async(term_info.positions_range.clone());
-                futures_util::future::try_join(postings, positions).await?;
-            } else {
-                postings.await?;
-            }
-            Ok(true)
-        } else {
-            Ok(false)
-        }
-    }
-
-    /// Warmup a block postings given a range of `Term`s.
-    /// This method is for an advanced usage only.
-    ///
-    /// returns a boolean, whether a term matching the range was found in the dictionary
-    pub async fn warm_postings_range(
-        &self,
-        terms: impl std::ops::RangeBounds<Term>,
-        limit: Option<u64>,
-        with_positions: bool,
-    ) -> io::Result<bool> {
-        let mut term_info = self
-            .get_term_range_async(terms, AlwaysMatch, limit, 0)
-            .await?;
-
-        let Some(first_terminfo) = term_info.next() else {
-            // no key matches, nothing more to load
-            return Ok(false);
-        };
-
-        let last_terminfo = term_info.last().unwrap_or_else(|| first_terminfo.clone());
-
-        let postings_range = first_terminfo.postings_range.start..last_terminfo.postings_range.end;
-        let positions_range =
-            first_terminfo.positions_range.start..last_terminfo.positions_range.end;
-
-        let postings = self
-            .postings_file_slice
-            .read_bytes_slice_async(postings_range);
-        if with_positions {
-            let positions = self
-                .positions_file_slice
-                .read_bytes_slice_async(positions_range);
-            futures_util::future::try_join(postings, positions).await?;
-        } else {
-            postings.await?;
-        }
-        Ok(true)
-    }
-
-    /// Warmup a block postings given a range of `Term`s.
-    /// This method is for an advanced usage only.
-    ///
-    /// returns a boolean, whether a term matching the range was found in the dictionary
-    pub async fn warm_postings_automaton<
-        A: Automaton + Clone + Send + 'static,
-        E: FnOnce(Box<dyn FnOnce() -> io::Result<()> + Send>) -> F,
-        F: std::future::Future<Output = io::Result<()>>,
-    >(
-        &self,
-        automaton: A,
-        // with_positions: bool, at the moment we have no use for it, and supporting it would add
-        // complexity to the coalesce
-        executor: E,
-    ) -> io::Result<bool>
-    where
-        A::State: Clone,
-    {
-        // merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
-        // S3 (~80MiB/s, and 50ms latency)
-        const MERGE_HOLES_UNDER_BYTES: usize = (80 * 1024 * 1024 * 50) / 1000;
-        // we build a first iterator to download everything. Simply calling the function already
-        // download everything we need from the sstable, but doesn't start iterating over it.
-        let _term_info_iter = self
-            .get_term_range_async(.., automaton.clone(), None, MERGE_HOLES_UNDER_BYTES)
-            .await?;
-
-        let (sender, posting_ranges_to_load_stream) = futures_channel::mpsc::unbounded();
-        let termdict = self.termdict.clone();
-        let cpu_bound_task = move || {
-            // then we build a 2nd iterator, this one with no holes, so we don't go through blocks
-            // we can't match.
-            // This makes the assumption there is a caching layer below us, which gives sync read
-            // for free after the initial async access. This might not always be true, but is in
-            // Quickwit.
-            // We build things from this closure otherwise we get into lifetime issues that can only
-            // be solved with self referential strucs. Returning an io::Result from here is a bit
-            // more leaky abstraction-wise, but a lot better than the alternative
-            let mut stream = termdict.search(automaton).into_stream()?;
-
-            // we could do without an iterator, but this allows us access to coalesce which simplify
-            // things
-            let posting_ranges_iter =
-                std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()));
-
-            let merged_posting_ranges_iter = posting_ranges_iter.coalesce(|range1, range2| {
-                if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
-                    Ok(range1.start..range2.end)
-                } else {
-                    Err((range1, range2))
-                }
-            });
-
-            for posting_range in merged_posting_ranges_iter {
-                if let Err(_) = sender.unbounded_send(posting_range) {
-                    // this should happen only when search is cancelled
-                    return Err(io::Error::other("failed to send posting range back"));
-                }
-            }
-            Ok(())
-        };
-        let task_handle = executor(Box::new(cpu_bound_task));
-
-        let posting_downloader = posting_ranges_to_load_stream
-            .map(|posting_slice| {
-                self.postings_file_slice
-                    .read_bytes_slice_async(posting_slice)
-                    .map(|result| result.map(|_slice| ()))
-            })
-            .buffer_unordered(5)
-            .try_collect::<Vec<()>>();
-
-        let (_, slices_downloaded) =
-            futures_util::future::try_join(task_handle, posting_downloader).await?;
-
-        Ok(!slices_downloaded.is_empty())
-    }
-
-    /// Warmup the block postings for all terms.
-    /// This method is for an advanced usage only.
-    ///
-    /// If you know which terms to pre-load, prefer using [`Self::warm_postings`] or
-    /// [`Self::warm_postings`] instead.
-    pub async fn warm_postings_full(&self, with_positions: bool) -> io::Result<()> {
-        self.postings_file_slice.read_bytes_async().await?;
-        if with_positions {
-            self.positions_file_slice.read_bytes_async().await?;
-        }
-        Ok(())
-    }
-
-    /// Returns the number of documents containing the term asynchronously.
-    pub async fn doc_freq_async(&self, term: &Term) -> io::Result<u32> {
-        Ok(self
-            .get_term_info_async(term)
-            .await?
-            .map(|term_info| term_info.doc_freq)
-            .unwrap_or(0u32))
-    }
-}
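Most of the deleted warm-up code is async plumbing, but the range coalescing inside `warm_postings_automaton` is a self-contained idea: when two byte ranges are separated by a hole smaller than `MERGE_HOLES_UNDER_BYTES` (~4 MiB, sized so the extra bytes cost less than another S3 round trip), fetch them as one read. A std-only sketch of the same predicate (`merge_close_ranges` is an illustrative name):

```rust
use std::ops::Range;

/// Merge sorted byte ranges whose gap is below `max_hole` — the same
/// predicate the removed code applied via Itertools::coalesce.
fn merge_close_ranges(ranges: &[Range<usize>], max_hole: usize) -> Vec<Range<usize>> {
    let mut merged: Vec<Range<usize>> = Vec::new();
    for range in ranges {
        match merged.last_mut() {
            // The hole between the previous range and this one is small
            // enough that one large read beats two round trips.
            Some(prev) if range.start <= prev.end + max_hole => {
                prev.end = prev.end.max(range.end);
            }
            _ => merged.push(range.clone()),
        }
    }
    merged
}

fn main() {
    let ranges = [0..10, 12..20, 1000..1010];
    // A hole of 2 bytes merges; a hole of 980 bytes does not.
    assert_eq!(merge_close_ranges(&ranges, 16), vec![0..20, 1000..1010]);
}
```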
diff --git a/src/query/phrase_prefix_query/phrase_prefix_weight.rs b/src/query/phrase_prefix_query/phrase_prefix_weight.rs
index 546eb89e8e..21569c41b7 100644
--- a/src/query/phrase_prefix_query/phrase_prefix_weight.rs
+++ b/src/query/phrase_prefix_query/phrase_prefix_weight.rs
@@ -73,16 +73,6 @@ impl PhrasePrefixWeight {
             stream = stream.lt(&end);
         }
 
-        #[cfg(feature = "quickwit")]
-        {
-            // We don't have this on the fst, hence we end up needing a feature flag.
-            //
-            // This is not a problem however as we enforce the limit below too.
-            // The point of `stream.limit` is to limit the number of term dictionary
-            // blocks being downloaded.
-            stream = stream.limit(self.max_expansions as u64);
-        }
-
         let mut stream = stream.into_stream()?;
         let mut suffixes = Vec::with_capacity(self.max_expansions as usize);
 
diff --git a/src/query/range_query/range_query.rs b/src/query/range_query/range_query.rs
index 5035c43f17..f38f4b18b4 100644
--- a/src/query/range_query/range_query.rs
+++ b/src/query/range_query/range_query.rs
@@ -174,7 +174,6 @@ pub struct InvertedIndexRangeWeight {
 impl InvertedIndexRangeWeight {
     /// Creates a new RangeWeight
     ///
-    /// Note: The limit is only enabled with the quickwit feature flag.
     pub fn new(
         field: Field,
         lower_bound: &Bound<Vec<u8>>,
@@ -203,10 +202,6 @@
             Excluded(ref term_val) => term_stream_builder.lt(term_val),
             Unbounded => term_stream_builder,
         };
-        #[cfg(feature = "quickwit")]
-        if let Some(limit) = self.limit {
-            term_stream_builder = term_stream_builder.limit(limit);
-        }
         term_stream_builder.into_stream()
     }
 }
diff --git a/src/store/reader.rs b/src/store/reader.rs
index fb15339885..7aa0c4aa13 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -19,8 +19,6 @@
 use crate::schema::document::{BinaryDocumentDeserializer, DocumentDeserialize};
 use crate::space_usage::StoreSpaceUsage;
 use crate::store::index::Checkpoint;
 use crate::DocId;
-#[cfg(feature = "quickwit")]
-use crate::Executor;
 
 pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100;
@@ -376,68 +374,6 @@ fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result<Range<usize>> {
     Ok(start_offset..end_offset)
 }
-
-#[cfg(feature = "quickwit")]
-impl StoreReader {
-    /// Advanced API.
-    ///
-    /// In most cases use [`get_async`](Self::get_async)
-    ///
-    /// Loads and decompresses a block asynchronously.
-    async fn read_block_async(
-        &self,
-        checkpoint: &Checkpoint,
-        executor: &Executor,
-    ) -> io::Result<Block> {
-        let cache_key = checkpoint.byte_range.start;
-        if let Some(block) = self.cache.get_from_cache(checkpoint.byte_range.start) {
-            return Ok(block);
-        }
-
-        let compressed_block = self
-            .data
-            .slice(checkpoint.byte_range.clone())
-            .read_bytes_async()
-            .await?;
-
-        let decompressor = self.decompressor;
-        let maybe_decompressed_block = executor
-            .spawn_blocking(move || decompressor.decompress(compressed_block.as_ref()))
-            .await
-            .expect("decompression panicked");
-        let decompressed_block = OwnedBytes::new(maybe_decompressed_block?);
-
-        self.cache
-            .put_into_cache(cache_key, decompressed_block.clone());
-
-        Ok(decompressed_block)
-    }
-
-    /// Reads raw bytes of a given document asynchronously.
-    pub async fn get_document_bytes_async(
-        &self,
-        doc_id: DocId,
-        executor: &Executor,
-    ) -> crate::Result<OwnedBytes> {
-        let checkpoint = self.block_checkpoint(doc_id)?;
-        let block = self.read_block_async(&checkpoint, executor).await?;
-        Self::get_document_bytes_from_block(block, doc_id, &checkpoint)
-    }
-
-    /// Fetches a document asynchronously. Async version of [`get`](Self::get).
-    pub async fn get_async<D: DocumentDeserialize>(
-        &self,
-        doc_id: DocId,
-        executor: &Executor,
-    ) -> crate::Result<D> {
-        let mut doc_bytes = self.get_document_bytes_async(doc_id, executor).await?;
-
-        let deserializer =
-            BinaryDocumentDeserializer::from_reader(&mut doc_bytes, self.doc_store_version)
-                .map_err(crate::TantivyError::from)?;
-        D::deserialize(deserializer).map_err(crate::TantivyError::from)
-    }
-}
 
 #[cfg(test)]
 mod tests {
     use std::path::Path;
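The removed `read_block_async` followed the same cache discipline as the surviving synchronous path: key the decompressed block by the start offset of its byte range, return early on a hit, and insert after decompressing. A std-only sketch of that lookup-or-load shape (names are illustrative; the real cache is an LRU bounded at `DOCSTORE_CACHE_CAPACITY` blocks):

```rust
use std::collections::HashMap;
use std::io;

// Illustrative block cache keyed by block start offset, mirroring the
// get_from_cache / put_into_cache sequence in the removed read_block_async.
struct BlockCache {
    blocks: HashMap<usize, Vec<u8>>,
}

impl BlockCache {
    fn get_or_load(
        &mut self,
        start_offset: usize,
        load_and_decompress: impl FnOnce() -> io::Result<Vec<u8>>,
    ) -> io::Result<Vec<u8>> {
        if let Some(block) = self.blocks.get(&start_offset) {
            return Ok(block.clone()); // hit: skip I/O and decompression
        }
        let block = load_and_decompress()?;
        self.blocks.insert(start_offset, block.clone());
        Ok(block)
    }
}

fn main() -> io::Result<()> {
    let mut cache = BlockCache { blocks: HashMap::new() };
    let block = cache.get_or_load(0, || Ok(b"decompressed".to_vec()))?;
    assert_eq!(block, b"decompressed");
    Ok(())
}
```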
diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs
index 17d34bd481..e1f07c53d5 100644
--- a/src/termdict/mod.rs
+++ b/src/termdict/mod.rs
@@ -18,16 +18,9 @@
 //!
 //! A second datastructure makes it possible to access a [`TermInfo`].
 
-#[cfg(not(feature = "quickwit"))]
 mod fst_termdict;
-#[cfg(not(feature = "quickwit"))]
 use fst_termdict as termdict;
 
-#[cfg(feature = "quickwit")]
-mod sstable_termdict;
-#[cfg(feature = "quickwit")]
-use sstable_termdict as termdict;
-
 #[cfg(test)]
 mod tests;
 
@@ -66,12 +59,8 @@ impl TryFrom<u32> for DictionaryType {
     }
 }
 
-#[cfg(not(feature = "quickwit"))]
 const CURRENT_TYPE: DictionaryType = DictionaryType::Fst;
 
-#[cfg(feature = "quickwit")]
-const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;
-
 // TODO in the future this should become an enum of supported dictionaries
 /// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
 #[derive(Clone)]
@@ -157,29 +146,6 @@ impl TermDictionary {
     {
         self.0.search(automaton)
     }
-
-    #[cfg(feature = "quickwit")]
-    /// Lookups the value corresponding to the key.
-    pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
-        self.0.get_async(key).await
-    }
-
-    #[cfg(feature = "quickwit")]
-    #[doc(hidden)]
-    pub async fn warm_up_dictionary(&self) -> io::Result<()> {
-        self.0.warm_up_dictionary().await
-    }
-
-    #[cfg(feature = "quickwit")]
-    /// Returns a file slice covering a set of sstable blocks
-    /// that includes the key range passed in arguments.
-    pub fn file_slice_for_range(
-        &self,
-        key_range: impl std::ops::RangeBounds<[u8]>,
-        limit: Option<u64>,
-    ) -> FileSlice {
-        self.0.file_slice_for_range(key_range, limit)
-    }
 }
 
 /// A TermDictionaryBuilder wrapping either an FST or a SSTable dictionary builder.
diff --git a/src/termdict/sstable_termdict/merger.rs b/src/termdict/sstable_termdict/merger.rs
deleted file mode 100644
index aec10454f0..0000000000
--- a/src/termdict/sstable_termdict/merger.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
-
-use crate::postings::TermInfo;
-use crate::termdict::TermStreamer;
-
-pub struct HeapItem<'a> {
-    pub streamer: TermStreamer<'a>,
-    pub segment_ord: usize,
-}
-
-impl<'a> PartialEq for HeapItem<'a> {
-    fn eq(&self, other: &Self) -> bool {
-        self.segment_ord == other.segment_ord
-    }
-}
-
-impl<'a> Eq for HeapItem<'a> {}
-
-impl<'a> PartialOrd for HeapItem<'a> {
-    fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl<'a> Ord for HeapItem<'a> {
-    fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
-        (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
-    }
-}
-
-/// Given a list of sorted term streams,
-/// returns an iterator over sorted unique terms.
-///
-/// The item yield is actually a pair with
-/// - the term
-/// - a slice with the ordinal of the segments containing the terms.
-pub struct TermMerger<'a> {
-    heap: BinaryHeap<HeapItem<'a>>,
-    current_streamers: Vec<HeapItem<'a>>,
-}
-
-impl<'a> TermMerger<'a> {
-    /// Stream of merged term dictionary
-    pub fn new(streams: Vec<TermStreamer<'a>>) -> TermMerger<'a> {
-        TermMerger {
-            heap: BinaryHeap::new(),
-            current_streamers: streams
-                .into_iter()
-                .enumerate()
-                .map(|(ord, streamer)| HeapItem {
-                    streamer,
-                    segment_ord: ord,
-                })
-                .collect(),
-        }
-    }
-
-    fn advance_segments(&mut self) {
-        let streamers = &mut self.current_streamers;
-        let heap = &mut self.heap;
-        for mut heap_item in streamers.drain(..) {
-            if heap_item.streamer.advance() {
-                heap.push(heap_item);
-            }
-        }
-    }
-
-    /// Advance the term iterator to the next term.
-    /// Returns true if there is indeed another term
-    /// False if there is none.
-    pub fn advance(&mut self) -> bool {
-        self.advance_segments();
-        let Some(head) = self.heap.pop() else {
-            return false;
-        };
-        self.current_streamers.push(head);
-        while let Some(next_streamer) = self.heap.peek() {
-            if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
-                break;
-            }
-            let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
-            self.current_streamers.push(next_heap_it);
-        }
-        true
-    }
-
-    /// Returns the current term.
-    ///
-    /// This method may be called
-    /// if and only if advance() has been called before
-    /// and "true" was returned.
-    pub fn key(&self) -> &[u8] {
-        self.current_streamers[0].streamer.key()
-    }
-
-    /// Returns the sorted list of segment ordinals
-    /// that include the current term.
-    ///
-    /// This method may be called
-    /// if and only if advance() has been called before
-    /// and "true" was returned.
-    pub fn current_segment_ords_and_term_infos<'b: 'a>(
-        &'b self,
-    ) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
-        self.current_streamers
-            .iter()
-            .map(|heap_item| (heap_item.segment_ord, heap_item.streamer.value().clone()))
-    }
-}
diff --git a/src/termdict/sstable_termdict/mod.rs b/src/termdict/sstable_termdict/mod.rs
deleted file mode 100644
index cc82eba3cd..0000000000
--- a/src/termdict/sstable_termdict/mod.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-use std::io;
-
-mod merger;
-
-use std::iter::ExactSizeIterator;
-
-use common::VInt;
-use sstable::value::{ValueReader, ValueWriter};
-use sstable::SSTable;
-use tantivy_fst::automaton::AlwaysMatch;
-
-pub use self::merger::TermMerger;
-use crate::postings::TermInfo;
-
-/// The term dictionary contains all of the terms in
-/// `tantivy index` in a sorted manner.
-///
-/// The `Fst` crate is used to associate terms to their
-/// respective `TermOrdinal`. The `TermInfoStore` then makes it
-/// possible to fetch the associated `TermInfo`.
-pub type TermDictionary = sstable::Dictionary<TermSSTable>;
-
-/// Builder for the new term dictionary.
-pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoValueWriter>;
-
-/// `TermStreamer` acts as a cursor over a range of terms of a segment.
-/// Terms are guaranteed to be sorted.
-pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;
-
-/// SSTable used to store TermInfo objects.
-#[derive(Clone)]
-pub struct TermSSTable;
-
-pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;
-
-impl SSTable for TermSSTable {
-    type Value = TermInfo;
-    type ValueReader = TermInfoValueReader;
-    type ValueWriter = TermInfoValueWriter;
-}
-
-#[derive(Default)]
-pub struct TermInfoValueReader {
-    term_infos: Vec<TermInfo>,
-}
-
-impl ValueReader for TermInfoValueReader {
-    type Value = TermInfo;
-
-    #[inline(always)]
-    fn value(&self, idx: usize) -> &TermInfo {
-        &self.term_infos[idx]
-    }
-
-    fn load(&mut self, mut data: &[u8]) -> io::Result<usize> {
-        let len_before = data.len();
-        self.term_infos.clear();
-        let num_els = VInt::deserialize_u64(&mut data)?;
-        let mut postings_start = VInt::deserialize_u64(&mut data)? as usize;
-        let mut positions_start = VInt::deserialize_u64(&mut data)? as usize;
-        for _ in 0..num_els {
-            let doc_freq = VInt::deserialize_u64(&mut data)? as u32;
-            let postings_num_bytes = VInt::deserialize_u64(&mut data)?;
-            let positions_num_bytes = VInt::deserialize_u64(&mut data)?;
-            let postings_end = postings_start + postings_num_bytes as usize;
-            let positions_end = positions_start + positions_num_bytes as usize;
-            let term_info = TermInfo {
-                doc_freq,
-                postings_range: postings_start..postings_end,
-                positions_range: positions_start..positions_end,
-            };
-            self.term_infos.push(term_info);
-            postings_start = postings_end;
-            positions_start = positions_end;
-        }
-        let consumed_len = len_before - data.len();
-        Ok(consumed_len)
-    }
-}
-
-#[derive(Default)]
-pub struct TermInfoValueWriter {
-    term_infos: Vec<TermInfo>,
-}
-
-impl ValueWriter for TermInfoValueWriter {
-    type Value = TermInfo;
-
-    fn write(&mut self, term_info: &TermInfo) {
-        self.term_infos.push(term_info.clone());
-    }
-
-    fn serialize_block(&self, buffer: &mut Vec<u8>) {
-        VInt(self.term_infos.len() as u64).serialize_into_vec(buffer);
-        if self.term_infos.is_empty() {
-            return;
-        }
-        VInt(self.term_infos[0].postings_range.start as u64).serialize_into_vec(buffer);
-        VInt(self.term_infos[0].positions_range.start as u64).serialize_into_vec(buffer);
-        for term_info in &self.term_infos {
-            VInt(term_info.doc_freq as u64).serialize_into_vec(buffer);
-            VInt(term_info.postings_range.len() as u64).serialize_into_vec(buffer);
-            VInt(term_info.positions_range.len() as u64).serialize_into_vec(buffer);
-        }
-    }
-
-    fn clear(&mut self) {
-        self.term_infos.clear();
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use sstable::value::{ValueReader, ValueWriter};
-
-    use crate::postings::TermInfo;
-    use crate::termdict::sstable_termdict::TermInfoValueReader;
-
-    #[test]
-    fn test_block_terminfos() {
-        let mut term_info_writer = super::TermInfoValueWriter::default();
-        term_info_writer.write(&TermInfo {
-            doc_freq: 120u32,
-            postings_range: 17..45,
-            positions_range: 10..122,
-        });
-        term_info_writer.write(&TermInfo {
-            doc_freq: 10u32,
-            postings_range: 45..450,
-            positions_range: 122..1100,
-        });
-        term_info_writer.write(&TermInfo {
-            doc_freq: 17u32,
-            postings_range: 450..462,
-            positions_range: 1100..1302,
-        });
-        let mut buffer = Vec::new();
-        term_info_writer.serialize_block(&mut buffer);
-        let mut term_info_reader = TermInfoValueReader::default();
-        let num_bytes: usize = term_info_reader.load(&buffer[..]).unwrap();
-        assert_eq!(
-            term_info_reader.value(0),
-            &TermInfo {
-                doc_freq: 120u32,
-                postings_range: 17..45,
-                positions_range: 10..122
-            }
-        );
-        assert_eq!(buffer.len(), num_bytes);
-    }
-}
diff --git a/src/termdict/tests.rs b/src/termdict/tests.rs
index 71b3f1c3ec..778aa7f786 100644
--- a/src/termdict/tests.rs
+++ b/src/termdict/tests.rs
@@ -302,7 +302,6 @@ fn test_stream_range_boundaries_forward() -> crate::Result<()> {
     Ok(())
 }
 
-#[cfg(not(feature = "quickwit"))]
 #[test]
 fn test_stream_range_boundaries_backward() -> crate::Result<()> {
     let term_dictionary = stream_range_test_dict()?;
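The deleted `TermMerger` is a textbook k-way merge: keep one cursor per segment in a heap ordered so the smallest key surfaces first, pop every cursor sitting on that key, then re-advance them. The same heap discipline over plain sorted vectors, as a self-contained sketch (illustrative code, not tantivy API):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge of sorted streams in the style of the deleted TermMerger:
/// yields each key once, with the ordinals of the streams containing it.
fn merge_sorted(streams: Vec<Vec<&str>>) -> Vec<(String, Vec<usize>)> {
    // Reverse turns std's max-heap into a min-heap on (key, ordinal).
    let mut heap = BinaryHeap::new();
    let mut cursors: Vec<_> = streams.into_iter().map(|s| s.into_iter()).collect();
    for (ord, cursor) in cursors.iter_mut().enumerate() {
        if let Some(key) = cursor.next() {
            heap.push(Reverse((key, ord)));
        }
    }
    let mut merged = Vec::new();
    while let Some(Reverse((key, ord))) = heap.pop() {
        // Pop every stream positioned on the same key, as TermMerger's
        // advance() did by draining the heap while the head key matched.
        let mut ords = vec![ord];
        while matches!(heap.peek(), Some(Reverse((k, _))) if *k == key) {
            let Reverse((_, next_ord)) = heap.pop().expect("peeked element exists");
            ords.push(next_ord);
        }
        // Re-advance the participating cursors before the next round.
        for &o in &ords {
            if let Some(next) = cursors[o].next() {
                heap.push(Reverse((next, o)));
            }
        }
        merged.push((key.to_string(), ords));
    }
    merged
}

fn main() {
    let merged = merge_sorted(vec![vec!["apple", "cherry"], vec!["apple", "banana"]]);
    assert_eq!(merged[0], ("apple".to_string(), vec![0, 1]));
    assert_eq!(merged[1], ("banana".to_string(), vec![1]));
    assert_eq!(merged[2], ("cherry".to_string(), vec![0]));
}
```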