From daf98e9fd313220cb1d840520703b5989e76e2b9 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Fri, 21 Nov 2025 15:30:12 +0100
Subject: [PATCH] Add secondary search queue

Add a secondary search permit provider with its own concurrency limit and
warmup memory budget. Leaf search requests that target at least
`secondary_targeted_split_count_threshold` splits are routed to it, get a
dedicated (configurable) timeout, use the split cache in read-only mode,
and are reported under a `queue="secondary"` label on the leaf search
metrics.

---
 .../quickwit-config/src/node_config/mod.rs    | 14 ++++
 .../src/node_config/serialize.rs              |  7 ++
 quickwit/quickwit-search/src/fetch_docs.rs    |  1 +
 quickwit/quickwit-search/src/leaf.rs          | 44 ++++++++----
 quickwit/quickwit-search/src/list_fields.rs   |  9 ++-
 quickwit/quickwit-search/src/list_terms.rs    |  4 +-
 quickwit/quickwit-search/src/metrics.rs       | 70 ++++++++++++++----
 .../quickwit-search/src/metrics_trackers.rs   |  8 ++-
 .../src/search_permit_provider.rs             | 32 ++++++---
 quickwit/quickwit-search/src/service.rs       | 24 ++++++-
 quickwit/quickwit-search/src/tests.rs         |  1 +
 quickwit/quickwit-serve/src/lib.rs            |  4 +-
 .../quickwit-storage/src/split_cache/mod.rs   | 29 +++++---
 .../src/split_cache/split_table.rs            | 72 +++++++++----------
 14 files changed, 230 insertions(+), 89 deletions(-)

diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs
index bb8a17daaeb..ce2f1a56437 100644
--- a/quickwit/quickwit-config/src/node_config/mod.rs
+++ b/quickwit/quickwit-config/src/node_config/mod.rs
@@ -283,6 +283,12 @@ pub struct SearcherConfig {
     pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
     pub warmup_memory_budget: ByteSize,
     pub warmup_single_split_initial_allocation: ByteSize,
+
+    pub secondary_max_num_concurrent_split_searches: usize,
+    pub secondary_warmup_memory_budget: ByteSize,
+    pub secondary_targeted_split_count_threshold: Option<usize>,
+    #[serde(default = "SearcherConfig::default_request_timeout_secs")]
+    secondary_request_timeout_secs: NonZeroU64,
 }
 
 /// Configuration controlling how fast a searcher should timeout a `get_slice`
@@ -333,6 +339,11 @@ impl Default for SearcherConfig {
             storage_timeout_policy: None,
             warmup_memory_budget: ByteSize::gb(100),
             warmup_single_split_initial_allocation: ByteSize::gb(1),
+
+            secondary_max_num_concurrent_split_searches: 50,
+            secondary_warmup_memory_budget: ByteSize::gb(50),
+            secondary_targeted_split_count_threshold: None,
+            secondary_request_timeout_secs: Self::default_request_timeout_secs(),
         }
     }
 }
@@ -342,6 +353,9 @@ impl SearcherConfig {
     pub fn request_timeout(&self) -> Duration {
         Duration::from_secs(self.request_timeout_secs.get())
    }
+    pub fn secondary_request_timeout(&self) -> Duration {
+        Duration::from_secs(self.secondary_request_timeout_secs.get())
+    }
     fn default_request_timeout_secs() -> NonZeroU64 {
         NonZeroU64::new(30).unwrap()
     }
diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs
index b5f39ceb0ac..2f142430ea1 100644
--- a/quickwit/quickwit-config/src/node_config/serialize.rs
+++ b/quickwit/quickwit-config/src/node_config/serialize.rs
@@ -672,6 +672,13 @@ mod tests {
                 }),
                 warmup_memory_budget: ByteSize::gb(100),
                 warmup_single_split_initial_allocation: ByteSize::gb(1),
+
+                secondary_max_num_concurrent_split_searches: 50,
+                secondary_warmup_memory_budget: ByteSize::gb(50),
+                // Splits per leaf search above which the secondary queue is used. If not set, the
+                // secondary queue is never used.
+                secondary_targeted_split_count_threshold: None,
+                secondary_request_timeout_secs: NonZeroU64::new(30).unwrap(),
             }
         );
         assert_eq!(
diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs
index d3dd0d3082d..25c7bb8552d 100644
--- a/quickwit/quickwit-search/src/fetch_docs.rs
+++ b/quickwit/quickwit-search/src/fetch_docs.rs
@@ -175,6 +175,7 @@ async fn fetch_docs_in_split(
         split,
         Some(doc_mapper.tokenizer_manager()),
         None,
+        false,
     )
     .await
     .context("open-index-for-split")?;
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index f8aa60cf98f..487c0f2d55f 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -45,7 +45,7 @@ use tokio::task::JoinError;
 use tracing::*;
 
 use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector};
-use crate::metrics::SplitSearchOutcomeCounters;
+use crate::metrics::{SplitSearchOutcomeCounters, queue_label};
 use crate::root::is_metadata_count_request_with_ast;
 use crate::search_permit_provider::{SearchPermit, compute_initial_memory_allocation};
 use crate::service::{SearcherContext, deserialize_doc_mapper};
@@ -92,6 +92,7 @@ pub(crate) async fn open_split_bundle(
     searcher_context: &SearcherContext,
     index_storage: Arc<dyn Storage>,
     split_and_footer_offsets: &SplitIdAndFooterOffsets,
+    split_cache_read_only: bool,
 ) -> anyhow::Result<(FileSlice, BundleStorage)> {
     let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id));
     let footer_data = get_split_footer_from_cache_or_fetch(
@@ -105,7 +106,11 @@
     // This is before the bundle storage: at this point, this storage is reading `.split` files.
     let index_storage_with_split_cache =
         if let Some(split_cache) = searcher_context.split_cache_opt.as_ref() {
-            SplitCache::wrap_storage(split_cache.clone(), index_storage.clone())
+            SplitCache::wrap_storage(
+                split_cache.clone(),
+                index_storage.clone(),
+                split_cache_read_only,
+            )
         } else {
             index_storage.clone()
         };
@@ -148,6 +153,7 @@ pub(crate) async fn open_index_with_caches(
     split_and_footer_offsets: &SplitIdAndFooterOffsets,
     tokenizer_manager: Option<&TokenizerManager>,
     ephemeral_unbounded_cache: Option<ByteRangeCache>,
+    split_cache_read_only: bool,
 ) -> anyhow::Result<(Index, HotDirectory)> {
     let index_storage_with_retry_on_timeout =
         configure_storage_retries(searcher_context, index_storage);
@@ -156,6 +162,7 @@
         searcher_context,
         index_storage_with_retry_on_timeout,
         split_and_footer_offsets,
+        split_cache_read_only,
     )
     .await?;
@@ -438,10 +445,10 @@ async fn leaf_search_single_split(
     split: SplitIdAndFooterOffsets,
     aggregations_limits: AggregationLimitsGuard,
     search_permit: &mut SearchPermit,
+    is_broad_search: bool,
 ) -> crate::Result<Option<LeafSearchResponse>> {
     let mut leaf_search_state_guard =
         SplitSearchStateGuard::new(ctx.split_outcome_counters.clone());
-
     rewrite_request(
         &mut search_request,
         &split,
@@ -477,6 +484,8 @@
         &split,
         Some(ctx.doc_mapper.tokenizer_manager()),
         Some(byte_range_cache.clone()),
+        // for broad searches, we want to avoid evicting useful cached splits
+        is_broad_search,
     )
     .await?;
@@ -1182,6 +1191,7 @@ pub async fn multi_index_leaf_search(
     searcher_context: Arc<SearcherContext>,
     leaf_search_request: LeafSearchRequest,
     storage_resolver: &StorageResolver,
+    is_broad_search: bool,
 ) -> Result<LeafSearchResponse, SearchError> {
     let search_request: Arc<SearchRequest> = leaf_search_request
         .search_request
@@ -1240,6 +1250,7 @@
                     leaf_search_request_ref.split_offsets,
                     doc_mapper,
                     aggregation_limits,
+                    is_broad_search,
                 )
                 .await
             }
@@ -1248,11 +1259,13 @@
         leaf_request_tasks.push(leaf_request_future);
     }
 
-    let leaf_responses: Vec<crate::Result<LeafSearchResponse>> = tokio::time::timeout(
-        searcher_context.searcher_config.request_timeout(),
-        try_join_all(leaf_request_tasks),
-    )
-    .await??;
+    let timeout = if is_broad_search {
+        searcher_context.searcher_config.secondary_request_timeout()
+    } else {
+        searcher_context.searcher_config.request_timeout()
+    };
+    let leaf_responses: Vec<crate::Result<LeafSearchResponse>> =
+        tokio::time::timeout(timeout, try_join_all(leaf_request_tasks)).await??;
     let merge_collector = make_merge_collector(&search_request, aggregation_limits)?;
     let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
     for result in leaf_responses {
@@ -1339,6 +1352,7 @@ pub async fn single_doc_mapping_leaf_search(
     splits: Vec<SplitIdAndFooterOffsets>,
     doc_mapper: Arc<DocMapper>,
     aggregations_limits: AggregationLimitsGuard,
+    is_broad_search: bool,
 ) -> Result<LeafSearchResponse, SearchError> {
     let num_docs: u64 = splits.iter().map(|split| split.num_docs).sum();
     let num_splits = splits.len();
@@ -1366,10 +1380,12 @@
                 .warmup_single_split_initial_allocation,
         )
     });
-    let permit_futures = searcher_context
-        .search_permit_provider
-        .get_permits(permit_sizes)
-        .await;
+    let permit_provider = if is_broad_search {
+        &searcher_context.secondary_search_permit_provider
+    } else {
+        &searcher_context.search_permit_provider
+    };
+    let permit_futures = permit_provider.get_permits(permit_sizes).await;
 
     let leaf_search_context = Arc::new(LeafSearchContext {
         searcher_context: searcher_context.clone(),
@@ -1405,6 +1421,7 @@
                     split,
                     leaf_split_search_permit,
                     aggregations_limits.clone(),
+                    is_broad_search,
                 )
                 .in_current_span(),
             ),
@@ -1528,9 +1545,11 @@ async fn leaf_search_single_split_wrapper(
     split: SplitIdAndFooterOffsets,
     mut search_permit: SearchPermit,
     aggregations_limits: AggregationLimitsGuard,
+    is_broad_search: bool,
 ) {
     let timer = crate::SEARCH_METRICS
         .leaf_search_split_duration_secs
+        .with_label_values([queue_label(is_broad_search)])
         .start_timer();
     let leaf_search_single_split_opt_res: crate::Result<Option<LeafSearchResponse>> =
         leaf_search_single_split(
@@ -1540,6 +1559,7 @@
             split.clone(),
             aggregations_limits,
             &mut search_permit,
+            is_broad_search,
         )
         .await;
diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs
index f4cf173fe08..b9eec1696b5 100644
--- a/quickwit/quickwit-search/src/list_fields.rs
+++ b/quickwit/quickwit-search/src/list_fields.rs
@@ -67,8 +67,13 @@ async fn get_fields_from_split(
     {
         return Ok(list_fields.fields);
     }
-    let (_, split_bundle) =
-        open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?;
+    let (_, split_bundle) = open_split_bundle(
+        searcher_context,
+        index_storage,
+        split_and_footer_offsets,
+        false,
+    )
+    .await?;
 
     let serialized_split_fields = split_bundle
         .get_all(Path::new(SPLIT_FIELDS_FILE_NAME))
diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs
index 45dd1256f36..ead949fa623 100644
--- a/quickwit/quickwit-search/src/list_terms.rs
+++ b/quickwit/quickwit-search/src/list_terms.rs
@@ -34,6 +34,7 @@ use tantivy::{ReloadPolicy, Term};
 use tracing::{debug, error, info, instrument};
 
 use crate::leaf::open_index_with_caches;
+use crate::metrics::queue_label;
 use crate::search_job_placer::group_jobs_by_index_id;
 use crate::search_permit_provider::compute_initial_memory_allocation;
 use crate::{ClusterClient, SearchError, SearchJob, SearcherContext, resolve_index_patterns};
@@ -215,7 +216,7 @@ async fn leaf_list_terms_single_split(
     let cache =
         ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
     let (index, _) =
-        open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?;
+        open_index_with_caches(searcher_context, storage, &split, None, Some(cache), false).await?;
     let split_schema = index.schema();
     let reader = index
         .reader_builder()
@@ -348,6 +349,7 @@ pub async fn leaf_list_terms(
             crate::SEARCH_METRICS.leaf_list_terms_splits_total.inc();
             let timer = crate::SEARCH_METRICS
                 .leaf_search_split_duration_secs
+                .with_label_values([queue_label(false)])
                 .start_timer();
             let leaf_search_single_split_res = leaf_list_terms_single_split(
                 &searcher_context_clone,
diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs
index 9ed11d7ecbb..db4083a7eed 100644
--- a/quickwit/quickwit-search/src/metrics.rs
+++ b/quickwit/quickwit-search/src/metrics.rs
@@ -108,22 +108,24 @@ pub struct SearchMetrics {
     pub root_search_requests_total: IntCounterVec<1>,
     pub root_search_request_duration_seconds: HistogramVec<1>,
     pub root_search_targeted_splits: HistogramVec<1>,
-    pub leaf_search_requests_total: IntCounterVec<1>,
-    pub leaf_search_request_duration_seconds: HistogramVec<1>,
-    pub leaf_search_targeted_splits: HistogramVec<1>,
+    pub leaf_search_requests_total: IntCounterVec<2>,
+    pub leaf_search_request_duration_seconds: HistogramVec<2>,
+    pub leaf_search_targeted_splits: HistogramVec<2>,
     pub leaf_list_terms_splits_total: IntCounter,
     pub split_search_outcome_total: SplitSearchOutcomeCounters,
-    pub leaf_search_split_duration_secs: Histogram,
+    pub leaf_search_split_duration_secs: HistogramVec<1>,
     pub job_assigned_total: IntCounterVec<1>,
     pub leaf_search_single_split_tasks_pending: IntGauge,
     pub leaf_search_single_split_tasks_ongoing: IntGauge,
+    pub leaf_search_single_split_secondary_tasks_pending: IntGauge,
+    pub leaf_search_single_split_secondary_tasks_ongoing: IntGauge,
     pub leaf_search_single_split_warmup_num_bytes: Histogram,
     pub searcher_local_kv_store_size_bytes: IntGauge,
 }
 
-/// From 0.008s to 131.072s
+/// From 0.064s to 1048.576s
 fn duration_buckets() -> Vec<f64> {
-    exponential_buckets(0.008, 2.0, 15).unwrap()
+    exponential_buckets(0.064, 2.0, 15).unwrap()
 }
 
 impl Default for SearchMetrics {
@@ -151,12 +153,15 @@ impl Default for SearchMetrics {
             ByteSize::gb(5).as_u64() as f64,
         ];
 
-        let leaf_search_single_split_tasks = new_gauge_vec::<1>(
+        let leaf_search_single_split_tasks = new_gauge_vec::<2>(
             "leaf_search_single_split_tasks",
             "Number of single split search tasks pending or ongoing",
             "search",
             &[],
-            ["status"], // takes values "ongoing" or "pending"
+            [
+                "status", // "ongoing" or "pending"
+                "queue",  // "primary" or "secondary"
+            ],
         );
 
         SearchMetrics {
@@ -188,14 +193,14 @@ impl Default for SearchMetrics {
                 "Total number of leaf search gRPC requests processed.",
                 "search",
                 &[("kind", "server")],
-                ["status"],
+                ["status", "queue"],
             ),
             leaf_search_request_duration_seconds: new_histogram_vec(
                 "leaf_search_request_duration_seconds",
                 "Duration of leaf search gRPC requests in seconds.",
                 "search",
                 &[("kind", "server")],
-                ["status"],
+                ["status", "queue"],
                 duration_buckets(),
             ),
             leaf_search_targeted_splits: new_histogram_vec(
@@ -203,7 +208,7 @@ impl Default for SearchMetrics {
                 "Number of splits targeted per leaf search GRPC request.",
                 "search",
                 &[],
["status"], + ["status", "queue"], targeted_splits_buckets, ), @@ -214,18 +219,24 @@ impl Default for SearchMetrics { &[], ), split_search_outcome_total: SplitSearchOutcomeCounters::new_registered(), - - leaf_search_split_duration_secs: new_histogram( + leaf_search_split_duration_secs: new_histogram_vec( "leaf_search_split_duration_secs", "Number of seconds required to run a leaf search over a single split. The timer \ starts after the semaphore is obtained.", "search", + &[], + ["queue"], duration_buckets(), ), + // we need to expose the gauges here to provide a static ref to for the gauge guards leaf_search_single_split_tasks_ongoing: leaf_search_single_split_tasks - .with_label_values(["ongoing"]), + .with_label_values(["ongoing", "primary"]), leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks - .with_label_values(["pending"]), + .with_label_values(["pending", "primary"]), + leaf_search_single_split_secondary_tasks_ongoing: leaf_search_single_split_tasks + .with_label_values(["ongoing", "secondary"]), + leaf_search_single_split_secondary_tasks_pending: leaf_search_single_split_tasks + .with_label_values(["pending", "secondary"]), leaf_search_single_split_warmup_num_bytes: new_histogram( "leaf_search_single_split_warmup_num_bytes", "Size of the short lived cache for a single split once the warmup is done.", @@ -250,6 +261,37 @@ impl Default for SearchMetrics { } } +pub fn queue_label(is_broad_search: bool) -> &'static str { + if is_broad_search { + "secondary" + } else { + "primary" + } +} + +/// Metrics for the various instances of permit providers. +#[derive(Clone)] +pub struct SearchTaskMetrics { + pub ongoing_tasks: &'static IntGauge, + pub pending_tasks: &'static IntGauge, +} + +impl SearchMetrics { + pub fn search_task_metrics(&'static self) -> SearchTaskMetrics { + SearchTaskMetrics { + ongoing_tasks: &self.leaf_search_single_split_tasks_ongoing, + pending_tasks: &self.leaf_search_single_split_tasks_pending, + } + } + + pub fn secondary_search_task_metrics(&'static self) -> SearchTaskMetrics { + SearchTaskMetrics { + ongoing_tasks: &self.leaf_search_single_split_secondary_tasks_ongoing, + pending_tasks: &self.leaf_search_single_split_secondary_tasks_pending, + } + } +} + /// `SEARCH_METRICS` exposes a bunch a set of storage/cache related metrics through a prometheus /// endpoint. 
 pub static SEARCH_METRICS: Lazy<SearchMetrics> = Lazy::new(SearchMetrics::default);
diff --git a/quickwit/quickwit-search/src/metrics_trackers.rs b/quickwit/quickwit-search/src/metrics_trackers.rs
index 7f2f9fbbfb3..0bc8ccb3650 100644
--- a/quickwit/quickwit-search/src/metrics_trackers.rs
+++ b/quickwit/quickwit-search/src/metrics_trackers.rs
@@ -22,7 +22,7 @@ use pin_project::{pin_project, pinned_drop};
 use quickwit_proto::search::LeafSearchResponse;
 
 use crate::SearchError;
-use crate::metrics::SEARCH_METRICS;
+use crate::metrics::{SEARCH_METRICS, queue_label};
 
 // root
 
@@ -110,6 +110,7 @@ where F: Future<Output = Result<LeafSearchResponse, SearchError>>
     pub start: Instant,
     pub targeted_splits: usize,
     pub status: Option<&'static str>,
+    pub is_broad_search: bool,
 }
 
 #[pinned_drop]
@@ -117,7 +118,10 @@ impl<F> PinnedDrop for LeafSearchMetricsFuture<F>
 where F: Future<Output = Result<LeafSearchResponse, SearchError>>
 {
     fn drop(self: Pin<&mut Self>) {
-        let label_values = [self.status.unwrap_or("cancelled")];
+        let label_values = [
+            self.status.unwrap_or("cancelled"),
+            queue_label(self.is_broad_search),
+        ];
         SEARCH_METRICS
             .leaf_search_requests_total
             .with_label_values(label_values)
diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs
index 315962c6a5c..4bf523b04d5 100644
--- a/quickwit/quickwit-search/src/search_permit_provider.rs
+++ b/quickwit/quickwit-search/src/search_permit_provider.rs
@@ -24,6 +24,8 @@ use quickwit_proto::search::SplitIdAndFooterOffsets;
 use tokio::sync::watch;
 use tokio::sync::{mpsc, oneshot};
 
+use crate::metrics::SearchTaskMetrics;
+
 /// Distributor of permits to perform split search operation.
 ///
 /// Requests are served in order. Each permit initially reserves a slot for the
@@ -78,7 +80,11 @@
 }
 
 impl SearchPermitProvider {
-    pub fn new(num_download_slots: usize, memory_budget: ByteSize) -> Self {
+    pub fn new(
+        num_download_slots: usize,
+        memory_budget: ByteSize,
+        metrics: SearchTaskMetrics,
+    ) -> Self {
         let (message_sender, message_receiver) = mpsc::unbounded_channel();
         #[cfg(test)]
         let (state_sender, state_receiver) = watch::channel(false);
@@ -91,6 +97,7 @@ impl SearchPermitProvider {
             total_memory_allocated: 0u64,
             #[cfg(test)]
             stopped: state_sender,
+            metrics,
         };
         tokio::spawn(actor.run());
         Self {
@@ -126,6 +133,7 @@ impl SearchPermitProvider {
 }
 
 struct SearchPermitActor {
+    metrics: SearchTaskMetrics,
     msg_receiver: mpsc::UnboundedReceiver<SearchPermitMessage>,
     msg_sender: mpsc::WeakUnboundedSender<SearchPermitMessage>,
     num_warmup_slots_available: usize,
@@ -208,12 +216,11 @@ impl SearchPermitActor {
     }
 
     fn assign_available_permits(&mut self) {
+        let ongoing_tasks_metric = self.metrics.ongoing_tasks;
         while let Some((permit_requester_tx, next_permit_size)) =
             self.pop_next_request_if_serviceable()
         {
-            let mut ongoing_gauge_guard = GaugeGuard::from_gauge(
-                &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
-            );
+            let mut ongoing_gauge_guard = GaugeGuard::from_gauge(ongoing_tasks_metric);
             ongoing_gauge_guard.add(1);
             self.total_memory_allocated += next_permit_size;
             self.num_warmup_slots_available -= 1;
@@ -228,8 +235,8 @@ impl SearchPermitActor {
                 // created SearchPermit which releases the resources
                 .ok();
         }
-        crate::SEARCH_METRICS
-            .leaf_search_single_split_tasks_pending
+        self.metrics
+            .pending_tasks
             .set(self.permits_requests.len() as i64);
     }
 }
@@ -311,10 +318,15 @@ mod tests {
     use tokio::task::JoinSet;
 
     use super::*;
+    use crate::metrics::SEARCH_METRICS;
+
+    fn test_metrics() -> SearchTaskMetrics {
+        SEARCH_METRICS.search_task_metrics()
+    }
 
     #[tokio::test]
     async fn test_search_permit_order() {
-        let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100));
+        let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), test_metrics());
         let mut all_futures = Vec::new();
         let first_batch_of_permits = permit_provider
             .get_permits(repeat_n(ByteSize::mb(10), 10))
             .await;
@@ -364,7 +376,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_search_permit_early_drops() {
-        let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100));
+        let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), test_metrics());
         let permit_fut1 = permit_provider
             .get_permits(vec![ByteSize::mb(10)])
             .await
@@ -405,7 +417,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_memory_budget() {
-        let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100));
+        let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100), test_metrics());
         let mut permit_futs = permit_provider
             .get_permits(repeat_n(ByteSize::mb(10), 14))
             .await;
@@ -435,7 +447,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_warmup_slot() {
-        let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100));
+        let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100), test_metrics());
         let mut permit_futs = permit_provider
             .get_permits(repeat_n(ByteSize::mb(1), 16))
             .await;
diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs
index 88dba389c83..7c053cc42a5 100644
--- a/quickwit/quickwit-search/src/service.rs
+++ b/quickwit/quickwit-search/src/service.rs
@@ -38,6 +38,7 @@ use crate::leaf_cache::LeafSearchCache;
 use crate::list_fields::{leaf_list_fields, root_list_fields};
 use crate::list_fields_cache::ListFieldsCache;
 use crate::list_terms::{leaf_list_terms, root_list_terms};
+use crate::metrics::SEARCH_METRICS;
 use crate::metrics_trackers::LeafSearchMetricsFuture;
 use crate::root::fetch_docs_phase;
 use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset};
@@ -186,15 +187,27 @@ impl SearchService for SearchServiceImpl {
             .map(|req| req.split_offsets.len())
             .sum::<usize>();
 
+        let is_broad_search = if let Some(threshold) = self
+            .searcher_context
+            .searcher_config
+            .secondary_targeted_split_count_threshold
+        {
+            num_splits >= threshold
+        } else {
+            false
+        };
+
         LeafSearchMetricsFuture {
             tracked: multi_index_leaf_search(
                 self.searcher_context.clone(),
                 leaf_search_request,
                 &self.storage_resolver,
+                is_broad_search,
             ),
             start: Instant::now(),
             targeted_splits: num_splits,
             status: None,
+            is_broad_search,
         }
         .await
     }
@@ -401,8 +414,10 @@ pub struct SearcherContext {
     pub searcher_config: SearcherConfig,
     /// Fast fields cache.
     pub fast_fields_cache: Arc<dyn StorageCache>,
-    /// Counting semaphore to limit concurrent leaf search split requests.
+    /// Limits the concurrency of small, snappy interactive searches.
     pub search_permit_provider: SearchPermitProvider,
+    /// Limits the concurrency of larger searches that target many splits.
+    pub secondary_search_permit_provider: SearchPermitProvider,
     /// Split footer cache.
     pub split_footer_cache: MemorySizedCache<String>,
     /// Recent sub-query cache.
@@ -441,6 +456,12 @@ impl SearcherContext {
         let leaf_search_split_semaphore = SearchPermitProvider::new(
             searcher_config.max_num_concurrent_split_searches,
             searcher_config.warmup_memory_budget,
+            SEARCH_METRICS.search_task_metrics(),
         );
+        let secondary_leaf_search_split_semaphore = SearchPermitProvider::new(
+            searcher_config.secondary_max_num_concurrent_split_searches,
+            searcher_config.secondary_warmup_memory_budget,
+            SEARCH_METRICS.secondary_search_task_metrics(),
+        );
         let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;
         let storage_long_term_cache = Arc::new(QuickwitCache::new(fast_field_cache_capacity));
@@ -457,6 +478,7 @@
             searcher_config,
             fast_fields_cache: storage_long_term_cache,
             search_permit_provider: leaf_search_split_semaphore,
+            secondary_search_permit_provider: secondary_leaf_search_split_semaphore,
             split_footer_cache: global_split_footer_cache,
             leaf_search_cache,
             list_fields_cache,
diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs
index 83d5abad8a1..e0847304236 100644
--- a/quickwit/quickwit-search/src/tests.rs
+++ b/quickwit/quickwit-search/src/tests.rs
@@ -1058,6 +1058,7 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec<u32> {
         splits_offsets,
         test_sandbox.doc_mapper(),
         agg_limits,
+        false,
     )
     .await
     .unwrap();
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 29ba227c039..5e977a07db4 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -1026,7 +1026,6 @@ async fn setup_searcher(
     .await?;
     let search_service_clone = search_service.clone();
     let max_message_size = node_config.grpc_config.max_message_size;
-    let request_timeout = node_config.searcher_config.request_timeout();
     let searcher_change_stream = cluster_change_stream.filter_map(move |cluster_change| {
         let search_service_clone = search_service_clone.clone();
         Box::pin(async move {
@@ -1046,7 +1045,8 @@ async fn setup_searcher(
                     SearchServiceClient::from_service(search_service_clone, grpc_addr);
                 Some(Change::Insert(grpc_addr, search_client))
             } else {
-                let timeout_channel = Timeout::new(node.channel(), request_timeout);
+                // Set a very high timeout just to be sure that we clean up at some point.
+                let timeout_channel = Timeout::new(node.channel(), Duration::from_secs(30 * 60));
                 let search_client = create_search_client_from_channel(
                     grpc_addr,
                     timeout_channel,
diff --git a/quickwit/quickwit-storage/src/split_cache/mod.rs b/quickwit/quickwit-storage/src/split_cache/mod.rs
index 431cc471bf7..58f8716cda3 100644
--- a/quickwit/quickwit-storage/src/split_cache/mod.rs
+++ b/quickwit/quickwit-storage/src/split_cache/mod.rs
@@ -126,10 +126,15 @@ impl SplitCache {
     }
 
     /// Wraps a storage with our split cache.
-    pub fn wrap_storage(self_arc: Arc<SplitCache>, storage: Arc<dyn Storage>) -> Arc<dyn Storage> {
+    pub fn wrap_storage(
+        self_arc: Arc<SplitCache>,
+        storage: Arc<dyn Storage>,
+        read_only: bool,
+    ) -> Arc<dyn Storage> {
         let cache = Arc::new(SplitCacheBackingStorage {
             split_cache: self_arc,
             storage_root_uri: storage.uri().clone(),
+            read_only,
         });
         wrap_storage_with_cache(cache, storage)
     }
@@ -152,14 +157,19 @@ impl SplitCache {
 
     // Returns a split guard object. As long as it is not dropped, the
     // split won't be evinced from the cache.
-    async fn get_split_file(&self, split_id: Ulid, storage_uri: &Uri) -> Option<SplitFile> {
+    async fn get_split_file(
+        &self,
+        split_id: Ulid,
+        storage_uri: &Uri,
+        read_only: bool,
+    ) -> Option<SplitFile> {
         // We touch before even checking the fd cache in order to update the file's last access time
         // for the file cache.
-        let num_bytes_opt: Option<u64> = self
-            .split_table
-            .lock()
-            .unwrap()
-            .touch(split_id, storage_uri);
+        let num_bytes_opt: Option<u64> =
+            self.split_table
+                .lock()
+                .unwrap()
+                .touch(split_id, storage_uri, read_only);
         let num_bytes = num_bytes_opt?;
         self.fd_cache
@@ -195,6 +205,7 @@ fn split_id_from_path(split_path: &Path) -> Option<Ulid> {
 struct SplitCacheBackingStorage {
     split_cache: Arc<SplitCache>,
     storage_root_uri: Uri,
+    read_only: bool,
 }
 
 impl SplitCacheBackingStorage {
@@ -202,7 +213,7 @@ impl SplitCacheBackingStorage {
         let split_id = split_id_from_path(path)?;
         let split_file: SplitFile = self
             .split_cache
-            .get_split_file(split_id, &self.storage_root_uri)
+            .get_split_file(split_id, &self.storage_root_uri, self.read_only)
             .await?;
         split_file.get_range(byte_range).await.ok()
     }
@@ -211,7 +222,7 @@ impl SplitCacheBackingStorage {
         let split_id = split_id_from_path(path)?;
         let split_file = self
             .split_cache
-            .get_split_file(split_id, &self.storage_root_uri)
+            .get_split_file(split_id, &self.storage_root_uri, self.read_only)
             .await?;
         split_file.get_all().await.ok()
     }
diff --git a/quickwit/quickwit-storage/src/split_cache/split_table.rs b/quickwit/quickwit-storage/src/split_cache/split_table.rs
index 492359990fc..b9e5f2d8d2c 100644
--- a/quickwit/quickwit-storage/src/split_cache/split_table.rs
+++ b/quickwit/quickwit-storage/src/split_cache/split_table.rs
@@ -244,27 +244,28 @@ impl SplitTable {
     /// If the file is already on the disk cache, return `Some(num_bytes)`.
     /// If the file is not in cache, return `None`, and register the file in the candidate for
     /// download list.
-    pub fn touch(&mut self, split_ulid: Ulid, storage_uri: &Uri) -> Option<u64> {
+    pub fn touch(&mut self, split_ulid: Ulid, storage_uri: &Uri, read_only: bool) -> Option<u64> {
         let timestamp = compute_timestamp(self.origin_time);
         let status = self.mutate_split(split_ulid, |old_split_info| {
             if let Some(mut split_info) = old_split_info {
                 split_info.split_key.last_accessed = timestamp;
-                split_info
+                Some(split_info)
+            } else if read_only {
+                None
             } else {
-                SplitInfo {
-                    split_key: SplitKey {
-                        split_ulid,
-                        last_accessed: timestamp,
-                    },
-                    status: Status::Candidate(CandidateSplit {
-                        storage_uri: storage_uri.clone(),
-                        split_ulid,
-                        living_token: Arc::new(()),
-                    }),
-                }
+                let split_key = SplitKey {
+                    split_ulid,
+                    last_accessed: timestamp,
+                };
+                let status = Status::Candidate(CandidateSplit {
+                    storage_uri: storage_uri.clone(),
+                    split_ulid,
+                    living_token: Arc::new(()),
+                });
+                Some(SplitInfo { split_key, status })
             }
         });
-        if let Status::OnDisk { num_bytes } = status {
+        if let Some(Status::OnDisk { num_bytes }) = status {
             Some(num_bytes)
         } else {
             None
@@ -278,13 +279,13 @@ impl SplitTable {
     fn mutate_split(
         &mut self,
         split_ulid: Ulid,
-        mutate_fn: impl FnOnce(Option<SplitInfo>) -> SplitInfo,
-    ) -> Status {
+        mutate_fn: impl FnOnce(Option<SplitInfo>) -> Option<SplitInfo>,
+    ) -> Option<Status> {
         let split_info_opt = self.remove(split_ulid);
-        let new_split: SplitInfo = mutate_fn(split_info_opt);
+        let new_split: SplitInfo = mutate_fn(split_info_opt)?;
         let new_status = new_split.status.clone();
         self.insert(new_split);
-        new_status
+        Some(new_status)
     }
 
     fn change_split_status(&mut self, split_ulid: Ulid, status: Status) {
@@ -292,15 +293,15 @@ impl SplitTable {
         self.mutate_split(split_ulid, move |split_info_opt| {
             if let Some(mut split_info) = split_info_opt {
                 split_info.status = status;
-                split_info
+                Some(split_info)
             } else {
-                SplitInfo {
+                Some(SplitInfo {
                     split_key: SplitKey {
                         last_accessed: compute_timestamp(start_time),
                         split_ulid,
                     },
                     status,
-                }
+                })
             }
         });
     }
@@ -309,20 +310,19 @@ impl SplitTable {
         let origin_time = self.origin_time;
         self.mutate_split(split_ulid, move |split_info_opt| {
             if let Some(split_info) = split_info_opt {
-                return split_info;
-            }
-            SplitInfo {
-                split_key: SplitKey {
-                    last_accessed: compute_timestamp(origin_time)
-                        .saturating_sub(NEWLY_REPORTED_SPLIT_LAST_TIME.as_micros() as u64),
-                    split_ulid,
-                },
-                status: Status::Candidate(CandidateSplit {
-                    storage_uri,
-                    split_ulid,
-                    living_token: Arc::new(()),
-                }),
+                return Some(split_info);
             }
+            let split_key = SplitKey {
+                split_ulid,
+                last_accessed: compute_timestamp(origin_time)
+                    .saturating_sub(NEWLY_REPORTED_SPLIT_LAST_TIME.as_micros() as u64),
+            };
+            let status = Status::Candidate(CandidateSplit {
+                storage_uri,
+                split_ulid,
+                living_token: Arc::new(()),
+            });
+            Some(SplitInfo { split_key, status })
         });
     }
@@ -510,7 +510,7 @@ mod tests {
         let ulid2 = ulids[1];
         split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI));
         split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI));
-        let num_bytes_opt = split_table.touch(ulid1, &Uri::for_test("s3://test1/"));
+        let num_bytes_opt = split_table.touch(ulid1, &Uri::for_test("s3://test1/"), false);
        assert!(num_bytes_opt.is_none());
         let candidate = split_table.best_candidate().unwrap();
         assert_eq!(candidate.split_ulid, ulid1);
@@ -536,7 +536,7 @@ mod tests {
         split_table.register_as_downloaded(ulid1, 10_000_000);
         assert_eq!(split_table.num_bytes(), 10_000_000);
         assert_eq!(
-            split_table.touch(ulid1, &Uri::for_test(TEST_STORAGE_URI)),
+            split_table.touch(ulid1, &Uri::for_test(TEST_STORAGE_URI), false),
             Some(10_000_000)
         );
         let ulid2 = Ulid::new();
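
Note: the secondary queue is disabled by default (`secondary_targeted_split_count_threshold: None`).
A minimal enablement sketch, assuming the `quickwit-config` crate is a dependency; the threshold
of 1,000 splits is purely illustrative, not a recommendation:

    use quickwit_config::SearcherConfig;

    // Start from the defaults added by this patch (secondary queue off,
    // 50 concurrent secondary split searches, 50 GB secondary warmup budget).
    let mut searcher_config = SearcherConfig::default();
    // Any leaf request targeting >= 1_000 splits is now routed to the
    // secondary queue.
    searcher_config.secondary_targeted_split_count_threshold = Some(1_000);
    // Unless configured, the secondary timeout falls back to the same 30s
    // default as the primary request timeout.
    assert_eq!(searcher_config.secondary_request_timeout().as_secs(), 30);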
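
The routing decision in service.rs reduces to a threshold comparison, and the result drives the
`queue` label on the leaf search metrics. A self-contained restatement of that logic (`is_broad`
is a local helper written for this note, not an API introduced by the patch):

    fn queue_label(is_broad_search: bool) -> &'static str {
        if is_broad_search { "secondary" } else { "primary" }
    }

    // Mirrors the is_broad_search computation in SearchServiceImpl::leaf_search.
    fn is_broad(num_splits: usize, threshold: Option<usize>) -> bool {
        threshold.is_some_and(|t| num_splits >= t)
    }

    fn main() {
        assert!(is_broad(1_200, Some(1_000))); // -> queue="secondary"
        assert!(!is_broad(999, Some(1_000))); // -> queue="primary"
        assert!(!is_broad(1_000_000, None)); // feature disabled (default)
        assert_eq!(queue_label(true), "secondary");
    }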
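
On the histogram bucket change: `exponential_buckets(start, factor, count)` yields `start * factor^i`
for `i` in `0..count`, so moving the start from 0.008 to 0.064 shifts the covered range from
[0.008s, 131.072s] to [0.064s, 1048.576s], which is why the doc comment above `duration_buckets`
is updated accordingly. A quick check of that arithmetic:

    fn main() {
        // duration_buckets() uses exponential_buckets(0.064, 2.0, 15):
        // buckets are 0.064 * 2^i for i in 0..15.
        let buckets: Vec<f64> = (0..15).map(|i| 0.064 * f64::powi(2.0, i)).collect();
        assert_eq!(buckets.first().copied(), Some(0.064));
        assert_eq!(buckets.last().copied(), Some(1048.576));
    }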
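
Finally, the `read_only` flag threaded down to `SplitTable::touch` only changes behavior for splits
that are not cached yet: broad searches must not churn the cache by enqueueing new download
candidates. A behavioral sketch, not a standalone test: it assumes a `split_table`, a `storage_uri`,
and a `cached_ulid` set up as in the tests above, with an otherwise empty table:

    // A read-only touch of an unknown split is a no-op: no candidate is
    // registered for download.
    let unknown_ulid = Ulid::new();
    assert_eq!(split_table.touch(unknown_ulid, &storage_uri, true), None);
    assert!(split_table.best_candidate().is_none());

    // A read-only touch of a split already on disk still refreshes its LRU
    // timestamp and reports its size.
    split_table.register_as_downloaded(cached_ulid, 10_000_000);
    assert_eq!(
        split_table.touch(cached_ulid, &storage_uri, true),
        Some(10_000_000)
    );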