Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ have been removed to keep the changelog focused on Yeehaw's history.
- remove unused `std::iter` imports from test modules.
- expand documentation for document deserialization traits.
- reorder inventory tasks to prioritize fixing doctest regressions.
- remove `quickwit` feature and associated asynchronous APIs.
5 changes: 0 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ arc-swap = "1.5.0"
bon = "3.3.1"

columnar = { version = "0.5", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.5", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.5", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.1.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.8", path = "./bitpacker" }
common = { version = "0.9", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.5", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"

[target.'cfg(windows)'.dependencies]
Expand Down Expand Up @@ -125,8 +122,6 @@ columnar-zstd-compression = ["columnar/zstd-compression"]
failpoints = ["fail", "fail/failpoints"]
unstable = [] # useful for benches.

quickwit = ["sstable", "futures-util", "futures-channel"]

# Compares only the hash of a string when indexing data.
# Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
# Uses 64bit ahash.
Expand Down
3 changes: 0 additions & 3 deletions src/compat_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ fn path_for_version(version: &str) -> String {

/// The `quickwit` feature flag uses a different dictionary type.
#[test]
#[cfg(not(feature = "quickwit"))]
fn test_format_6() {
let path = path_for_version("6");

Expand All @@ -46,7 +45,6 @@ fn test_format_6() {

/// The `quickwit` feature flag uses a different dictionary type.
#[test]
#[cfg(not(feature = "quickwit"))]
fn test_format_7() {
let path = path_for_version("7");

Expand All @@ -55,7 +53,6 @@ fn test_format_7() {
assert_date_time_precision(&index, DateTimePrecision::Nanoseconds);
}

#[cfg(not(feature = "quickwit"))]
fn assert_date_time_precision(index: &Index, doc_store_precision: DateTimePrecision) {
use collector::TopDocs;
let reader = index.reader().expect("Failed to create reader");
Expand Down
94 changes: 0 additions & 94 deletions src/core/executor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use std::sync::Arc;

#[cfg(feature = "quickwit")]
use futures_util::{future::Either, FutureExt};

use crate::TantivyError;

/// Executor makes it possible to run tasks in single thread or
Expand All @@ -15,13 +12,6 @@ pub enum Executor {
ThreadPool(Arc<rayon::ThreadPool>),
}

#[cfg(feature = "quickwit")]
impl From<Arc<rayon::ThreadPool>> for Executor {
fn from(thread_pool: Arc<rayon::ThreadPool>) -> Self {
Executor::ThreadPool(thread_pool)
}
}

impl Executor {
/// Creates an Executor that performs all task in the caller thread.
pub fn single_thread() -> Executor {
Expand Down Expand Up @@ -92,32 +82,6 @@ impl Executor {
}
}
}

/// Spawn a task on the pool, returning a future completing on task success.
///
/// If the task panics, returns `Err(())`.
#[cfg(feature = "quickwit")]
pub fn spawn_blocking<T: Send + 'static>(
&self,
cpu_intensive_task: impl FnOnce() -> T + Send + 'static,
) -> impl std::future::Future<Output = Result<T, ()>> {
match self {
Executor::SingleThread => Either::Left(std::future::ready(Ok(cpu_intensive_task()))),
Executor::ThreadPool(pool) => {
let (sender, receiver) = oneshot::channel();
pool.spawn(|| {
if sender.is_closed() {
return;
}
let task_result = cpu_intensive_task();
let _ = sender.send(task_result);
});

let res = receiver.map(|res| res.map_err(|_| ()));
Either::Right(res)
}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -173,62 +137,4 @@ mod tests {
assert_eq!(result[i], i * 2);
}
}

#[cfg(feature = "quickwit")]
#[test]
fn test_cancel_cpu_intensive_tasks() {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

let counter: Arc<AtomicU64> = Default::default();

let other_counter: Arc<AtomicU64> = Default::default();

let mut futures = Vec::new();
let mut other_futures = Vec::new();

let (tx, rx) = crossbeam_channel::bounded::<()>(0);
let rx = Arc::new(rx);
let executor = Executor::multi_thread(3, "search-test").unwrap();
for _ in 0..1000 {
let counter_clone: Arc<AtomicU64> = counter.clone();
let other_counter_clone: Arc<AtomicU64> = other_counter.clone();

let rx_clone = rx.clone();
let rx_clone2 = rx.clone();
let fut = executor.spawn_blocking(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
let _ = rx_clone.recv();
});
futures.push(fut);
let other_fut = executor.spawn_blocking(move || {
other_counter_clone.fetch_add(1, Ordering::SeqCst);
let _ = rx_clone2.recv();
});
other_futures.push(other_fut);
}

// We execute 100 futures.
for _ in 0..100 {
tx.send(()).unwrap();
}

let counter_val = counter.load(Ordering::SeqCst);
let other_counter_val = other_counter.load(Ordering::SeqCst);
assert!(counter_val >= 30);
assert!(other_counter_val >= 30);

drop(other_futures);

// We execute 100 futures.
for _ in 0..100 {
tx.send(()).unwrap();
}

let counter_val2 = counter.load(Ordering::SeqCst);
assert!(counter_val2 >= counter_val + 100 - 6);

let other_counter_val2 = other_counter.load(Ordering::SeqCst);
assert!(other_counter_val2 <= other_counter_val + 6);
}
}
24 changes: 0 additions & 24 deletions src/core/searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,6 @@ impl Searcher {
cache_stats
}

/// Fetches a document in an asynchronous manner.
#[cfg(feature = "quickwit")]
pub async fn doc_async<D: DocumentDeserialize>(
&self,
doc_address: DocAddress,
) -> crate::Result<D> {
let executor = self.inner.index.search_executor();
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
store_reader.get_async(doc_address.doc_id, executor).await
}

/// Access the schema associated with the index of this searcher.
pub fn schema(&self) -> &Schema {
&self.inner.schema
Expand All @@ -140,19 +129,6 @@ impl Searcher {
Ok(total_doc_freq)
}

/// Return the overall number of documents containing
/// the given term in an asynchronous manner.
#[cfg(feature = "quickwit")]
pub async fn doc_freq_async(&self, term: &Term) -> crate::Result<u64> {
let mut total_doc_freq = 0;
for segment_reader in &self.inner.segment_readers {
let inverted_index = segment_reader.inverted_index(term.field())?;
let doc_freq = inverted_index.doc_freq_async(term).await?;
total_doc_freq += u64::from(doc_freq);
}
Ok(total_doc_freq)
}

/// Return the list of segment readers
pub fn segment_readers(&self) -> &[SegmentReader] {
&self.inner.segment_readers
Expand Down
6 changes: 1 addition & 5 deletions src/fastfield/readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ impl FastFieldReaders {
}

fn resolve_field(&self, column_name: &str) -> crate::Result<Option<String>> {
let default_field_opt: Option<Field> = if cfg!(feature = "quickwit") {
self.schema.get_field("_dynamic").ok()
} else {
None
};
let default_field_opt: Option<Field> = None;
self.resolve_column_name_given_default_field(column_name, default_field_opt)
}

Expand Down
Loading
Loading