Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ pub struct SearcherConfig {
pub split_footer_cache_capacity: ByteSize,
pub partial_request_cache_capacity: ByteSize,
pub max_num_concurrent_split_searches: usize,
pub max_splits_per_search: Option<usize>,
// Deprecated: stream search requests are no longer supported.
#[serde(alias = "max_num_concurrent_split_streams", default, skip_serializing)]
pub _max_num_concurrent_split_streams: Option<serde::de::IgnoredAny>,
Expand Down Expand Up @@ -325,6 +326,7 @@ impl Default for SearcherConfig {
split_footer_cache_capacity: ByteSize::mb(500),
partial_request_cache_capacity: ByteSize::mb(64),
max_num_concurrent_split_searches: 100,
max_splits_per_search: None,
_max_num_concurrent_split_streams: None,
aggregation_memory_limit: ByteSize::mb(500),
aggregation_bucket_limit: 65000,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ mod tests {
split_footer_cache_capacity: ByteSize::gb(1),
partial_request_cache_capacity: ByteSize::mb(64),
max_num_concurrent_split_searches: 150,
max_splits_per_search: None,
_max_num_concurrent_split_streams: Some(serde::de::IgnoredAny),
split_cache: None,
request_timeout_secs: NonZeroU64::new(30).unwrap(),
Expand Down
66 changes: 66 additions & 0 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,21 @@ pub async fn root_search(
"root_search"
);

if let Some(max_total_split_searches) = searcher_context.searcher_config.max_splits_per_search
&& max_total_split_searches < num_splits
{
tracing::error!(
num_splits,
max_total_split_searches,
index=?search_request.index_id_patterns,
query=%search_request.query_ast,
"max total splits exceeded"
);
return Err(SearchError::InvalidArgument(format!(
"Number of targeted splits {num_splits} exceeds the limit {max_total_split_searches}"
)));
}

let mut search_response_result = RootSearchMetricsFuture {
start: start_instant,
tracked: root_search_aux(
Expand Down Expand Up @@ -5228,4 +5243,55 @@ mod tests {
assert_eq!(search_response.failed_splits.len(), 1);
Ok(())
}

#[tokio::test]
async fn test_root_search_too_many_splits() -> anyhow::Result<()> {
let search_request = quickwit_proto::search::SearchRequest {
index_id_patterns: vec!["test-index".to_string()],
query_ast: qast_json_helper("test", &["body"]),
max_hits: 10,
..Default::default()
};
let mut mock_metastore = MockMetastoreService::new();
let index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index");
let index_uid = index_metadata.index_uid.clone();
mock_metastore
.expect_list_indexes_metadata()
.returning(move |_index_ids_query| {
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata.clone(),
]))
});
mock_metastore
.expect_list_splits()
.returning(move |_filter| {
let splits = vec![
MockSplitBuilder::new("split1")
.with_index_uid(&index_uid)
.build(),
MockSplitBuilder::new("split2")
.with_index_uid(&index_uid)
.build(),
];
let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
Ok(ServiceStream::from(vec![Ok(splits_response)]))
});
let mock_search_service = MockSearchService::new();
let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]);
let search_job_placer = SearchJobPlacer::new(searcher_pool);
let cluster_client = ClusterClient::new(search_job_placer.clone());

let mut searcher_context = SearcherContext::for_test();
searcher_context.searcher_config.max_splits_per_search = Some(1);
let search_error = root_search(
&searcher_context,
search_request,
MetastoreServiceClient::from_mock(mock_metastore),
&cluster_client,
)
.await
.unwrap_err();
assert!(matches!(search_error, SearchError::InvalidArgument { .. }));
Ok(())
}
}