Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ pub struct SearcherConfig {
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
pub warmup_memory_budget: ByteSize,
pub warmup_single_split_initial_allocation: ByteSize,

pub secondary_max_num_concurrent_split_searches: usize,
pub secondary_warmup_memory_budget: ByteSize,
pub secondary_targeted_split_count_threshold: Option<usize>,
#[serde(default = "SearcherConfig::default_request_timeout_secs")]
secondary_request_timeout_secs: NonZeroU64,
}

/// Configuration controlling how fast a searcher should timeout a `get_slice`
Expand Down Expand Up @@ -333,6 +339,11 @@ impl Default for SearcherConfig {
storage_timeout_policy: None,
warmup_memory_budget: ByteSize::gb(100),
warmup_single_split_initial_allocation: ByteSize::gb(1),

secondary_max_num_concurrent_split_searches: 50,
secondary_warmup_memory_budget: ByteSize::gb(50),
secondary_targeted_split_count_threshold: None,
secondary_request_timeout_secs: Self::default_request_timeout_secs(),
}
}
}
Expand All @@ -342,6 +353,9 @@ impl SearcherConfig {
pub fn request_timeout(&self) -> Duration {
Duration::from_secs(self.request_timeout_secs.get())
}
pub fn secondary_request_timeout(&self) -> Duration {
Duration::from_secs(self.secondary_request_timeout_secs.get())
}
fn default_request_timeout_secs() -> NonZeroU64 {
NonZeroU64::new(30).unwrap()
}
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,13 @@ mod tests {
}),
warmup_memory_budget: ByteSize::gb(100),
warmup_single_split_initial_allocation: ByteSize::gb(1),

