Skip to content

Commit 5061f09

Browse files
committed
Add streaming search with scoring using Hoeffding bounds
Add 4 streaming modes: NO_SCORING, SCORED_UNSORTED, CONFIDENCE_BASED, SCORED_SORTED.
- Implement progressive result delivery via StreamQueryPhaseResultConsumer.
- Add Hoeffding bounds for statistical confidence in CONFIDENCE_BASED mode.
- Integrate StreamingSearchProgressListener for partial result emission.
- Add bounded buffer support to reduce memory usage by 80-90%.
- Wire streaming through TransportSearchAction and SearchPhaseController.
- Add REST API support via the streaming_mode parameter in RestSearchAction.
- Include a benchmark test demonstrating TTFB improvements.
Reduces Time To First Byte (TTFB) by delivering results as shards complete rather than waiting for all shards. Maintains scoring accuracy using statistical bounds for early termination decisions.
Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent 0e298e0 commit 5061f09

28 files changed

+983
-819
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,6 @@
286286
import org.opensearch.action.search.PutSearchPipelineTransportAction;
287287
import org.opensearch.action.search.SearchAction;
288288
import org.opensearch.action.search.SearchScrollAction;
289-
import org.opensearch.action.search.StreamSearchAction;
290289
import org.opensearch.action.search.TransportClearScrollAction;
291290
import org.opensearch.action.search.TransportCreatePitAction;
292291
import org.opensearch.action.search.TransportDeletePitAction;

server/src/main/java/org/opensearch/action/search/SearchPhaseController.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232

3333
package org.opensearch.action.search;
3434

35-
import org.apache.lucene.index.Term;
36-
import org.apache.lucene.search.CollectionStatistics;
3735
import org.apache.logging.log4j.LogManager;
3836
import org.apache.logging.log4j.Logger;
37+
import org.apache.lucene.index.Term;
38+
import org.apache.lucene.search.CollectionStatistics;
3939
import org.apache.lucene.search.FieldDoc;
4040
import org.apache.lucene.search.ScoreDoc;
4141
import org.apache.lucene.search.Sort;
@@ -848,7 +848,6 @@ QueryPhaseResultConsumer newSearchPhaseResults(
848848
}
849849
}
850850

