Skip to content

Commit 0f5c5c2

Browse files
Atri Sharma
authored and committed
Revert SearchProgressListener API break for source compatibility
1 parent 737d8a4 commit 0f5c5c2

28 files changed

+91
-1569
lines changed

server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 1 addition & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -287,18 +287,7 @@ private ReduceResult partialReduce(
287287
SearchShardTarget target = result.getSearchShardTarget();
288288
processedShards.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
289289
}
290-
// For streaming search with TopDocs, use the new notification method
291-
if (hasTopDocs && newTopDocs != null) {
292-
progressListener.notifyPartialReduceWithTopDocs(
293-
processedShards,
294-
topDocsStats.getTotalHits(),
295-
newTopDocs,
296-
newAggs,
297-
numReducePhases
298-
);
299-
} else {
300-
progressListener.notifyPartialReduce(processedShards, topDocsStats.getTotalHits(), newAggs, numReducePhases);
301-
}
290+
progressListener.notifyPartialReduce(processedShards, topDocsStats.getTotalHits(), newAggs, numReducePhases);
302291
// we leave the results un-serialized because serializing is slow but we compute the serialized
303292
// size as an estimate of the memory used by the newly reduced aggregations.
304293
long serializedSize = hasAggs ? newAggs.getSerializedSize() : 0;
@@ -434,87 +423,6 @@ void consume(QuerySearchResult result, Runnable callback) {
434423
}
435424
}
436425

