Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
d8a9424
Add streaming search with configurable scoring modes
atris Aug 25, 2025
3fe4f52
add javadocs
atris Aug 28, 2025
0b4d6a2
Fix changelog
atris Aug 28, 2025
f46f224
Fix forbidden APIs
atris Aug 28, 2025
9251736
Intermediate commit
atris Aug 30, 2025
ca8f639
Working intermediate commit
atris Aug 30, 2025
b8be3f6
Get streaming infra working
atris Aug 30, 2025
9cbdcbc
Phase 2
atris Aug 30, 2025
e967492
Intermediate commit
atris Aug 31, 2025
357848d
working commit
atris Sep 1, 2025
a30df2a
Working commit 2
atris Sep 2, 2025
c0d2a06
Cleanup
atris Sep 6, 2025
0e298e0
More cleanup
atris Sep 7, 2025
5061f09
Add streaming search with scoring using Hoeffding bounds
atris Sep 22, 2025
ac28752
Cleanup
atris Sep 24, 2025
3f78994
Add spotless output
atris Sep 24, 2025
9973de3
more cleanup
atris Sep 24, 2025
5edfe3c
Update per comments
atris Sep 27, 2025
6a4d92e
More cleanup
atris Sep 27, 2025
df7ad7b
Fix forbidden API issue
atris Sep 27, 2025
c084b56
Merge branch 'main' into streaming-scoring-clean
atris Oct 9, 2025
ad9c30d
Fix build issues
atris Oct 9, 2025
b4b16b0
More shenanigans
atris Oct 10, 2025
cbf228d
Remove confidence based streaming
atris Oct 11, 2025
dfcbbed
More cleanup
Oct 17, 2025
3d90216
Make spotless changes
Oct 17, 2025
dc5e1e8
Intermittent commit
atris Nov 4, 2025
938951f
4 to go
atris Nov 5, 2025
9233200
use global ordinals; fix per-leaf reset; enable under concurrent sear…
atris Nov 11, 2025
14f81b1
Fix more tests
atris Nov 11, 2025
a6a21b0
More tests fixes and cleanup
atris Nov 11, 2025
d751f73
Fix reindexing tests
atris Nov 20, 2025
cd0f276
Some tests pass
atris Jan 11, 2026
169bc33
Fix FlushModeResolver tests
atris Jan 11, 2026
1da8e37
Yet more fixes
atris Jan 12, 2026
b202193
Cleanup
atris Jan 12, 2026
4ce9a97
Merge remote-tracking branch 'origin/main' into streaming-scoring-clean
atris Jan 12, 2026
b4ef9e8
Fix compilation errors and merge conflicts after upstream merge
atris Jan 12, 2026
f851389
Spotless clean up
atris Jan 13, 2026
ae1b22f
Remove forbidden APIs
atris Jan 13, 2026
c967cbb
Sigh more test fixes
atris Jan 16, 2026
aceb756
More fixes
atris Jan 16, 2026
774588a
Spotless fixes
atris Jan 16, 2026
ae6c911
Yet more fixes
atris Jan 19, 2026
d0510cd
More cleanup
atris Jan 19, 2026
5352427
Miscellaneous refactoring
atris Jan 20, 2026
201110a
More refactor
atris Jan 21, 2026
3e1079a
Explicitly set partial to true
atris Jan 21, 2026
2fbd384
Revert silent drop of partial packets
atris Jan 21, 2026
edeabe5
Spotless changes
atris Jan 22, 2026
c204273
Merge branch 'main' into streaming-scoring-clean
atris Jan 22, 2026
5c095cf
Cleanup
atris Jan 22, 2026
6af9270
Fix serialization issue
atris Jan 22, 2026
87a3712
Streaming multiple partial results at coordinator and test fixes
Jan 28, 2026
b2f1903
Merge branch 'main' into streaming-scoring-clean
atris Jan 28, 2026
6d822bf
Multi shard failing tests
atris Jan 30, 2026
49cbe0f
More misc fixes
atris Feb 1, 2026
a72741d
Remove StreamingPerformanceBenchmarkTests.java to address reviewer fe…
Feb 26, 2026
737d8a4
Cleanup
Mar 3, 2026
0f5c5c2
Revert SearchProgressListener API break for source compatibility
Mar 6, 2026
6fc677b
Cleanup
Mar 13, 2026
b7f3128
More cleanup
Mar 17, 2026
e3fe3ae
Merge origin/main into streaming-scoring-clean
Mar 17, 2026
2e09c6b
Add missing javadoc
Mar 17, 2026
e9c1472
Remove transport and bound-provider changes from streaming PR
Mar 18, 2026
0092c49
More cleanup
Mar 18, 2026
0998491
Remove Streaming Search metrics
Mar 19, 2026
582a4c3
Trim unrelated plugin and reindex test churn from streaming PR
Mar 19, 2026
208eed3
More cleanup
Mar 19, 2026
560c347
Yet more cleanup
Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.StreamSearchAction;
import org.opensearch.action.search.StreamTransportSearchAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
import org.opensearch.action.search.TransportStreamSearchAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -741,9 +741,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
actions.register(StreamSearchAction.INSTANCE, StreamTransportSearchAction.class);
}
actions.register(StreamSearchAction.INSTANCE, TransportStreamSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
return (hasAggs || hasTopDocs) ? Math.min(requestBatchedReduceSize, minBatchReduceSize) : minBatchReduceSize;
}