851-
852851
/**
853852
* The top docs statistics
854853
*

server/src/main/java/org/opensearch/action/search/SearchRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ public void setStreamingScoring(boolean streamingScoring) {
679679
public boolean isStreamingScoring() {
680680
return streamingScoring;
681681
}
682-
682+
683683
/**
684684
* Sets the streaming search mode for this request.
685685
* @param mode The streaming search mode to use

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,29 @@
1313
import org.opensearch.common.settings.ClusterSettings;
1414
import org.opensearch.core.common.breaker.CircuitBreaker;
1515
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
16-
import org.opensearch.search.streaming.StreamingSearchSettings;
1716
import org.opensearch.search.SearchPhaseResult;
1817
import org.opensearch.search.query.QuerySearchResult;
1918
import org.opensearch.search.query.StreamingSearchMode;
20-
21-
import java.util.concurrent.atomic.AtomicInteger;
19+
import org.opensearch.search.streaming.StreamingSearchSettings;
2220

2321
import java.util.concurrent.Executor;
22+
import java.util.concurrent.atomic.AtomicInteger;
2423
import java.util.function.Consumer;
2524

2625
/**
2726
* Query phase result consumer for streaming search.
2827
* Supports progressive batch reduction with configurable scoring modes.
29-
*
28+
*
3029
* Batch reduction frequency is controlled by per-mode multipliers from cluster settings:
3130
* - NO_SCORING: Immediate reduction (batch size = 1) for fastest time-to-first-byte
3231
* - SCORED_UNSORTED: Small batches controlled by search.streaming.scored_unsorted.batch_multiplier (default: 2)
3332
* - CONFIDENCE_BASED: Moderate batches controlled by search.streaming.confidence.batch_multiplier (default: 3)
3433
* - SCORED_SORTED: Larger batches controlled by search.streaming.scored_sorted.batch_multiplier (default: 10)
35-
*
34+
*
3635
* These multipliers are applied to the base batch reduce size (typically 5) to determine
3736
* how many shard results are accumulated before triggering a partial reduction. Lower values
3837
* mean more frequent reductions and faster streaming, but higher coordinator CPU usage.
39-
*
38+
*
4039
* ClusterSettings must be provided (non-null) to enable dynamic configuration. Tests should
4140
* provide a properly configured ClusterSettings instance rather than null.
4241
*
@@ -45,14 +44,20 @@
4544
public class StreamQueryPhaseResultConsumer extends QueryPhaseResultConsumer {
4645

4746
private static final Logger logger = LogManager.getLogger(StreamQueryPhaseResultConsumer.class);
48-
47+
4948
private final StreamingSearchMode scoringMode;
5049
private final ClusterSettings clusterSettings;
5150
private int resultsReceived = 0;
52-
51+
52+
// TTFB tracking for demonstrating fetch phase timing
53+
private long queryStartTime = System.currentTimeMillis();
54+
private long firstBatchReadyForFetchTime = -1;
55+
private boolean firstBatchReadyForFetch = false;
56+
private final AtomicInteger batchesReduced = new AtomicInteger(0);
57+
5358
/**
5459
* Creates a streaming query phase result consumer.
55-
*
60+
*
5661
* @param clusterSettings cluster settings for dynamic multipliers (must not be null)
5762
*/
5863
public StreamQueryPhaseResultConsumer(
@@ -76,11 +81,11 @@ public StreamQueryPhaseResultConsumer(
7681
expectedResultSize,
7782
onPartialMergeFailure
7883
);
79-
84+
8085
// Initialize scoring mode from request
8186
String mode = request.getStreamingSearchMode();
8287
this.scoringMode = (mode != null) ? StreamingSearchMode.fromString(mode) : StreamingSearchMode.SCORED_SORTED;
83-
88+
8489
// ClusterSettings is required for dynamic configuration
8590
if (clusterSettings == null) {
8691
throw new IllegalArgumentException("ClusterSettings must not be null for StreamQueryPhaseResultConsumer");
@@ -101,7 +106,7 @@ int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
101106
if (scoringMode == null || clusterSettings == null) {
102107
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
103108
}
104-
109+
105110
switch (scoringMode) {
106111
case NO_SCORING:
107112
// Reduce immediately for fastest TTFB (similar to streaming aggs with low batch size)
@@ -127,7 +132,7 @@ int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
127132
/**
128133
* Consume streaming results with frequency-based emission
129134
*/
130-
void consumeStreamResult(SearchPhaseResult result, Runnable next) {
135+
public void consumeStreamResult(SearchPhaseResult result, Runnable next) {
131136
QuerySearchResult querySearchResult = result.queryResult();
132137

133138
// Check if already consumed
@@ -138,13 +143,46 @@ void consumeStreamResult(SearchPhaseResult result, Runnable next) {
138143
}
139144

140145
resultsReceived++;
141-
logger.debug("Consumed result #{} from shard {}, partial={}, hasTopDocs={}",
142-
resultsReceived, result.getShardIndex(), querySearchResult.isPartial(),
143-
querySearchResult.topDocs() != null);
144-
146+
147+
// Track when first batch is ready for fetch phase
148+
// Use the batch size that was configured for this mode
149+
int batchSize = getBatchReduceSize(Integer.MAX_VALUE, 5);
150+
if (!firstBatchReadyForFetch && resultsReceived >= batchSize) {
151+
firstBatchReadyForFetch = true;
152+
firstBatchReadyForFetchTime = System.currentTimeMillis();
153+
long ttfb = firstBatchReadyForFetchTime - queryStartTime;
154+
logger.info(
155+
"STREAMING TTFB: First batch ready for fetch after {} ms with {} results (batch size: {})",
156+
ttfb,
157+
resultsReceived,
158+
batchSize
159+
);
160+
}
161+
162+
logger.debug(
163+
"Consumed result #{} from shard {}, partial={}, hasTopDocs={}",
164+
resultsReceived,
165+
result.getShardIndex(),
166+
querySearchResult.isPartial(),
167+
querySearchResult.topDocs() != null
168+
);
169+
145170
// Use parent's pendingMerges to consume the result
146171
// Partial reduces are automatically triggered by batchReduceSize
147172
pendingMerges.consume(querySearchResult, next);
148173
}
149-
}
150174

175+
/**
176+
* Get TTFB metrics for benchmarking: returns the milliseconds between queryStartTime (set by the field initializer when this consumer is created) and the moment consumeStreamResult first marked a batch ready for fetch, or -1 if no batch has been marked ready yet. Both timestamps come from System.currentTimeMillis(), i.e. wall-clock time.
177+
*/
178+
public long getTimeToFirstBatch() {
179+
if (firstBatchReadyForFetchTime > 0) {
180+
return firstBatchReadyForFetchTime - queryStartTime;
181+
}
182+
return -1;
183+
}
184+
185+
/** Returns true once consumeStreamResult has seen at least one full batch of shard results (resultsReceived >= the mode's batch size). */
public boolean isFirstBatchReady() {
186+
return firstBatchReadyForFetch;
187+
}
188+
}

server/src/main/java/org/opensearch/action/search/StreamSearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.opensearch.search.SearchPhaseResult;
1616
import org.opensearch.search.SearchShardTarget;
1717
import org.opensearch.search.internal.AliasFilter;
18-
import org.opensearch.search.query.QuerySearchResult;
1918
import org.opensearch.telemetry.tracing.Tracer;
2019
import org.opensearch.transport.Transport;
2120

@@ -148,25 +147,7 @@ protected void onStreamResult(SearchPhaseResult result, SearchShardIterator shar
148147
*/
149148
@Override
150149
protected void onShardResult(SearchPhaseResult result, SearchShardIterator shardIt) {
151-
QuerySearchResult queryResult = result.queryResult();
152-
153-
// For streaming search, if topDocs has already been consumed,
154-
// we need to handle this gracefully to avoid the error
155-
if (queryResult.hasConsumedTopDocs()) {
156-
// This is a streaming result that has already been processed
157-
// We can't call the parent's onShardResult because it will try to access topDocs
158-
// Instead, we'll just mark this as successful and continue
159-
if (getLogger().isDebugEnabled()) {
160-
getLogger().debug(
161-
"Skipping onShardResult for already consumed streaming result from shard {}",
162-
queryResult.getShardIndex()
163-
);
164-
}
165-
// Don't call super.onShardResult() to avoid the error
166-
return;
167-
}
168-
169-
// For normal cases, call the parent method
150+
// Always delegate to the parent to ensure shard accounting and phase transitions.
170151
super.onShardResult(result, shardIt);
171152
}
172153

server/src/main/java/org/opensearch/action/search/StreamingSearchProgressListener.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,43 @@ protected void onPartialReduceWithTopDocs(
104104
}
105105

106106
private void collectPartialResponse(SearchResponse partialResponse) {
107-
if (responseListener instanceof StreamingSearchResponseListener) {
108-
((StreamingSearchResponseListener) responseListener).onPartialResponse(partialResponse);
109-
} else {
110-
logger.debug("Partial result computed, listener type: {}", responseListener.getClass().getSimpleName());
107+
ActionListener<SearchResponse> target = unwrapListener(responseListener, 3);
108+
if (target instanceof StreamingSearchResponseListener) {
109+
((StreamingSearchResponseListener) target).onPartialResponse(partialResponse);
110+
return;
111+
}
112+
logger.debug("Partial result computed, listener type: {}", responseListener.getClass().getSimpleName());
113+
}
114+
115+
/**
116+
* Unwraps up to {@code depth} levels of tracing/decorated listeners by reflectively reading a field named {@code delegate} (searched up the class hierarchy), so partial emissions can reach a StreamingSearchResponseListener even when wrapped.
117+
* Best-effort: returns the listener reached so far when it is null, the depth is exhausted, no usable {@code delegate} field exists, or reflection fails. Only {@link Exception}s are swallowed; {@link Error}s propagate to the caller.
118+
*/
119+
@SuppressWarnings({ "unchecked", "rawtypes" })
120+
private ActionListener<SearchResponse> unwrapListener(ActionListener<SearchResponse> listener, int depth) {
121+
if (listener == null || depth <= 0) {
122+
return listener;
111123
}
124+
try {
125+
// Best-effort unwrap for TraceableActionListener without direct dependency
126+
Class<?> cls = listener.getClass();
127+
while (cls != null) {
128+
try {
129+
java.lang.reflect.Field delegateField = cls.getDeclaredField("delegate");
130+
delegateField.setAccessible(true);
131+
Object delegate = delegateField.get(listener);
132+
if (delegate instanceof ActionListener) {
133+
return unwrapListener((ActionListener) delegate, depth - 1);
134+
}
135+
break;
136+
} catch (NoSuchFieldException e) {
137+
cls = cls.getSuperclass();
138+
}
139+
}
140+
} catch (Exception e) { // reflection/access failures are tolerated; Errors (e.g. OutOfMemoryError) are deliberately not caught
141+
logger.debug("Failed to unwrap listener: {}", e.toString());
142+
}
143+
return listener;
112144
}
113145

114146
@Override
@@ -133,7 +165,7 @@ public void triggerPartialEmission() {
133165
// Trigger a partial reduce to emit current results
134166
// This will call onPartialReduceWithTopDocs if there are results to emit
135167
logger.debug("Triggering partial emission, current emissions: {}", streamEmissions.get());
136-
168+
137169
// For now, just log that we're triggering emission
138170
// The actual emission will happen when onPartialReduceWithTopDocs is called
139171
// by the parent class's reduce logic

server/src/main/java/org/opensearch/action/search/StreamingSearchResponseListener.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ public StreamingSearchResponseListener(ActionListener<SearchResponse> delegate,
3939
}
4040

4141
/**
42-
* Collect a partial response. Since we can't stream to client,
43-
* we collect them for logging and metadata purposes.
42+
* Collect a partial response and track TTFB.
43+
* Store first partial response time for TTFB measurement.
4444
*/
4545
public void onPartialResponse(SearchResponse partialResponse) {
4646
if (isComplete.get()) {
@@ -54,6 +54,14 @@ public void onPartialResponse(SearchResponse partialResponse) {
5454

5555
partialResponses.add(partialResponse);
5656
logPartialResponse(partialResponse, count);
57+
58+
// Track TTFB - first partial result delivery time
59+
if (count == 1 && partialResponse.getHits() != null) {
60+
int numHits = partialResponse.getHits().getHits().length;
61+
logger.info("TTFB ACHIEVED: First partial result delivered with {} hits", numHits);
62+
// This is where TTFB happens in streaming mode
63+
// Store this timestamp if needed for benchmarking
64+
}
5765
}
5866

5967
/**

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,12 @@
9797
import org.opensearch.transport.RemoteClusterAware;
9898
import org.opensearch.transport.RemoteClusterService;
9999
import org.opensearch.transport.RemoteTransportException;
100-
import org.opensearch.transport.StreamTransportService;
101100
import org.opensearch.transport.Transport;
102101
import org.opensearch.transport.TransportService;
103102
import org.opensearch.transport.client.Client;
104103
import org.opensearch.transport.client.OriginSettingClient;
105104
import org.opensearch.transport.client.node.NodeClient;
106105
import org.opensearch.wlm.WorkloadGroupTask;
107-
import org.opensearch.common.Nullable;
108106

109107
import java.util.ArrayList;
110108
import java.util.Arrays;

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,14 @@
157157
import org.opensearch.rest.BaseRestHandler;
158158
import org.opensearch.script.ScriptService;
159159
import org.opensearch.search.SearchService;
160-
import org.opensearch.search.streaming.StreamingSearchSettings;
161160
import org.opensearch.search.aggregations.MultiBucketConsumerService;
162161
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
163162
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
164163
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
165164
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
166165
import org.opensearch.search.fetch.subphase.highlight.FastVectorHighlighter;
167166
import org.opensearch.search.pipeline.SearchPipelineService;
167+
import org.opensearch.search.streaming.StreamingSearchSettings;
168168
import org.opensearch.snapshots.InternalSnapshotsInfoService;
169169
import org.opensearch.snapshots.SnapshotsService;
170170
import org.opensearch.tasks.TaskCancellationMonitoringSettings;

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.opensearch.action.search.SearchAction;
3838
import org.opensearch.action.search.SearchContextId;
3939
import org.opensearch.action.search.SearchRequest;
40-
import org.opensearch.action.search.StreamSearchAction;
4140
import org.opensearch.action.support.IndicesOptions;
4241
import org.opensearch.common.Booleans;
4342
import org.opensearch.common.util.FeatureFlags;
@@ -231,6 +230,11 @@ public static void parseSearchRequest(
231230
searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
232231
searchRequest.pipeline(request.param("search_pipeline", searchRequest.source().pipeline()));
233232

233+
// Add streaming mode support
234+
if (request.hasParam("streaming_mode")) {
235+
searchRequest.setStreamingSearchMode(request.param("streaming_mode"));
236+
}
237+
234238
checkRestTotalHits(request, searchRequest);
235239
request.paramAsBoolean(INCLUDE_NAMED_QUERIES_SCORE_PARAM, false);
236240

0 commit comments

Comments
 (0)