437-
/**
438-
* Performs a transient, non-destructive reduction of the current state and an optional partial result,
439-
* and notifies the listener. This is used for coordinated streaming search.
440-
*/
441-
synchronized void notifySnapshot(QuerySearchResult partial) {
442-
checkCancellation();
443-
if (hasFailure()) {
444-
return;
445-
}
446-
447-
List<TopDocs> topDocsList = new ArrayList<>();
448-
List<InternalAggregations> aggsList = new ArrayList<>();
449-
List<SearchShard> snapshotShards = new ArrayList<>(emptyResults);
450-
451-
if (reduceResult != null) {
452-
if (reduceResult.reducedTopDocs != null) {
453-
topDocsList.add(reduceResult.reducedTopDocs);
454-
}
455-
if (reduceResult.reducedAggs != null) {
456-
aggsList.add(reduceResult.reducedAggs);
457-
}
458-
snapshotShards.addAll(reduceResult.processedShards);
459-
}
460-
461-
for (QuerySearchResult result : buffer) {
462-
if (result.hasTopDocs()) {
463-
TopDocs td = result.topDocs().topDocs;
464-
SearchPhaseController.setShardIndex(td, result.getShardIndex());
465-
topDocsList.add(td);
466-
}
467-
if (result.hasAggs()) {
468-
aggsList.add(result.aggregations().expand());
469-
}
470-
SearchShardTarget target = result.getSearchShardTarget();
471-
snapshotShards.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
472-
}
473-
474-
TotalHits snapshotTotalHits = topDocsStats.getTotalHits();
475-
476-
if (partial != null) {
477-
if (partial.hasTopDocs()) {
478-
TopDocs td = partial.topDocs().topDocs;
479-
SearchPhaseController.setShardIndex(td, partial.getShardIndex());
480-
topDocsList.add(td);
481-
long val = snapshotTotalHits == null ? 0 : snapshotTotalHits.value();
482-
val += td.totalHits.value();
483-
snapshotTotalHits = new TotalHits(
484-
val,
485-
snapshotTotalHits == null ? td.totalHits.relation() : snapshotTotalHits.relation()
486-
);
487-
}
488-
if (partial.hasAggs()) {
489-
aggsList.add(partial.aggregations().expand());
490-
}
491-
SearchShardTarget target = partial.getSearchShardTarget();
492-
snapshotShards.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
493-
}
494-
495-
TopDocs mergedTopDocs = null;
496-
if (hasTopDocs && topDocsList.isEmpty() == false) {
497-
mergedTopDocs = SearchPhaseController.mergeTopDocs(topDocsList, topNSize, 0);
498-
}
499-
500-
InternalAggregations mergedAggs = null;
501-
if (hasAggs && aggsList.isEmpty() == false) {
502-
mergedAggs = InternalAggregations.topLevelReduce(aggsList, aggReduceContextBuilder.forPartialReduction());
503-
}
504-
505-
if (hasTopDocs && mergedTopDocs != null) {
506-
progressListener.notifyPartialReduceWithTopDocs(
507-
snapshotShards,
508-
snapshotTotalHits,
509-
mergedTopDocs,
510-
mergedAggs,
511-
numReducePhases
512-
);
513-
} else {
514-
progressListener.notifyPartialReduce(snapshotShards, snapshotTotalHits, mergedAggs, numReducePhases);
515-
}
516-
}
517-
518426
private synchronized boolean consumeResult(QuerySearchResult result, Runnable callback) {
519427
if (hasFailure()) {
520428
result.consumeAll(); // release memory

server/src/main/java/org/opensearch/action/search/SearchRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
129129
private Boolean phaseTook = null;
130130

131131
private boolean streamingScoring = false;
132-
private String streamingSearchMode = null; // Will use StreamingSearchMode.SCORED_UNSORTED if null
132+
private String streamingSearchMode = null; // Will use StreamingSearchMode.NO_SCORING if null
133133

134134
public SearchRequest() {
135135
this.localClusterAlias = null;

server/src/main/java/org/opensearch/action/search/SearchResponse.java

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,6 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
9191
private static final ParseField TIMED_OUT = new ParseField("timed_out");
9292
private static final ParseField TERMINATED_EARLY = new ParseField("terminated_early");
9393
private static final ParseField NUM_REDUCE_PHASES = new ParseField("num_reduce_phases");
94-
95-
// Streaming REST fields
96-
private static final ParseField IS_PARTIAL = new ParseField("is_partial");
97-
private static final ParseField SEQUENCE_NUMBER = new ParseField("sequence_number");
98-
private static final ParseField TOTAL_PARTIALS = new ParseField("total_partials");
9994

10095
private final SearchResponseSections internalResponse;
10196
private final String scrollId;
@@ -108,11 +103,6 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
108103
private final long tookInMillis;
109104
private final PhaseTook phaseTook;
110105

111-
// Fields for streaming responses
112-
private boolean isPartial = false;
113-
private int sequenceNumber = 0;
114-
private int totalPartials = 0;
115-
116106
public SearchResponse(StreamInput in) throws IOException {
117107
super(in);
118108
internalResponse = new InternalSearchResponse(in);
@@ -137,12 +127,6 @@ public SearchResponse(StreamInput in) throws IOException {
137127
}
138128
skippedShards = in.readVInt();
139129
pointInTimeId = in.readOptionalString();
140-
// Read streaming fields
141-
if (in.getVersion().onOrAfter(Version.V_2_15_0)) {
142-
isPartial = in.readBoolean();
143-
sequenceNumber = in.readVInt();
144-
totalPartials = in.readVInt();
145-
}
146130
}
147131

148132
public SearchResponse(
@@ -318,31 +302,6 @@ public String getScrollId() {
318302
return scrollId;
319303
}
320304

321-
// Streaming response methods
322-
public boolean isPartial() {
323-
return isPartial;
324-
}
325-
326-
public void setPartial(boolean partial) {
327-
this.isPartial = partial;
328-
}
329-
330-
public int getSequenceNumber() {
331-
return sequenceNumber;
332-
}
333-
334-
public void setSequenceNumber(int sequenceNumber) {
335-
this.sequenceNumber = sequenceNumber;
336-
}
337-
338-
public int getTotalPartials() {
339-
return totalPartials;
340-
}
341-
342-
public void setTotalPartials(int totalPartials) {
343-
this.totalPartials = totalPartials;
344-
}
345-
346305
/**
347306
* Returns the encoded string of the search context that the search request is used to executed
348307
*/
@@ -397,16 +356,6 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
397356
builder.field(NUM_REDUCE_PHASES.getPreferredName(), getNumReducePhases());
398357
}
399358

400-
// Write streaming fields if it's a streaming partial
401-
// or if we have sequenced streaming responses
402-
if (isPartial) {
403-
builder.field(IS_PARTIAL.getPreferredName(), isPartial);
404-
builder.field(SEQUENCE_NUMBER.getPreferredName(), sequenceNumber);
405-
if (totalPartials > 0) {
406-
builder.field(TOTAL_PARTIALS.getPreferredName(), totalPartials);
407-
}
408-
}
409-
410359
RestActions.buildBroadcastShardsHeader(
411360
builder,
412361
params,
@@ -445,9 +394,6 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
445394
int skippedShards = 0; // 0 for BWC
446395
String scrollId = null;
447396
String searchContextId = null;
448-
boolean isPartial = false;
449-
int sequenceNumber = 0;
450-
int totalPartials = 0;
451397
List<ShardSearchFailure> failures = new ArrayList<>();
452398
Clusters clusters = Clusters.EMPTY;
453399
List<SearchExtBuilder> extBuilders = new ArrayList<>();
@@ -468,12 +414,6 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
468414
terminatedEarly = parser.booleanValue();
469415
} else if (NUM_REDUCE_PHASES.match(currentFieldName, parser.getDeprecationHandler())) {
470416
numReducePhases = parser.intValue();
471-
} else if (IS_PARTIAL.match(currentFieldName, parser.getDeprecationHandler())) {
472-
isPartial = parser.booleanValue();
473-
} else if (SEQUENCE_NUMBER.match(currentFieldName, parser.getDeprecationHandler())) {
474-
sequenceNumber = parser.intValue();
475-
} else if (TOTAL_PARTIALS.match(currentFieldName, parser.getDeprecationHandler())) {
476-
totalPartials = parser.intValue();
477417
} else {
478418
parser.skipChildren();
479419
}
@@ -614,22 +554,6 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
614554
clusters,
615555
searchContextId
616556
);
617-
SearchResponse response = new SearchResponse(
618-
internalResponse,
619-
scrollId,
620-
totalShards,
621-
successfulShards,
622-
skippedShards,
623-
tookInMillis,
624-
phaseTook,
625-
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
626-
clusters,
627-
searchContextId
628-
);
629-
response.setPartial(isPartial);
630-
response.setSequenceNumber(sequenceNumber);
631-
response.setTotalPartials(totalPartials);
632-
return response;
633557
}
634558

635559
@Override
@@ -650,13 +574,6 @@ public void writeTo(StreamOutput out) throws IOException {
650574
}
651575
out.writeVInt(skippedShards);
652576
out.writeOptionalString(pointInTimeId);
653-
654-
// Write streaming fields
655-
if (out.getVersion().onOrAfter(Version.V_2_15_0)) {
656-
out.writeBoolean(isPartial);
657-
out.writeVInt(sequenceNumber);
658-
out.writeVInt(totalPartials);
659-
}
660577
}
661578