/**
 * Exposes the configured progress listener to subclasses.
 *
 * @return the {@link SearchProgressListener} this consumer notifies
 */
protected SearchProgressListener progressListener() {
    return progressListener;
}

@Override
public void close() {
Releasables.close(pendingReduces);
Expand All @@ -160,7 +168,7 @@ public void close() {
/**
 * Consumes one shard-level query result: records it with the parent consumer,
 * notifies the progress listener with the originating shard target, and queues
 * the result for incremental reduction.
 *
 * @param result the shard-level search phase result to consume
 * @param next   callback run once the result has been handed to the reduce queue
 */
public void consumeResult(SearchPhaseResult result, Runnable next) {
    // Parent bookkeeping only; completion is signalled by pendingReduces via `next`.
    super.consumeResult(result, () -> {});
    QuerySearchResult querySearchResult = result.queryResult();
    // Notify exactly once, using the overload that carries the shard target.
    progressListener.notifyQueryResult(querySearchResult.getShardIndex(), querySearchResult.getSearchShardTarget());
    pendingReduces.consume(querySearchResult, next);
}

Expand Down Expand Up @@ -228,7 +236,10 @@ private ReduceResult partialReduce(
Arrays.sort(toConsume, Comparator.comparingInt(QuerySearchResult::getShardIndex));

for (QuerySearchResult result : toConsume) {
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
// Use non-consuming topDocs() for stats aggregation only
if (result.hasTopDocs()) {
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
}
}

final TopDocs newTopDocs;
Expand All @@ -238,7 +249,9 @@ private ReduceResult partialReduce(
topDocsList.add(lastReduceResult.reducedTopDocs);
}
for (QuerySearchResult result : toConsume) {
// Consume TopDocs exactly once for merge/reduce phase
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
// For streaming, avoid reassigning shardIndex if already set
SearchPhaseController.setShardIndex(topDocs.topDocs, result.getShardIndex());
topDocsList.add(topDocs.topDocs);
}
Expand Down Expand Up @@ -462,35 +475,39 @@ private void tryExecuteNext() {
runningTask.compareAndSet(null, task);
}

executor.execute(new AbstractRunnable() {
@Override
protected void doRun() {
final ReduceResult thisReduceResult = reduceResult;
long estimatedTotalSize = (thisReduceResult != null ? thisReduceResult.estimatedSize : 0) + task.aggsBufferSize;
final ReduceResult newReduceResult;
try {
final QuerySearchResult[] toConsume = task.consumeBuffer();
if (toConsume == null) {
onAfterReduce(task, null, 0);
try {
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() {
final ReduceResult thisReduceResult = reduceResult;
long estimatedTotalSize = (thisReduceResult != null ? thisReduceResult.estimatedSize : 0) + task.aggsBufferSize;
final ReduceResult newReduceResult;
try {
final QuerySearchResult[] toConsume = task.consumeBuffer();
if (toConsume == null) {
onAfterReduce(task, null, 0);
return;
}
long estimateRamBytesUsedForReduce = estimateRamBytesUsedForReduce(estimatedTotalSize);
addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce);
estimatedTotalSize += estimateRamBytesUsedForReduce;
++numReducePhases;
newReduceResult = partialReduce(toConsume, task.emptyResults, topDocsStats, thisReduceResult, numReducePhases);
} catch (Exception t) {
PendingReduces.this.onFailure(t);
return;
}
long estimateRamBytesUsedForReduce = estimateRamBytesUsedForReduce(estimatedTotalSize);
addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce);
estimatedTotalSize += estimateRamBytesUsedForReduce;
++numReducePhases;
newReduceResult = partialReduce(toConsume, task.emptyResults, topDocsStats, thisReduceResult, numReducePhases);
} catch (Exception t) {
PendingReduces.this.onFailure(t);
return;
onAfterReduce(task, newReduceResult, estimatedTotalSize);
}
onAfterReduce(task, newReduceResult, estimatedTotalSize);
}

@Override
public void onFailure(Exception exc) {
PendingReduces.this.onFailure(exc);
}
});
@Override
public void onFailure(Exception exc) {
PendingReduces.this.onFailure(exc);
}
});
} catch (Exception e) {
onFailure(e);
}
}

