Skip to content

Commit 97823ad

Browse files
committed
Use JoinSet instead of its ordered variation
1 parent 7b39b3f commit 97823ad

File tree

3 files changed

+16
-306
lines changed

3 files changed

+16
-306
lines changed

quickwit/quickwit-search/src/join_vec.rs

Lines changed: 0 additions & 292 deletions
This file was deleted.

quickwit/quickwit-search/src/leaf.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,10 @@ use tantivy::directory::FileSlice;
4141
use tantivy::fastfield::FastFieldReaders;
4242
use tantivy::schema::Field;
4343
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
44-
use tokio::task::JoinError;
44+
use tokio::task::{JoinError, JoinSet};
4545
use tracing::*;
4646

4747
use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector};
48-
use crate::join_vec::{JoinVec, TryJoinVec};
4948
use crate::root::is_metadata_count_request_with_ast;
5049
use crate::search_permit_provider::{SearchPermit, compute_initial_memory_allocation};
5150
use crate::service::{SearcherContext, deserialize_doc_mapper};
@@ -1180,7 +1179,7 @@ pub async fn multi_index_leaf_search(
11801179
//
11811180
// It is a little bit tricky how to handle which is now the incremental_merge_collector, one
11821181
// per index, e.g. when to merge results and how to avoid lock contention.
1183-
let mut try_join_vec = TryJoinVec::new();
1182+
let mut join_set = JoinSet::new();
11841183
for leaf_search_request_ref in leaf_search_request.leaf_requests.into_iter() {
11851184
let index_uri = quickwit_common::uri::Uri::from_str(
11861185
leaf_search_request
@@ -1203,7 +1202,7 @@ pub async fn multi_index_leaf_search(
12031202
})?
12041203
.clone();
12051204

1206-
try_join_vec.spawn({
1205+
join_set.spawn({
12071206
let storage_resolver = storage_resolver.clone();
12081207
let searcher_context = searcher_context.clone();
12091208
let search_request = search_request.clone();
@@ -1224,11 +1223,10 @@ pub async fn multi_index_leaf_search(
12241223
});
12251224
}
12261225

1227-
let leaf_responses: Vec<LeafSearchResponse> = try_join_vec.try_join_all().await?;
12281226
let merge_collector = make_merge_collector(&search_request, aggregation_limits)?;
12291227
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
1230-
for result in leaf_responses {
1231-
incremental_merge_collector.add_result(result)?;
1228+
while let Some(result) = join_set.join_next().await {
1229+
incremental_merge_collector.add_result(result??)?;
12321230
}
12331231

12341232
crate::search_thread_pool()
@@ -1313,8 +1311,8 @@ pub async fn single_doc_mapping_leaf_search(
13131311
.search_permit_provider
13141312
.get_permits(permit_sizes)
13151313
.await;
1316-
1317-
let mut join_vec = JoinVec::new_with_capacity(split_with_req.len());
1314+
let mut join_set = JoinSet::new();
1315+
let mut split_with_task_id = Vec::with_capacity(split_with_req.len());
13181316
for ((split, mut request), permit_fut) in
13191317
split_with_req.into_iter().zip(permit_futures.into_iter())
13201318
{
@@ -1326,8 +1324,8 @@ pub async fn single_doc_mapping_leaf_search(
13261324
if !can_be_better && !run_all_splits {
13271325
continue;
13281326
}
1329-
join_vec.spawn(
1330-
split.split_id.clone(),
1327+
let split_id = split.split_id.clone();
1328+
let handle = join_set.spawn(
13311329
leaf_search_single_split_wrapper(
13321330
request,
13331331
searcher_context.clone(),
@@ -1341,20 +1339,25 @@ pub async fn single_doc_mapping_leaf_search(
13411339
)
13421340
.in_current_span(),
13431341
);
1342+
split_with_task_id.push((split_id, handle.id()));
13441343
}
13451344

13461345
// TODO we could cancel running splits when !run_all_splits and the running split can no
13471346
// longer give better results after some other split answered.
13481347
let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new();
13491348

1350-
let leaf_search_single_split_join_handles = join_vec.join_all().await;
1351-
for (split, leaf_search_join_result) in leaf_search_single_split_join_handles {
1349+
while let Some(leaf_search_join_result) = join_set.join_next().await {
13521350
// splits that did not panic were already added to the collector
13531351
if let Err(join_error) = leaf_search_join_result {
13541352
if join_error.is_cancelled() {
13551353
// An explicit task cancellation is not an error.
13561354
continue;
13571355
}
1356+
let position = split_with_task_id
1357+
.iter()
1358+
.position(|(_, task_id)| *task_id == join_error.id())
1359+
.unwrap();
1360+
let (split, _) = split_with_task_id.remove(position);
13581361
if join_error.is_panic() {
13591362
error!(split=%split, "leaf search task panicked");
13601363
} else {

quickwit/quickwit-search/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ mod collector;
2323
mod error;
2424
mod fetch_docs;
2525
mod find_trace_ids_collector;
26-
mod join_vec;
2726
mod leaf;
2827
mod leaf_cache;
2928
mod list_fields;

0 commit comments

Comments
 (0)