Skip to content

Commit 5fa2ad7

Browse files
committed
Cleanup
Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent 7abf2f1 commit 5fa2ad7

24 files changed

+392
-1992
lines changed

server/src/main/java/org/opensearch/action/search/SearchPhaseController.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.lucene.search.TotalHits.Relation;
4949
import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
5050
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
51+
import org.opensearch.common.settings.ClusterSettings;
5152
import org.opensearch.core.common.breaker.CircuitBreaker;
5253
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
5354
import org.opensearch.index.fielddata.IndexFieldData;
@@ -783,7 +784,7 @@ QueryPhaseResultConsumer newSearchPhaseResults(
783784
int numShards,
784785
Consumer<Exception> onPartialMergeFailure
785786
) {
786-
return newSearchPhaseResults(executor, circuitBreaker, listener, request, numShards, onPartialMergeFailure, () -> false);
787+
return newSearchPhaseResults(executor, circuitBreaker, listener, request, numShards, onPartialMergeFailure, () -> false, null);
787788
}
788789

789790
/**
@@ -797,12 +798,29 @@ QueryPhaseResultConsumer newSearchPhaseResults(
797798
int numShards,
798799
Consumer<Exception> onPartialMergeFailure,
799800
BooleanSupplier isTaskCancelled
801+
) {
802+
return newSearchPhaseResults(executor, circuitBreaker, listener, request, numShards, onPartialMergeFailure, isTaskCancelled, null);
803+
}
804+
805+
/**
806+
* Returns a new {@link QueryPhaseResultConsumer} instance that reduces search responses incrementally.
807+
*/
808+
QueryPhaseResultConsumer newSearchPhaseResults(
809+
Executor executor,
810+
CircuitBreaker circuitBreaker,
811+
SearchProgressListener listener,
812+
SearchRequest request,
813+
int numShards,
814+
Consumer<Exception> onPartialMergeFailure,
815+
BooleanSupplier isTaskCancelled,
816+
ClusterSettings clusterSettings
800817
) {
801818
// Check if this is a streaming search request
802819
String streamingMode = request.getStreamingSearchMode();
803-
logger.info("🔍 STREAMING: SearchPhaseController - request.getStreamingSearchMode() = {}", streamingMode);
820+
if (logger.isDebugEnabled()) {
821+
logger.debug("Streaming mode on request: {}", streamingMode);
822+
}
804823
if (streamingMode != null) {
805-
logger.info("🔍 STREAMING: Creating StreamQueryPhaseResultConsumer for mode: {}", streamingMode);
806824
return new StreamQueryPhaseResultConsumer(
807825
request,
808826
executor,
@@ -811,7 +829,8 @@ QueryPhaseResultConsumer newSearchPhaseResults(
811829
listener,
812830
namedWriteableRegistry,
813831
numShards,
814-
onPartialMergeFailure
832+
onPartialMergeFailure,
833+
clusterSettings
815834
);
816835
} else {
817836
// Regular QueryPhaseResultConsumer
@@ -848,7 +867,8 @@ StreamQueryPhaseResultConsumer newStreamSearchPhaseResults(
848867
listener,
849868
namedWriteableRegistry,
850869
numShards,
851-
onPartialMergeFailure
870+
onPartialMergeFailure,
871+
null // No ClusterSettings in this legacy path
852872
);
853873
}
854874

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.settings.ClusterSettings;
1314
import org.opensearch.core.common.breaker.CircuitBreaker;
1415
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
16+
import org.opensearch.search.streaming.StreamingSearchSettings;
1517
import org.opensearch.search.SearchPhaseResult;
1618
import org.opensearch.search.query.QuerySearchResult;
1719
import org.opensearch.search.query.StreamingSearchMode;
@@ -22,10 +24,8 @@
2224
import java.util.function.Consumer;
2325

2426
/**
25-
* Streaming query phase result consumer that follows the same pattern as streaming aggregations.
26-
*
27-
* Just like streaming aggregations can emit partial results multiple times,
28-
* this consumer enables streaming search results with different scoring modes.
27+
* Query phase result consumer for streaming search.
28+
* Supports progressive batch reduction with configurable scoring modes.
2929
*
3030
* @opensearch.internal
3131
*/
@@ -34,6 +34,7 @@ public class StreamQueryPhaseResultConsumer extends QueryPhaseResultConsumer {
3434
private static final Logger logger = LogManager.getLogger(StreamQueryPhaseResultConsumer.class);
3535

3636
private final StreamingSearchMode scoringMode;
37+
private final ClusterSettings clusterSettings;
3738
private int resultsReceived = 0;
3839

3940
public StreamQueryPhaseResultConsumer(
@@ -44,7 +45,8 @@ public StreamQueryPhaseResultConsumer(
4445
SearchProgressListener progressListener,
4546
NamedWriteableRegistry namedWriteableRegistry,
4647
int expectedResultSize,
47-
Consumer<Exception> onPartialMergeFailure
48+
Consumer<Exception> onPartialMergeFailure,
49+
ClusterSettings clusterSettings
4850
) {
4951
super(
5052
request,
@@ -59,51 +61,50 @@ public StreamQueryPhaseResultConsumer(
5961

6062
// Initialize scoring mode from request
6163
String mode = request.getStreamingSearchMode();
62-
logger.info("🔍 STREAMING: StreamQueryPhaseResultConsumer constructor - request.getStreamingSearchMode() = {}", mode);
6364
this.scoringMode = (mode != null) ? StreamingSearchMode.fromString(mode) : StreamingSearchMode.SCORED_SORTED;
64-
logger.info("🔍 STREAMING: StreamQueryPhaseResultConsumer constructor - scoringMode set to {}", this.scoringMode);
65+
this.clusterSettings = clusterSettings;
6566
}
6667

6768
/**
68-
* Controls how often we trigger partial reductions based on scoring mode.
69-
* This is the same pattern used by streaming aggregations.
69+
* Controls partial reduction frequency based on scoring mode.
7070
*
71-
* @param minBatchReduceSize: pass as number of shard
71+
* @param requestBatchedReduceSize request batch size
72+
* @param minBatchReduceSize minimum batch size
7273
*/
7374
@Override
7475
int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
7576
// Handle null during construction (parent constructor calls this before our constructor body runs)
7677
if (scoringMode == null) {
77-
logger.warn("⚠️ STREAMING: scoringMode is null, using fallback batch size");
7878
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
7979
}
8080

81-
int batchSize;
8281
switch (scoringMode) {
8382
case NO_SCORING:
8483
// Reduce immediately for fastest TTFB (similar to streaming aggs with low batch size)
85-
batchSize = Math.min(requestBatchedReduceSize, 1);
86-
logger.info("🎯 STREAMING: NO_SCORING mode: using batch size {} for fastest TTFB", batchSize);
87-
return batchSize;
84+
return Math.min(requestBatchedReduceSize, 1);
8885
case SCORED_UNSORTED:
8986
// Small batches for quick emission without sorting overhead
90-
batchSize = super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 2);
91-
logger.info("🎯 STREAMING: SCORED_UNSORTED mode: using batch size {} for quick emission", batchSize);
92-
return batchSize;
87+
int suMult = clusterSettings != null
88+
? clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_UNSORTED_BATCH_MULTIPLIER)
89+
: 2;
90+
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * suMult);
9391
case CONFIDENCE_BASED:
9492
// Moderate batching for progressive emission with confidence
95-
batchSize = super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 3);
96-
logger.info("🎯 STREAMING: CONFIDENCE_BASED mode: using batch size {} for progressive emission", batchSize);
97-
return batchSize;
93+
int cMult = clusterSettings != null
94+
? clusterSettings.get(StreamingSearchSettings.STREAMING_CONFIDENCE_BATCH_MULTIPLIER)
95+
: 3;
96+
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * cMult);
9897
case SCORED_SORTED:
9998
// Higher batch size to collect more results before reducing (sorting is expensive)
100-
batchSize = super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
101-
logger.info("🎯 STREAMING: SCORED_SORTED mode: using batch size {} for sorting efficiency", batchSize);
102-
return batchSize;
99+
int ssMult = clusterSettings != null
100+
? clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_SORTED_BATCH_MULTIPLIER)
101+
: 10;
102+
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * ssMult);
103103
default:
104-
batchSize = super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
105-
logger.warn("⚠️ STREAMING: Unknown mode {}, using default batch size {}", scoringMode, batchSize);
106-
return batchSize;
104+
int defMult = clusterSettings != null
105+
? clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_SORTED_BATCH_MULTIPLIER)
106+
: 10;
107+
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * defMult);
107108
}
108109
}
109110