secondary_max_num_concurrent_split_searches: 50,
secondary_warmup_memory_budget: ByteSize::gb(50),
// Splits per leaf search above which the secondary queue is used. If not set, the
// secondary queue is never used.
secondary_targeted_split_count_threshold: None,
secondary_request_timeout_secs: NonZeroU64::new(30).unwrap(),
}
);
assert_eq!(
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ async fn fetch_docs_in_split(
split,
Some(doc_mapper.tokenizer_manager()),
None,
false,
)
.await
.context("open-index-for-split")?;
Expand Down
44 changes: 32 additions & 12 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tokio::task::JoinError;
use tracing::*;

use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector};
use crate::metrics::SplitSearchOutcomeCounters;
use crate::metrics::{SplitSearchOutcomeCounters, queue_label};
use crate::root::is_metadata_count_request_with_ast;
use crate::search_permit_provider::{SearchPermit, compute_initial_memory_allocation};
use crate::service::{SearcherContext, deserialize_doc_mapper};
Expand Down Expand Up @@ -92,6 +92,7 @@ pub(crate) async fn open_split_bundle(
searcher_context: &SearcherContext,
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
split_cache_read_only: bool,
) -> anyhow::Result<(FileSlice, BundleStorage)> {
let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id));
let footer_data = get_split_footer_from_cache_or_fetch(
Expand All @@ -105,7 +106,11 @@ pub(crate) async fn open_split_bundle(
// This is before the bundle storage: at this point, this storage is reading `.split` files.
let index_storage_with_split_cache =
if let Some(split_cache) = searcher_context.split_cache_opt.as_ref() {
SplitCache::wrap_storage(split_cache.clone(), index_storage.clone())
SplitCache::wrap_storage(
split_cache.clone(),
index_storage.clone(),
split_cache_read_only,
)
} else {
index_storage.clone()
};
Expand Down Expand Up @@ -148,6 +153,7 @@ pub(crate) async fn open_index_with_caches(
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: Option<ByteRangeCache>,
split_cache_read_only: bool,
) -> anyhow::Result<(Index, HotDirectory)> {
let index_storage_with_retry_on_timeout =
configure_storage_retries(searcher_context, index_storage);
Expand All @@ -156,6 +162,7 @@ pub(crate) async fn open_index_with_caches(
searcher_context,
index_storage_with_retry_on_timeout,
split_and_footer_offsets,
split_cache_read_only,
)
.await?;

Expand Down Expand Up @@ -438,10 +445,10 @@ async fn leaf_search_single_split(
split: SplitIdAndFooterOffsets,
aggregations_limits: AggregationLimitsGuard,
search_permit: &mut SearchPermit,
is_broad_search: bool,
) -> crate::Result<Option<LeafSearchResponse>> {
let mut leaf_search_state_guard =
SplitSearchStateGuard::new(ctx.split_outcome_counters.clone());

rewrite_request(
&mut search_request,
&split,
Expand Down Expand Up @@ -477,6 +484,8 @@ async fn leaf_search_single_split(
&split,
Some(ctx.doc_mapper.tokenizer_manager()),
Some(byte_range_cache.clone()),
// for broad searches, we want to avoid evicting useful cached splits
is_broad_search,
)
.await?;

Expand Down Expand Up @@ -1182,6 +1191,7 @@ pub async fn multi_index_leaf_search(
searcher_context: Arc<SearcherContext>,
leaf_search_request: LeafSearchRequest,
storage_resolver: &StorageResolver,
is_broad_search: bool,
) -> Result<LeafSearchResponse, SearchError> {
let search_request: Arc<SearchRequest> = leaf_search_request
.search_request
Expand Down Expand Up @@ -1240,6 +1250,7 @@ pub async fn multi_index_leaf_search(
leaf_search_request_ref.split_offsets,
doc_mapper,
aggregation_limits,
is_broad_search,
)
.await
}
Expand All @@ -1248,11 +1259,13 @@ pub async fn multi_index_leaf_search(
leaf_request_tasks.push(leaf_request_future);
}

let leaf_responses: Vec<crate::Result<LeafSearchResponse>> = tokio::time::timeout(
searcher_context.searcher_config.request_timeout(),
try_join_all(leaf_request_tasks),
)
.await??;
let timeout = if is_broad_search {
searcher_context.searcher_config.secondary_request_timeout()
} else {
searcher_context.searcher_config.request_timeout()
};
let leaf_responses: Vec<crate::Result<LeafSearchResponse>> =
tokio::time::timeout(timeout, try_join_all(leaf_request_tasks)).await??;
let merge_collector = make_merge_collector(&search_request, aggregation_limits)?;
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
for result in leaf_responses {
Expand Down Expand Up @@ -1339,6 +1352,7 @@ pub async fn single_doc_mapping_leaf_search(
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<DocMapper>,
aggregations_limits: AggregationLimitsGuard,
is_broad_search: bool,
) -> Result<LeafSearchResponse, SearchError> {
let num_docs: u64 = splits.iter().map(|split| split.num_docs).sum();
let num_splits = splits.len();
Expand Down Expand Up @@ -1366,10 +1380,12 @@ pub async fn single_doc_mapping_leaf_search(
.warmup_single_split_initial_allocation,
)
});
let permit_futures = searcher_context
.search_permit_provider
.get_permits(permit_sizes)
.await;
let permit_provider = if is_broad_search {
&searcher_context.secondary_search_permit_provider
} else {
&searcher_context.search_permit_provider
};
let permit_futures = permit_provider.get_permits(permit_sizes).await;

let leaf_search_context = Arc::new(LeafSearchContext {
searcher_context: searcher_context.clone(),
Expand Down Expand Up @@ -1405,6 +1421,7 @@ pub async fn single_doc_mapping_leaf_search(
split,
leaf_split_search_permit,
aggregations_limits.clone(),
is_broad_search,
)
.in_current_span(),
),
Expand Down Expand Up @@ -1528,9 +1545,11 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
mut search_permit: SearchPermit,
aggregations_limits: AggregationLimitsGuard,
is_broad_search: bool,
) {
let timer = crate::SEARCH_METRICS
.leaf_search_split_duration_secs
.with_label_values([queue_label(is_broad_search)])
.start_timer();
let leaf_search_single_split_opt_res: crate::Result<Option<LeafSearchResponse>> =
leaf_search_single_split(
Expand All @@ -1540,6 +1559,7 @@ async fn leaf_search_single_split_wrapper(
split.clone(),
aggregations_limits,
&mut search_permit,
is_broad_search,
)
.await;

Expand Down
9 changes: 7 additions & 2 deletions quickwit/quickwit-search/src/list_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ async fn get_fields_from_split(
{
return Ok(list_fields.fields);
}
let (_, split_bundle) =
open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?;
let (_, split_bundle) = open_split_bundle(
searcher_context,
index_storage,
split_and_footer_offsets,
false,
)
.await?;

let serialized_split_fields = split_bundle
.get_all(Path::new(SPLIT_FIELDS_FILE_NAME))
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tantivy::{ReloadPolicy, Term};
use tracing::{debug, error, info, instrument};

use crate::leaf::open_index_with_caches;
use crate::metrics::queue_label;
use crate::search_job_placer::group_jobs_by_index_id;
use crate::search_permit_provider::compute_initial_memory_allocation;
use crate::{ClusterClient, SearchError, SearchJob, SearcherContext, resolve_index_patterns};
Expand Down Expand Up @@ -215,7 +216,7 @@ async fn leaf_list_terms_single_split(
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let (index, _) =
open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?;
open_index_with_caches(searcher_context, storage, &split, None, Some(cache), false).await?;
let split_schema = index.schema();
let reader = index
.reader_builder()
Expand Down Expand Up @@ -348,6 +349,7 @@ pub async fn leaf_list_terms(
crate::SEARCH_METRICS.leaf_list_terms_splits_total.inc();
let timer = crate::SEARCH_METRICS
.leaf_search_split_duration_secs
.with_label_values([queue_label(false)])
.start_timer();
let leaf_search_single_split_res = leaf_list_terms_single_split(
&searcher_context_clone,
Expand Down
70 changes: 56 additions & 14 deletions quickwit/quickwit-search/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,24 @@ pub struct SearchMetrics {
pub root_search_requests_total: IntCounterVec<1>,
pub root_search_request_duration_seconds: HistogramVec<1>,
pub root_search_targeted_splits: HistogramVec<1>,
pub leaf_search_requests_total: IntCounterVec<1>,
pub leaf_search_request_duration_seconds: HistogramVec<1>,
pub leaf_search_targeted_splits: HistogramVec<1>,
pub leaf_search_requests_total: IntCounterVec<2>,
pub leaf_search_request_duration_seconds: HistogramVec<2>,
pub leaf_search_targeted_splits: HistogramVec<2>,
pub leaf_list_terms_splits_total: IntCounter,
pub split_search_outcome_total: SplitSearchOutcomeCounters,
pub leaf_search_split_duration_secs: Histogram,
pub leaf_search_split_duration_secs: HistogramVec<1>,
pub job_assigned_total: IntCounterVec<1>,
pub leaf_search_single_split_tasks_pending: IntGauge,
pub leaf_search_single_split_tasks_ongoing: IntGauge,
pub leaf_search_single_split_secondary_tasks_pending: IntGauge,
pub leaf_search_single_split_secondary_tasks_ongoing: IntGauge,
pub leaf_search_single_split_warmup_num_bytes: Histogram,
pub searcher_local_kv_store_size_bytes: IntGauge,
}

/// From 0.008s to 131.072s
fn duration_buckets() -> Vec<f64> {
exponential_buckets(0.008, 2.0, 15).unwrap()
exponential_buckets(0.064, 2.0, 15).unwrap()
}

impl Default for SearchMetrics {
Expand Down Expand Up @@ -151,12 +153,15 @@ impl Default for SearchMetrics {
ByteSize::gb(5).as_u64() as f64,
];

let leaf_search_single_split_tasks = new_gauge_vec::<1>(
let leaf_search_single_split_tasks = new_gauge_vec::<2>(
"leaf_search_single_split_tasks",
"Number of single split search tasks pending or ongoing",
"search",
&[],
["status"], // takes values "ongoing" or "pending"
[
"status", // "ongoing" or "pending"
"queue", // "primary" or "secondary"
],
);

SearchMetrics {
Expand Down Expand Up @@ -188,22 +193,22 @@ impl Default for SearchMetrics {
"Total number of leaf search gRPC requests processed.",
"search",
&[("kind", "server")],
["status"],
["status", "queue"],
),
leaf_search_request_duration_seconds: new_histogram_vec(
"leaf_search_request_duration_seconds",
"Duration of leaf search gRPC requests in seconds.",
"search",
&[("kind", "server")],
["status"],
["status", "queue"],
duration_buckets(),
),
leaf_search_targeted_splits: new_histogram_vec(
"leaf_search_targeted_splits",
"Number of splits targeted per leaf search GRPC request.",
"search",
&[],
["status"],
["status", "queue"],
targeted_splits_buckets,
),

Expand All @@ -214,18 +219,24 @@ impl Default for SearchMetrics {
&[],
),
split_search_outcome_total: SplitSearchOutcomeCounters::new_registered(),

leaf_search_split_duration_secs: new_histogram(
leaf_search_split_duration_secs: new_histogram_vec(
"leaf_search_split_duration_secs",
"Number of seconds required to run a leaf search over a single split. The timer \
starts after the semaphore is obtained.",
"search",
&[],
["queue"],
duration_buckets(),
),
// we need to expose the gauges here to provide a static ref to for the gauge guards
leaf_search_single_split_tasks_ongoing: leaf_search_single_split_tasks
.with_label_values(["ongoing"]),
.with_label_values(["ongoing", "primary"]),
leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks
.with_label_values(["pending"]),
.with_label_values(["pending", "primary"]),
leaf_search_single_split_secondary_tasks_ongoing: leaf_search_single_split_tasks
.with_label_values(["ongoing", "secondary"]),
leaf_search_single_split_secondary_tasks_pending: leaf_search_single_split_tasks
.with_label_values(["pending", "secondary"]),
leaf_search_single_split_warmup_num_bytes: new_histogram(
"leaf_search_single_split_warmup_num_bytes",
"Size of the short lived cache for a single split once the warmup is done.",
Expand All @@ -250,6 +261,37 @@ impl Default for SearchMetrics {
}
}

pub fn queue_label(is_broad_search: bool) -> &'static str {
if is_broad_search {
"secondary"
} else {
"primary"
}
}

/// Metrics for the various instances of permit providers.
#[derive(Clone)]
pub struct SearchTaskMetrics {
pub ongoing_tasks: &'static IntGauge,
pub pending_tasks: &'static IntGauge,
}

impl SearchMetrics {
pub fn search_task_metrics(&'static self) -> SearchTaskMetrics {
SearchTaskMetrics {
ongoing_tasks: &self.leaf_search_single_split_tasks_ongoing,
pending_tasks: &self.leaf_search_single_split_tasks_pending,
}
}

pub fn secondary_search_task_metrics(&'static self) -> SearchTaskMetrics {
SearchTaskMetrics {
ongoing_tasks: &self.leaf_search_single_split_secondary_tasks_ongoing,
pending_tasks: &self.leaf_search_single_split_secondary_tasks_pending,
}
}
}

/// `SEARCH_METRICS` exposes a bunch a set of storage/cache related metrics through a prometheus
/// endpoint.
pub static SEARCH_METRICS: Lazy<SearchMetrics> = Lazy::new(SearchMetrics::default);
Loading