662579
@Override

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java

Lines changed: 6 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -10,43 +10,19 @@
1010

1111
import org.opensearch.core.common.breaker.CircuitBreaker;
1212
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
13-
import org.opensearch.search.SearchPhaseResult;
14-
import org.opensearch.search.query.QuerySearchResult;
15-
import org.opensearch.search.query.StreamingSearchMode;
1613

1714
import java.util.concurrent.Executor;
18-
import java.util.concurrent.atomic.AtomicInteger;
1915
import java.util.function.Consumer;
2016

2117
/**
2218
* Query phase result consumer for streaming search.
23-
* Supports progressive batch reduction with configurable scoring modes.
24-
*
25-
* Batch reduction frequency is controlled by per-mode multipliers:
26-
* - NO_SCORING: Immediate reduction (batch size = 1) for fastest
27-
* time-to-first-byte
28-
* - SCORED_UNSORTED: Small batches (minBatchReduceSize * 2)
29-
* - SCORED_SORTED: Larger batches (minBatchReduceSize * 10)
30-
*
31-
* These multipliers are applied to the base batch reduce size (typically 5) to
32-
* determine
33-
* how many shard results are accumulated before triggering a partial reduction.
34-
* Lower values
35-
* mean more frequent reductions and faster streaming, but higher coordinator
36-
* CPU usage.
19+
* Supports progressive batch reduction with a fixed batch policy suitable for unsorted streaming.
3720
*
3821
* @opensearch.internal
3922
*/
4023
public class StreamQueryPhaseResultConsumer extends QueryPhaseResultConsumer {
4124

42-
private final StreamingSearchMode scoringMode;
43-
private int resultsReceived = 0;
4425

45-
// TTFB tracking for demonstrating fetch phase timing
46-
private long queryStartTime = System.currentTimeMillis();
47-
private long firstBatchReadyForFetchTime = -1;
48-
private boolean firstBatchReadyForFetch = false;
49-
private final AtomicInteger batchesReduced = new AtomicInteger(0);
5026

5127
/**
5228
* Creates a streaming query phase result consumer.
@@ -71,80 +47,20 @@ public StreamQueryPhaseResultConsumer(
7147
expectedResultSize,
7248
onPartialMergeFailure
7349
);
74-
75-
// Initialize scoring mode from request
76-
String mode = request.getStreamingSearchMode();
77-
this.scoringMode = (mode != null) ? StreamingSearchMode.fromString(mode) : StreamingSearchMode.SCORED_SORTED;
7850
}
7951

8052
/**
81-
* Controls partial reduction frequency based on scoring mode.
53+
* Controls partial reduction frequency.
54+
* With NO_SCORING streaming, we reduce immediately for fastest TTFB.
8255
*
8356
* @param requestBatchedReduceSize request batch size
8457
* @param minBatchReduceSize minimum batch size
8558
*/
8659
@Override
8760
int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
88-
// Handle null during construction (parent constructor calls this before our
89-
// constructor body runs)
90-
if (scoringMode == null) {
91-
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
92-
}
93-
94-
switch (scoringMode) {
95-
case NO_SCORING:
96-
// Reduce immediately for fastest TTFB
97-
return Math.min(requestBatchedReduceSize, 1);
98-
case SCORED_UNSORTED:
99-
// Small batches for quick emission without sorting overhead
100-
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 2);
101-
case SCORED_SORTED:
102-
// Higher batch size to collect more results before reducing (sorting is
103-
// expensive)
104-
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
105-
default:
106-
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
107-
}
108-
}
109-
110-
/**
111-
* Consume streaming results with frequency-based emission
112-
*/
113-
114-
@Override
115-
public void consumeResult(SearchPhaseResult result, Runnable next) {
116-
// Handle partial results (shard progress updates)
117-
if (result.queryResult() != null && result.queryResult().isPartial()) {
118-
// Coordinated snapshot: merge this partial result with current final results
119-
// for immediate emission without saving the partial result in the reducer state.
120-
QuerySearchResult queryResult = result.queryResult();
121-
pendingReduces.notifySnapshot(queryResult);
122-
123-
// Continue the pipeline
124-
next.run();
125-
return;
126-
}
127-
128-
// Handle final shard results
129-
super.consumeResult(result, () -> {
130-
// Once a final result is added to the reducer (via super.consumeResult),
131-
// trigger a snapshot emission to show the new global state immediately.
132-
pendingReduces.notifySnapshot(null);
133-
next.run();
134-
});
61+
// Reduce immediately for fastest TTFB
62+
return Math.min(requestBatchedReduceSize, 1);
13563
}
64+
}
13665

137-
/**
138-
* Get TTFB metrics for benchmarking
139-
*/
140-
public long getTimeToFirstBatch() {
141-
if (firstBatchReadyForFetchTime > 0) {
142-
return firstBatchReadyForFetchTime - queryStartTime;
143-
}
144-
return -1;
145-
}
14666

147-
public boolean isFirstBatchReady() {
148-
return firstBatchReadyForFetch;
149-
}
150-
}

0 commit comments

Comments
 (0)