Skip to content

Commit 60ae3eb

Browse files
committed
More alignment and cleanup
Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent df7ad7b commit 60ae3eb

35 files changed

+2830
-1871
lines changed

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
* Batch reduction frequency is controlled by per-mode multipliers:
2828
* - NO_SCORING: Immediate reduction (batch size = 1) for fastest time-to-first-byte
2929
* - SCORED_UNSORTED: Small batches (minBatchReduceSize * 2)
30-
* - CONFIDENCE_BASED: Moderate batches (minBatchReduceSize * 3)
3130
* - SCORED_SORTED: Larger batches (minBatchReduceSize * 10)
3231
*
3332
* These multipliers are applied to the base batch reduce size (typically 5) to determine
@@ -87,20 +86,19 @@ public StreamQueryPhaseResultConsumer(
8786
@Override
8887
int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
8988
// Handle null during construction (parent constructor calls this before our constructor body runs)
90-
if (scoringMode == null) {
91-
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
89+
// Use SCORED_UNSORTED as default to match other defaults
90+
StreamingSearchMode mode = scoringMode;
91+
if (mode == null) {
92+
mode = StreamingSearchMode.SCORED_UNSORTED;
9293
}
9394

94-
switch (scoringMode) {
95+
switch (mode) {
9596
case NO_SCORING:
9697
// Reduce immediately for fastest TTFB
9798
return Math.min(requestBatchedReduceSize, 1);
9899
case SCORED_UNSORTED:
99100
// Small batches for quick emission without sorting overhead
100101
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 2);
101-
case CONFIDENCE_BASED:
102-
// Moderate batching for progressive emission with confidence
103-
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 3);
104102
case SCORED_SORTED:
105103
// Higher batch size to collect more results before reducing (sorting is expensive)
106104
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);

server/src/main/java/org/opensearch/action/search/StreamingSearchProgressListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
/**
2424
* SearchProgressListener implementation for streaming search with scoring.
25-
* Computes partial search results when confidence thresholds are met.
25+
* Computes partial search results at configured intervals and milestones.
2626
*
2727
* @opensearch.internal
2828
*/
@@ -96,7 +96,7 @@ protected void onPartialReduceWithTopDocs(
9696
collectPartialResponse(partialResponse);
9797

9898
int count = streamEmissions.incrementAndGet();
99-
logger.info("Computed streaming partial #{} with {} docs from {} shards", count, topDocs.scoreDocs.length, shards.size());
99+
logger.debug("Computed streaming partial #{} with {} docs from {} shards", count, topDocs.scoreDocs.length, shards.size());
100100

101101
} catch (Exception e) {
102102
logger.error("Failed to send partial TopDocs", e);
@@ -113,7 +113,7 @@ private void collectPartialResponse(SearchResponse partialResponse) {
113113

114114
@Override
115115
protected void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
116-
logger.info(
116+
logger.debug(
117117
"Final reduce: {} total hits from {} shards, {} partial computations",
118118
totalHits.value(),
119119
shards.size(),

server/src/main/java/org/opensearch/action/search/StreamingSearchProgressMessage.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ public static Milestone fromProgress(float progress) {
5959
private final long requestId;
6060
private final Milestone milestone;
6161
private final TopDocs topDocs;
62-
private final float confidence;
6362
private final ProgressStatistics statistics;
6463
private final boolean isFinal;
6564

@@ -70,7 +69,6 @@ public StreamingSearchProgressMessage() {
7069
this.requestId = -1;
7170
this.milestone = Milestone.INITIAL;
7271
this.topDocs = null;
73-
this.confidence = 0.0f;
7472
this.statistics = null;
7573
this.isFinal = false;
7674
}
@@ -81,7 +79,6 @@ public StreamingSearchProgressMessage(
8179
long requestId,
8280
Milestone milestone,
8381
TopDocs topDocs,
84-
float confidence,
8582
ProgressStatistics statistics,
8683
boolean isFinal
8784
) {
@@ -90,7 +87,6 @@ public StreamingSearchProgressMessage(
9087
this.requestId = requestId;
9188
this.milestone = milestone;
9289
this.topDocs = topDocs;
93-
this.confidence = confidence;
9490
this.statistics = statistics;
9591
this.isFinal = isFinal;
9692
}
@@ -113,7 +109,6 @@ public StreamingSearchProgressMessage(StreamInput in) throws IOException {
113109
}
114110
this.topDocs = new TopDocs(new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), scoreDocs);
115111

116-
this.confidence = in.readFloat();
117112
this.statistics = new ProgressStatistics(in);
118113
this.isFinal = in.readBoolean();
119114
}
@@ -133,7 +128,6 @@ public void writeTo(StreamOutput out) throws IOException {
133128
out.writeFloat(doc.score);
134129
}
135130

136-
out.writeFloat(confidence);
137131
statistics.writeTo(out);
138132
out.writeBoolean(isFinal);
139133
}
@@ -158,10 +152,6 @@ public TopDocs getTopDocs() {
158152
return topDocs;
159153
}
160154

161-
public float getConfidence() {
162-
return confidence;
163-
}
164-
165155
public ProgressStatistics getStatistics() {
166156
return statistics;
167157
}
@@ -270,7 +260,6 @@ public static class Builder {
270260
private long requestId;
271261
private Milestone milestone;
272262
private TopDocs topDocs;
273-
private float confidence = 0.0f;
274263
private ProgressStatistics statistics;
275264
private boolean isFinal = false;
276265

@@ -299,11 +288,6 @@ public Builder topDocs(TopDocs topDocs) {
299288
return this;
300289
}
301290

302-
public Builder confidence(float confidence) {
303-
this.confidence = confidence;
304-
return this;
305-
}
306-
307291
public Builder statistics(ProgressStatistics statistics) {
308292
this.statistics = statistics;
309293
return this;
@@ -315,16 +299,7 @@ public Builder isFinal(boolean isFinal) {
315299
}
316300

317301
public StreamingSearchProgressMessage build() {
318-
return new StreamingSearchProgressMessage(
319-
shardTarget,
320-
shardIndex,
321-
requestId,
322-
milestone,
323-
topDocs,
324-
confidence,
325-
statistics,
326-
isFinal
327-
);
302+
return new StreamingSearchProgressMessage(shardTarget, shardIndex, requestId, milestone, topDocs, statistics, isFinal);
328303
}
329304
}
330305
}

server/src/main/java/org/opensearch/action/search/StreamingSearchResponseListener.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void onPartialResponse(SearchResponse partialResponse) {
5858
// Track TTFB - first partial result delivery time
5959
if (count == 1 && partialResponse.getHits() != null) {
6060
int numHits = partialResponse.getHits().getHits().length;
61-
logger.info("First partial result delivered with {} hits", numHits);
61+
logger.debug("First partial result delivered with {} hits", numHits);
6262
}
6363
}
6464

@@ -90,7 +90,7 @@ private void logPartialResponse(SearchResponse partialResponse, int count) {
9090
int numHits = partialResponse.getHits().getHits().length;
9191
long totalHits = partialResponse.getHits().getTotalHits().value();
9292

93-
logger.info("Streaming partial result #{}: {} hits, total: {}", count, numHits, totalHits);
93+
logger.debug("Streaming partial result #{}: {} hits, total: {}", count, numHits, totalHits);
9494

9595
if (logger.isDebugEnabled() && numHits > 0) {
9696
float topScore = partialResponse.getHits().getHits()[0].getScore();
@@ -102,13 +102,13 @@ private void logPartialResponse(SearchResponse partialResponse, int count) {
102102
private void logStreamingSummary(SearchResponse finalResponse) {
103103
int totalPartials = partialCount.get();
104104
if (totalPartials > 0) {
105-
logger.info("Streaming search complete: {} partial computations", totalPartials);
105+
logger.debug("Streaming search complete: {} partial computations", totalPartials);
106106

107107
if (!partialResponses.isEmpty()) {
108108
long totalDocsProcessed = partialResponses.stream()
109109
.mapToLong(r -> r.getHits() != null ? r.getHits().getHits().length : 0)
110110
.sum();
111-
logger.info("Processed {} docs across {} partial emissions", totalDocsProcessed, partialResponses.size());
111+
logger.debug("Processed {} docs across {} partial emissions", totalDocsProcessed, partialResponses.size());
112112
}
113113
} else {
114114
logger.debug("No partial computations performed");

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.opensearch.search.pipeline.SearchPipelineService;
8383
import org.opensearch.search.profile.ProfileShardResult;
8484
import org.opensearch.search.profile.SearchProfileShardResults;
85+
import org.opensearch.search.query.StreamingSearchMode;
8586
import org.opensearch.search.slice.SliceBuilder;
8687
import org.opensearch.tasks.CancellableTask;
8788
import org.opensearch.tasks.Task;
@@ -135,7 +136,8 @@
135136
* @opensearch.internal
136137
*/
137138
public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
138-
// Streaming search integrated via streamingSearchMode in SearchRequest
139+
// Streaming is integrated via SearchAction (TransportSearchAction). When stream transport is available,
140+
// register its request handler here. A separate StreamSearchAction registration is not required.
139141

140142
/** The maximum number of shards for a single search request. */
141143
public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting(
@@ -1269,7 +1271,13 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(
12691271
tracer
12701272
);
12711273
} else {
1272-
final boolean isStreamingRequest = searchRequest.getStreamingSearchMode() != null
1274+
// Set default streaming mode if flag is present but mode is null
1275+
if (searchRequest.isStreamingScoring() && searchRequest.getStreamingSearchMode() == null) {
1276+
searchRequest.setStreamingSearchMode(StreamingSearchMode.SCORED_UNSORTED.toString());
1277+
}
1278+
1279+
// Consider streaming if either flag is set or mode is specified
1280+
final boolean isStreamingRequest = (searchRequest.isStreamingScoring() || searchRequest.getStreamingSearchMode() != null)
12731281
&& (searchRequest.source() == null || searchRequest.source().size() > 0);
12741282

12751283
final SearchProgressListener progressListener = isStreamingRequest

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -821,7 +821,7 @@ private SearchPhaseResult executeQueryPhase(
821821
// FALLBACK: If this is a streaming search but no mode is set, use a default mode
822822
// This handles the case where ShardSearchRequest is created without copying streaming fields
823823
if (isStreamSearch) {
824-
context.setStreamingMode(StreamingSearchMode.NO_SCORING);
824+
context.setStreamingMode(StreamingSearchMode.SCORED_UNSORTED);
825825
}
826826
}
827827
}

server/src/main/java/org/opensearch/search/builder/StreamingSearchParameters.java

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@ public class StreamingSearchParameters implements Writeable, ToXContent {
2626

2727
public static final String STREAMING_FIELD = "streaming";
2828
public static final String ENABLED_FIELD = "enabled";
29-
public static final String CONFIDENCE_FIELD = "confidence";
3029
public static final String BATCH_SIZE_FIELD = "batch_size";
3130
public static final String EMISSION_INTERVAL_FIELD = "emission_interval";
3231
public static final String MIN_DOCS_FIELD = "min_docs";
3332
public static final String ADAPTIVE_BATCHING_FIELD = "adaptive_batching";
3433
public static final String MILESTONES_FIELD = "milestones";
3534

3635
private boolean enabled = false;
37-
private float initialConfidence = 0.99f;
3836
private int batchSize = 10;
3937
private int emissionIntervalMillis = 100;
4038
private int minDocsForStreaming = 5;
@@ -45,7 +43,6 @@ public StreamingSearchParameters() {}
4543

4644
public StreamingSearchParameters(StreamInput in) throws IOException {
4745
this.enabled = in.readBoolean();
48-
this.initialConfidence = in.readFloat();
4946
this.batchSize = in.readVInt();
5047
this.emissionIntervalMillis = in.readVInt();
5148
this.minDocsForStreaming = in.readVInt();
@@ -56,7 +53,6 @@ public StreamingSearchParameters(StreamInput in) throws IOException {
5653
@Override
5754
public void writeTo(StreamOutput out) throws IOException {
5855
out.writeBoolean(enabled);
59-
out.writeFloat(initialConfidence);
6056
out.writeVInt(batchSize);
6157
out.writeVInt(emissionIntervalMillis);
6258
out.writeVInt(minDocsForStreaming);
@@ -68,7 +64,6 @@ public void writeTo(StreamOutput out) throws IOException {
6864
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
6965
builder.startObject(STREAMING_FIELD);
7066
builder.field(ENABLED_FIELD, enabled);
71-
builder.field(CONFIDENCE_FIELD, initialConfidence);
7267
builder.field(BATCH_SIZE_FIELD, batchSize);
7368
builder.field(EMISSION_INTERVAL_FIELD, emissionIntervalMillis);
7469
builder.field(MIN_DOCS_FIELD, minDocsForStreaming);
@@ -90,8 +85,6 @@ public static StreamingSearchParameters fromXContent(XContentParser parser) thro
9085
} else if (token.isValue()) {
9186
if (ENABLED_FIELD.equals(currentFieldName)) {
9287
params.enabled = parser.booleanValue();
93-
} else if (CONFIDENCE_FIELD.equals(currentFieldName)) {
94-
params.initialConfidence = parser.floatValue();
9588
} else if (BATCH_SIZE_FIELD.equals(currentFieldName)) {
9689
params.batchSize = parser.intValue();
9790
} else if (EMISSION_INTERVAL_FIELD.equals(currentFieldName)) {
@@ -120,18 +113,6 @@ public StreamingSearchParameters enabled(boolean enabled) {
120113
return this;
121114
}
122115

123-
public float getInitialConfidence() {
124-
return initialConfidence;
125-
}
126-
127-
public StreamingSearchParameters initialConfidence(float confidence) {
128-
if (confidence <= 0.0f || confidence > 1.0f) {
129-
throw new IllegalArgumentException("Confidence must be between 0 and 1");
130-
}
131-
this.initialConfidence = confidence;
132-
return this;
133-
}
134-
135116
public int getBatchSize() {
136117
return batchSize;
137118
}
@@ -192,7 +173,6 @@ public boolean equals(Object o) {
192173
if (o == null || getClass() != o.getClass()) return false;
193174
StreamingSearchParameters that = (StreamingSearchParameters) o;
194175
return enabled == that.enabled
195-
&& Float.compare(that.initialConfidence, initialConfidence) == 0
196176
&& batchSize == that.batchSize
197177
&& emissionIntervalMillis == that.emissionIntervalMillis
198178
&& minDocsForStreaming == that.minDocsForStreaming
@@ -202,24 +182,14 @@ public boolean equals(Object o) {
202182

203183
@Override
204184
public int hashCode() {
205-
return Objects.hash(
206-
enabled,
207-
initialConfidence,
208-
batchSize,
209-
emissionIntervalMillis,
210-
minDocsForStreaming,
211-
adaptiveBatching,
212-
useMilestones
213-
);
185+
return Objects.hash(enabled, batchSize, emissionIntervalMillis, minDocsForStreaming, adaptiveBatching, useMilestones);
214186
}
215187

216188
@Override
217189
public String toString() {
218190
return "StreamingSearchParameters{"
219191
+ "enabled="
220192
+ enabled
221-
+ ", confidence="
222-
+ initialConfidence
223193
+ ", batchSize="
224194
+ batchSize
225195
+ ", emissionInterval="

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,4 +580,16 @@ public boolean isStreamingSearch() {
580580
public int getStreamingBatchSize() {
581581
return 10;
582582
}
583+
584+
public org.opensearch.common.unit.TimeValue getStreamingTimeInterval() {
585+
return org.opensearch.common.unit.TimeValue.timeValueMillis(100);
586+
}
587+
588+
public Boolean getStreamingFirstHitImmediate() {
589+
return true;
590+
}
591+
592+
public Boolean getStreamingEnableCoalescing() {
593+
return true;
594+
}
583595
}

0 commit comments

Comments
 (0)