diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index bb8a17daaeb..eb1edfce889 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -268,6 +268,7 @@ pub struct SearcherConfig { pub split_footer_cache_capacity: ByteSize, pub partial_request_cache_capacity: ByteSize, pub max_num_concurrent_split_searches: usize, + pub max_splits_per_search: Option, // Deprecated: stream search requests are no longer supported. #[serde(alias = "max_num_concurrent_split_streams", default, skip_serializing)] pub _max_num_concurrent_split_streams: Option, @@ -325,6 +326,7 @@ impl Default for SearcherConfig { split_footer_cache_capacity: ByteSize::mb(500), partial_request_cache_capacity: ByteSize::mb(64), max_num_concurrent_split_searches: 100, + max_splits_per_search: None, _max_num_concurrent_split_streams: None, aggregation_memory_limit: ByteSize::mb(500), aggregation_bucket_limit: 65000, diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index b5f39ceb0ac..74af6b07201 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -662,6 +662,7 @@ mod tests { split_footer_cache_capacity: ByteSize::gb(1), partial_request_cache_capacity: ByteSize::mb(64), max_num_concurrent_split_searches: 150, + max_splits_per_search: None, _max_num_concurrent_split_streams: Some(serde::de::IgnoredAny), split_cache: None, request_timeout_secs: NonZeroU64::new(30).unwrap(), diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index c2225b7524e..218758cbdc4 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -1224,6 +1224,21 @@ pub async fn root_search( "root_search" ); + if let Some(max_total_split_searches) = searcher_context.searcher_config.max_splits_per_search + && max_total_split_searches < num_splits + { + tracing::error!( + num_splits, + max_total_split_searches, + index=?search_request.index_id_patterns, + query=%search_request.query_ast, + "max total splits exceeded" + ); + return Err(SearchError::InvalidArgument(format!( + "Number of targeted splits {num_splits} exceeds the limit {max_total_split_searches}" + ))); + } + let mut search_response_result = RootSearchMetricsFuture { start: start_instant, tracked: root_search_aux( @@ -5228,4 +5243,55 @@ mod tests { assert_eq!(search_response.failed_splits.len(), 1); Ok(()) } + + #[tokio::test] + async fn test_root_search_too_many_splits() -> anyhow::Result<()> { + let search_request = quickwit_proto::search::SearchRequest { + index_id_patterns: vec!["test-index".to_string()], + query_ast: qast_json_helper("test", &["body"]), + max_hits: 10, + ..Default::default() + }; + let mut mock_metastore = MockMetastoreService::new(); + let index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index"); + let index_uid = index_metadata.index_uid.clone(); + mock_metastore + .expect_list_indexes_metadata() + .returning(move |_index_ids_query| { + Ok(ListIndexesMetadataResponse::for_test(vec![ + index_metadata.clone(), + ])) + }); + mock_metastore + .expect_list_splits() + .returning(move |_filter| { + let splits = vec![ + MockSplitBuilder::new("split1") + .with_index_uid(&index_uid) + .build(), + MockSplitBuilder::new("split2") + .with_index_uid(&index_uid) + .build(), + ]; + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) + }); + let mock_search_service = MockSearchService::new(); + let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]); + let search_job_placer = SearchJobPlacer::new(searcher_pool); + let cluster_client = ClusterClient::new(search_job_placer.clone()); + + let mut searcher_context = SearcherContext::for_test(); + searcher_context.searcher_config.max_splits_per_search = Some(1); + let search_error = root_search( + &searcher_context, + search_request, + MetastoreServiceClient::from_mock(mock_metastore), + &cluster_client, + ) + .await + .unwrap_err(); + assert!(matches!(search_error, SearchError::InvalidArgument { .. })); + Ok(()) + } }