@@ -121,15 +122,13 @@ void consumeStreamResult(SearchPhaseResult result, Runnable next) {
121122
}
122123

123124
resultsReceived++;
124-
logger.info("🎯 STREAMING: Consumed result #{} from shard {}, partial={}, hasTopDocs={}",
125+
logger.debug("Consumed result #{} from shard {}, partial={}, hasTopDocs={}",
125126
resultsReceived, result.getShardIndex(), querySearchResult.isPartial(),
126127
querySearchResult.topDocs() != null);
127128

128129
// Use parent's pendingMerges to consume the result
129130
// Partial reduces are automatically triggered by batchReduceSize
130-
logger.info("🔄 STREAMING: About to consume result in pendingMerges");
131131
pendingMerges.consume(querySearchResult, next);
132-
logger.info("✅ STREAMING: Result consumed in pendingMerges");
133132
}
134133
}
135134

server/src/main/java/org/opensearch/action/search/StreamSearchQueryThenFetchAsyncAction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,7 @@ SearchActionListener<SearchPhaseResult> createShardActionListener(
9797
protected void innerOnStreamResponse(SearchPhaseResult result) {
9898
try {
9999
int count = streamResultsReceived.incrementAndGet();
100-
logger.info("🚀 STREAMING: Received streaming result #{} from shard {}, partial={}",
101-
count, result.getShardIndex(), result.queryResult().isPartial());
102100
onStreamResult(result, shardIt, () -> successfulStreamExecution());
103-
logger.info("✅ STREAMING: Processed streaming result #{} from shard {}", count, result.getShardIndex());
104101
} finally {
105102
executeNext(pendingExecutions, thread);
106103
}

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1284,7 +1284,8 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(
12841284
searchRequest,
12851285
shardIterators.size(),
12861286
exc -> cancelTask(task, exc),
1287-
task::isCancelled
1287+
task::isCancelled,
1288+
clusterService.getClusterSettings()
12881289
);
12891290
AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction;
12901291
switch (searchRequest.searchType()) {

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@
157157
import org.opensearch.rest.BaseRestHandler;
158158
import org.opensearch.script.ScriptService;
159159
import org.opensearch.search.SearchService;
160+
import org.opensearch.search.streaming.StreamingSearchSettings;
160161
import org.opensearch.search.aggregations.MultiBucketConsumerService;
161162
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
162163
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
@@ -449,6 +450,35 @@ public void apply(Settings value, Settings current, Settings previous) {
449450
NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,
450451
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
451452
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
453+
// Streaming search settings
454+
StreamingSearchSettings.STREAMING_SEARCH_ENABLED,
455+
StreamingSearchSettings.STREAMING_SEARCH_ENABLED_FOR_EXPENSIVE_QUERIES,
456+
StreamingSearchSettings.STREAMING_BLOCK_SIZE,
457+
StreamingSearchSettings.STREAMING_BATCH_SIZE,
458+
StreamingSearchSettings.STREAMING_NO_SCORING_BATCH_MULTIPLIER,
459+
StreamingSearchSettings.STREAMING_SCORED_UNSORTED_BATCH_MULTIPLIER,
460+
StreamingSearchSettings.STREAMING_CONFIDENCE_BATCH_MULTIPLIER,
461+
StreamingSearchSettings.STREAMING_SCORED_SORTED_BATCH_MULTIPLIER,
462+
StreamingSearchSettings.STREAMING_EMISSION_INTERVAL,
463+
StreamingSearchSettings.STREAMING_INITIAL_CONFIDENCE,
464+
StreamingSearchSettings.STREAMING_CONFIDENCE_DECAY_RATE,
465+
StreamingSearchSettings.STREAMING_MIN_CONFIDENCE,
466+
StreamingSearchSettings.STREAMING_MIN_DOCS_FOR_STREAMING,
467+
StreamingSearchSettings.STREAMING_MIN_SHARD_RESPONSE_RATIO,
468+
StreamingSearchSettings.STREAMING_OUTLIER_THRESHOLD_SIGMA,
469+
StreamingSearchSettings.STREAMING_MAX_BUFFER_SIZE,
470+
StreamingSearchSettings.STREAMING_MAX_CONCURRENT_STREAMS,
471+
StreamingSearchSettings.STREAMING_CLIENT_TIMEOUT,
472+
StreamingSearchSettings.STREAMING_COMPRESSION_ENABLED,
473+
StreamingSearchSettings.STREAMING_CIRCUIT_BREAKER_LIMIT,
474+
StreamingSearchSettings.STREAMING_CIRCUIT_BREAKER_OVERHEAD,
475+
StreamingSearchSettings.STREAMING_METRICS_ENABLED,
476+
StreamingSearchSettings.STREAMING_METRICS_INTERVAL,
477+
StreamingSearchSettings.STREAMING_BLOCK_SKIP_THRESHOLD_RATIO,
478+
StreamingSearchSettings.STREAMING_MIN_COMPETITIVE_DOCS,
479+
StreamingSearchSettings.STREAMING_SCORE_MODE,
480+
StreamingSearchSettings.STREAMING_ADAPTIVE_BATCHING,
481+
StreamingSearchSettings.STREAMING_PREDICTIVE_SCORING,
452482
TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND,
453483
TransportReplicationAction.REPLICATION_RETRY_TIMEOUT,
454484
PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT,

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import org.opensearch.search.slice.SliceBuilder;
106106
import org.opensearch.search.sort.SortAndFormats;
107107
import org.opensearch.search.suggest.SuggestionSearchContext;
108+
import org.opensearch.search.streaming.StreamingSearchSettings;
108109

109110
import java.io.IOException;
110111
import java.io.UncheckedIOException;
@@ -224,6 +225,9 @@ final class DefaultSearchContext extends SearchContext {
224225
private StreamSearchChannelListener listener;
225226
private StreamingSearchMode streamingMode;
226227

228+
// Streaming settings cache (simple read-through to cluster settings for now)
229+
private volatile Integer streamingBatchSizeOverride;
230+
227231
DefaultSearchContext(
228232
ReaderContext readerContext,
229233
ShardSearchRequest request,
@@ -1300,4 +1304,21 @@ public StreamingSearchMode getStreamingMode() {
13001304
public void setStreamingMode(StreamingSearchMode mode) {
13011305
this.streamingMode = mode;
13021306
}
1307+
1308+
@Override
1309+
public int getStreamingBatchSize() {
1310+
// If we have cluster service available, prefer the dynamic setting when present
1311+
try {
1312+
if (streamingBatchSizeOverride != null) {
1313+
return streamingBatchSizeOverride;
1314+
}
1315+
if (clusterService != null) {
1316+
// Use dynamic cluster setting if available; otherwise falls back to default
1317+
return clusterService.getClusterSettings().get(StreamingSearchSettings.STREAMING_BATCH_SIZE);
1318+
}
1319+
} catch (Exception ignore) {
1320+
// Fall through to default
1321+
}
1322+
return 10;
1323+
}
13031324
}

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -816,15 +816,11 @@ private SearchPhaseResult executeQueryPhase(
816816

817817
// NEW: Set streaming mode
818818
if (request.getStreamingSearchMode() != null) {
819-
System.out.println("DEBUG: SearchService - Setting streaming mode: " + request.getStreamingSearchMode());
820819
context.setStreamingMode(StreamingSearchMode.fromString(request.getStreamingSearchMode()));
821-
System.out.println("DEBUG: SearchService - Streaming mode set to: " + context.getStreamingMode());
822820
} else {
823-
System.out.println("DEBUG: SearchService - No streaming mode in request");
824821
// FALLBACK: If this is a streaming search but no mode is set, use a default mode
825822
// This handles the case where ShardSearchRequest is created without copying streaming fields
826823
if (isStreamSearch) {
827-
System.out.println("DEBUG: SearchService - Using fallback streaming mode: NO_SCORING");
828824
context.setStreamingMode(StreamingSearchMode.NO_SCORING);
829825
}
830826
}

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,4 +582,13 @@ public boolean isStreamSearch() {
582582
public boolean isStreamingSearch() {
583583
return getStreamingMode() != null;
584584
}
585+
586+
/**
587+
* Returns the configured batch size for streaming emissions.
588+
* Default implementation returns 10. Implementations may override to pull dynamic settings.
589+
* @return the batch size for streaming collection
590+
*/
591+
public int getStreamingBatchSize() {
592+
return 10;
593+
}
585594
}

server/src/main/java/org/opensearch/search/query/QueryPhase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,6 @@ void postProcess(QuerySearchResult result) throws IOException {
521521
// Check if this is a streaming search request FIRST
522522
if (searchContext.isStreamingSearch()) {
523523
// Use streaming collectors for streaming search
524-
System.out.println("DEBUG: QueryPhase - Using streaming collector for mode: " + searchContext.getStreamingMode());
525524
if (logger.isTraceEnabled()) {
526525
logger.trace("Using streaming collector for mode: {}", searchContext.getStreamingMode());
527526
}

0 commit comments

Comments
 (0)