Skip to content

Commit 41fb388

Browse files
authored
ISSUE-1118 Check collector logic at root and optimize (#1121)
1 parent fd0eb6c commit 41fb388

File tree

3 files changed

+13
-3
lines changed

3 files changed

+13
-3
lines changed

quickwit-search/src/collector.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,8 @@ fn merge_leaf_responses(
321321
.sum();
322322
let failed_splits = leaf_responses
323323
.iter()
324-
.flat_map(|leaf_response| leaf_response.failed_splits.iter().cloned())
324+
.flat_map(|leaf_response| leaf_response.failed_splits.iter())
325+
.cloned()
325326
.collect_vec();
326327
let all_partial_hits: Vec<PartialHit> = leaf_responses
327328
.into_iter()
@@ -342,7 +343,7 @@ fn merge_leaf_responses(
342343
///
343344
/// TODO we could possibly optimize the sort away (but I doubt it matters).
344345
fn top_k_partial_hits(mut partial_hits: Vec<PartialHit>, num_hits: usize) -> Vec<PartialHit> {
345-
partial_hits.sort_by(|left, right| {
346+
partial_hits.sort_unstable_by(|left, right| {
346347
let left_key = partial_hit_sorting_key(left);
347348
let right_key = partial_hit_sorting_key(right);
348349
left_key.cmp(&right_key)

quickwit-search/src/leaf.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,11 @@ pub async fn leaf_search(
267267
Err(err) => Either::Right(err),
268268
});
269269

270+
// Creates a collector which merges responses into one
270271
let merge_collector = make_merge_collector(request);
272+
273+
// Merging is a cpu-bound task.
274+
// It should be executed by Tokio's blocking threads.
271275
let mut merged_search_response =
272276
spawn_blocking(move || merge_collector.merge_fruits(split_search_responses))
273277
.instrument(info_span!("merge_search_responses"))

quickwit-search/src/root.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,12 @@ pub async fn root_search(
167167
}),
168168
)
169169
.await?;
170+
171+
// Creates a collector which merges responses into one
170172
let merge_collector = make_merge_collector(search_request);
173+
174+
// Merging is a cpu-bound task.
175+
// It should be executed by Tokio's blocking threads.
171176
let leaf_search_response =
172177
spawn_blocking(move || merge_collector.merge_fruits(leaf_search_responses))
173178
.await?
@@ -222,7 +227,7 @@ pub async fn root_search(
222227
.into_iter()
223228
.flat_map(|response| response.hits)
224229
.collect();
225-
hits.sort_by_key(|hit| {
230+
hits.sort_unstable_by_key(|hit| {
226231
Reverse(
227232
hit.partial_hit
228233
.as_ref()

0 commit comments

Comments
 (0)