private void onAfterReduce(ReduceTask task, ReduceResult newResult, long estimatedSize) {
Expand All @@ -516,7 +533,11 @@ private void onAfterReduce(ReduceTask task, ReduceResult newResult, long estimat
}
}
task.consumeListener();
executor.execute(this::tryExecuteNext);
try {
executor.execute(this::tryExecuteNext);
} catch (Exception e) {
onFailure(e);
}
}

// Idempotent and thread-safe failure handling
Expand Down Expand Up @@ -549,7 +570,10 @@ private synchronized void clearReduceTaskQueue() {

/**
 * Folds the top-docs statistics of every buffered shard result into the shared
 * accumulator. Uses the non-consuming {@code topDocs()} accessor so each result's
 * TopDocs can still be consumed exactly once later by the merge/reduce phase.
 *
 * @return the accumulated {@code TopDocsStats} for all buffered results
 */
private synchronized SearchPhaseController.TopDocsStats consumeTopDocsStats() {
    for (QuerySearchResult result : buffer) {
        // Guard: a result may carry no TopDocs (presumably partial streaming
        // packets — confirm with producer); add stats only when present, and
        // only once per result.
        if (result.hasTopDocs()) {
            topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
        }
    }
    return topDocsStats;
}
Expand All @@ -563,6 +587,7 @@ private synchronized List<TopDocs> consumeTopDocs() {
topDocsList.add(reduceResult.reducedTopDocs);
}
for (QuerySearchResult result : buffer) {
// Consume TopDocs exactly once for merge/reduce phase
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
SearchPhaseController.setShardIndex(topDocs.topDocs, result.getShardIndex());
topDocsList.add(topDocs.topDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,14 @@ static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
}

static void setShardIndex(TopDocs topDocs, int shardIndex) {
assert topDocs.scoreDocs.length == 0 || topDocs.scoreDocs[0].shardIndex == -1 : "shardIndex is already set";
// Idempotent assignment: in streaming flows partial reductions may touch the same TopDocs more than once.
if (topDocs.scoreDocs.length == 0) {
return;
}
if (topDocs.scoreDocs[0].shardIndex != -1) {
// Already set by a previous pass; avoid reassigning to prevent assertion failures
return;
}
for (ScoreDoc doc : topDocs.scoreDocs) {
doc.shardIndex = shardIndex;
}
Expand Down Expand Up @@ -424,6 +431,7 @@ public ReduceContext forFinalReduction() {
final List<TopDocs> topDocs = new ArrayList<>();
for (SearchPhaseResult sortedResult : queryResults) {
QuerySearchResult queryResult = sortedResult.queryResult();
// Consume TopDocs exactly once for merge/reduce phase
final TopDocsAndMaxScore td = queryResult.consumeTopDocs();
assert td != null;
topDocsStats.add(td, queryResult.searchTimedOut(), queryResult.terminatedEarly());
Expand Down Expand Up @@ -795,40 +803,32 @@ QueryPhaseResultConsumer newSearchPhaseResults(
Consumer<Exception> onPartialMergeFailure,
BooleanSupplier isTaskCancelled
) {
return new QueryPhaseResultConsumer(
request,
executor,
circuitBreaker,
this,
listener,
namedWriteableRegistry,
numShards,
onPartialMergeFailure,
isTaskCancelled
);
}

/**
* Returns a new {@link StreamQueryPhaseResultConsumer} instance that reduces search responses incrementally.
*/
StreamQueryPhaseResultConsumer newStreamSearchPhaseResults(
Executor executor,
CircuitBreaker circuitBreaker,
SearchProgressListener listener,
SearchRequest request,
int numShards,
Consumer<Exception> onPartialMergeFailure
) {
return new StreamQueryPhaseResultConsumer(
request,
executor,
circuitBreaker,
this,
listener,
namedWriteableRegistry,
numShards,
onPartialMergeFailure
);
String streamingMode = request.getStreamingSearchMode();
if (streamingMode != null) {
return new StreamQueryPhaseResultConsumer(
request,
executor,
circuitBreaker,
this,
listener,
namedWriteableRegistry,
numShards,
onPartialMergeFailure
);
} else {
// Regular QueryPhaseResultConsumer
return new QueryPhaseResultConsumer(
request,
executor,
circuitBreaker,
this,
listener,
namedWriteableRegistry,
numShards,
onPartialMergeFailure,
isTaskCancelled
);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ protected void onListShards(List<SearchShard> shards, List<SearchShard> skippedS
*/
protected void onQueryResult(int shardIndex) {}

/**
 * Executed when a shard returns a query result.
 * <p>
 * The default implementation delegates to {@link #onQueryResult(int)}, so existing
 * subclasses that only override the single-argument variant continue to be notified.
 *
 * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards}.
 * @param shardTarget The shard target that returned the result.
 */
protected void onQueryResult(int shardIndex, SearchShardTarget shardTarget) {
    onQueryResult(shardIndex);
}

/**
* Executed when a shard reports a query failure.
*
Expand All @@ -100,6 +110,7 @@ protected void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exc
*/
protected void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}


/**
* Executed once when the final reduce is created.
*
Expand Down Expand Up @@ -146,6 +157,17 @@ final void notifyQueryResult(int shardIndex) {
}
}

/**
 * Dispatches {@link #onQueryResult(int, SearchShardTarget)} to the listener.
 * Listener failures are logged rather than propagated so a misbehaving listener
 * cannot break the search flow.
 */
final void notifyQueryResult(int shardIndex, SearchShardTarget shardTarget) {
    try {
        onQueryResult(shardIndex, shardTarget);
    } catch (Exception exc) {
        // Message is built lazily; shards.get(...) only runs if WARN is enabled.
        logger.warn(
            () -> new ParameterizedMessage("[{}] Failed to execute progress listener on query result", shards.get(shardIndex)),
            exc
        );
    }
}

final void notifyQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
try {
onQueryFailure(shardIndex, shardTarget, exc);
Expand All @@ -165,6 +187,7 @@ final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, In
}
}


protected final void notifyFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onFinalReduce(shards, totalHits, aggs, reducePhase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.lucene.search.TopFieldDocs;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.core.action.ActionListener;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
Expand Down Expand Up @@ -149,17 +150,19 @@ protected void onShardResult(SearchPhaseResult result, SearchShardIterator shard
if (queryResult.isNull() == false
// disable sort optims for scroll requests because they keep track of the last bottom doc locally (per shard)
&& getRequest().scroll() == null
&& queryResult.topDocs() != null
&& queryResult.topDocs().topDocs.getClass() == TopFieldDocs.class) {
TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;
if (bottomSortCollector == null) {
synchronized (this) {
if (bottomSortCollector == null) {
bottomSortCollector = new BottomSortValuesCollector(topDocsSize, topDocs.fields);
&& queryResult.hasTopDocs()) {
TopDocsAndMaxScore topDocsAndMaxScore = queryResult.topDocs();
if (topDocsAndMaxScore != null && topDocsAndMaxScore.topDocs.getClass() == TopFieldDocs.class) {
TopFieldDocs topDocs = (TopFieldDocs) topDocsAndMaxScore.topDocs;
if (bottomSortCollector == null) {
synchronized (this) {
if (bottomSortCollector == null) {
bottomSortCollector = new BottomSortValuesCollector(topDocsSize, topDocs.fields);
}
}
}
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
}
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
}
super.onShardResult(result, shardIt);
}
Expand Down
Loading
Loading