Skip to content

Commit 89348e5

Browse files
committed
More cleanup
Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent 5fa2ad7 commit 89348e5

File tree

8 files changed

+107
-44
lines changed

8 files changed

+107
-44
lines changed

server/src/main/java/org/opensearch/action/search/SearchPhaseController.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -848,29 +848,6 @@ QueryPhaseResultConsumer newSearchPhaseResults(
848848
}
849849
}
850850

851-
/**
852-
* Returns a new {@link StreamQueryPhaseResultConsumer} instance that reduces search responses incrementally.
853-
*/
854-
StreamQueryPhaseResultConsumer newStreamSearchPhaseResults(
855-
Executor executor,
856-
CircuitBreaker circuitBreaker,
857-
SearchProgressListener listener,
858-
SearchRequest request,
859-
int numShards,
860-
Consumer<Exception> onPartialMergeFailure
861-
) {
862-
return new StreamQueryPhaseResultConsumer(
863-
request,
864-
executor,
865-
circuitBreaker,
866-
this,
867-
listener,
868-
namedWriteableRegistry,
869-
numShards,
870-
onPartialMergeFailure,
871-
null // No ClusterSettings in this legacy path
872-
);
873-
}
874851

875852
/**
876853
* The top docs statistics

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,19 @@
2626
/**
2727
* Query phase result consumer for streaming search.
2828
* Supports progressive batch reduction with configurable scoring modes.
29+
*
30+
* Batch reduction frequency is controlled by per-mode multipliers from cluster settings:
31+
* - NO_SCORING: Immediate reduction (batch size = 1) for fastest time-to-first-byte
32+
* - SCORED_UNSORTED: Small batches controlled by search.streaming.scored_unsorted.batch_multiplier (default: 2)
33+
* - CONFIDENCE_BASED: Moderate batches controlled by search.streaming.confidence.batch_multiplier (default: 3)
34+
* - SCORED_SORTED: Larger batches controlled by search.streaming.scored_sorted.batch_multiplier (default: 10)
35+
*
36+
* These multipliers are applied to the base batch reduce size (typically 5) to determine
37+
* how many shard results are accumulated before triggering a partial reduction. Lower values
38+
* mean more frequent reductions and faster streaming, but higher coordinator CPU usage.
39+
*
40+
* ClusterSettings must be provided (non-null) to enable dynamic configuration. Tests should
41+
* provide a properly configured ClusterSettings instance rather than null.
2942
*
3043
* @opensearch.internal
3144
*/
@@ -37,6 +50,11 @@ public class StreamQueryPhaseResultConsumer extends QueryPhaseResultConsumer {
3750
private final ClusterSettings clusterSettings;
3851
private int resultsReceived = 0;
3952

53+
/**
54+
* Creates a streaming query phase result consumer.
55+
*
56+
* @param clusterSettings cluster settings for dynamic multipliers (must not be null)
57+
*/
4058
public StreamQueryPhaseResultConsumer(
4159
SearchRequest request,
4260
Executor executor,
@@ -62,6 +80,11 @@ public StreamQueryPhaseResultConsumer(
6280
// Initialize scoring mode from request
6381
String mode = request.getStreamingSearchMode();
6482
this.scoringMode = (mode != null) ? StreamingSearchMode.fromString(mode) : StreamingSearchMode.SCORED_SORTED;
83+
84+
// ClusterSettings is required for dynamic configuration
85+
if (clusterSettings == null) {
86+
throw new IllegalArgumentException("ClusterSettings must not be null for StreamQueryPhaseResultConsumer");
87+
}
6588
this.clusterSettings = clusterSettings;
6689
}
6790

@@ -74,7 +97,8 @@ public StreamQueryPhaseResultConsumer(
7497
@Override
7598
int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
7699
// Handle null during construction (parent constructor calls this before our constructor body runs)
77-
if (scoringMode == null) {
100+
// In this case, clusterSettings is also null, so use a sensible default
101+
if (scoringMode == null || clusterSettings == null) {
78102
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * 10);
79103
}
80104

@@ -84,26 +108,18 @@ int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
84108
return Math.min(requestBatchedReduceSize, 1);
85109
case SCORED_UNSORTED:
86110
// Small batches for quick emission without sorting overhead
87-
int suMult = clusterSettings != null
88-
? clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_UNSORTED_BATCH_MULTIPLIER)
89-
: 2;
111+
int suMult = clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_UNSORTED_BATCH_MULTIPLIER);
90112
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * suMult);
91113
case CONFIDENCE_BASED:
92114
// Moderate batching for progressive emission with confidence
93-
int cMult = clusterSettings != null
94-
? clusterSettings.get(StreamingSearchSettings.STREAMING_CONFIDENCE_BATCH_MULTIPLIER)
95-
: 3;
115+
int cMult = clusterSettings.get(StreamingSearchSettings.STREAMING_CONFIDENCE_BATCH_MULTIPLIER);
96116
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * cMult);
97117
case SCORED_SORTED:
98118
// Higher batch size to collect more results before reducing (sorting is expensive)
99-
int ssMult = clusterSettings != null
100-
? clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_SORTED_BATCH_MULTIPLIER)
101-
: 10;
119+
int ssMult = clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_SORTED_BATCH_MULTIPLIER);
102120
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * ssMult);
103121
default:
104-
int defMult = clusterSettings != null
105-
? clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_SORTED_BATCH_MULTIPLIER)
106-
: 10;
122+
int defMult = clusterSettings.get(StreamingSearchSettings.STREAMING_SCORED_SORTED_BATCH_MULTIPLIER);
107123
return super.getBatchReduceSize(requestBatchedReduceSize, minBatchReduceSize * defMult);
108124
}
109125
}

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,9 +1312,7 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(
13121312
);
13131313
break;
13141314
case QUERY_THEN_FETCH:
1315-
System.out.println("DEBUG: isStreamingRequest=" + isStreamingRequest + ", streamSearchTransportService=" + (streamSearchTransportService != null));
13161315
if (isStreamingRequest && streamSearchTransportService != null) {
1317-
System.out.println("DEBUG: Using StreamSearchQueryThenFetchAsyncAction!");
13181316
searchAsyncAction = new StreamSearchQueryThenFetchAsyncAction(
13191317
logger,
13201318
streamSearchTransportService,

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -584,9 +584,25 @@ public boolean isStreamingSearch() {
584584
}
585585

586586
/**
587-
* Returns the configured batch size for streaming emissions.
588-
* Default implementation returns 10. Implementations may override to pull dynamic settings.
589-
* @return the batch size for streaming collection
587+
* Returns the configured batch size for streaming document collection.
588+
*
589+
* This value controls how many documents are collected in memory before emission
590+
* during streaming search operations. Implementations that have ClusterSettings available
591+
* read the dynamic cluster setting 'search.streaming.batch_size'
592+
* (StreamingSearchSettings.STREAMING_BATCH_SIZE), which has a default of 10 and a max of 100.
593+
*
594+
* Default implementation returns 10. Concrete implementations (DefaultSearchContext)
595+
* override this to read from ClusterSettings when available, falling back to 10
596+
* if ClusterSettings is not injected or the setting is not configured.
597+
*
598+
* The batch size affects streaming performance:
599+
* - Smaller values (1-10): Lower latency, more frequent emissions
600+
* - Larger values (50-100): Higher throughput, less network overhead
601+
*
602+
* Note: This value is read once per search context creation and does not update
603+
* dynamically during a search operation.
604+
*
605+
* @return the batch size for streaming collection (default: 10, range: 1-100)
590606
*/
591607
public int getStreamingBatchSize() {
592608
return 10;

server/src/main/java/org/opensearch/search/query/StreamingConfidenceCollectorContext.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,21 @@
2323
import java.util.concurrent.atomic.AtomicLong;
2424

2525
/**
26-
* Collector context for confidence-based streaming mode.
26+
* Streaming collector context for CONFIDENCE_BASED mode.
27+
* Collects documents with scores for progressive confidence-based emission.
28+
*
29+
* Implements memory-bounded collection using a "topK" pattern where the best K
30+
* documents by score are retained. Documents are collected in batches controlled
31+
* by search.streaming.batch_size setting (default: 10, max: 100).
32+
*
33+
* Memory footprint: O(K + batchSize) where K is the requested number of hits.
34+
*
35+
* Circuit Breaker Policy:
36+
* - Batch buffers: No CB checks as they're strictly bounded (10-100 docs) and cleared after emission
37+
* - TopK list: Protected by parent QueryPhaseResultConsumer's circuit breaker during final reduction
38+
* - Max memory per collector: ~1.6KB for batch (100 docs * 16 bytes) + ~160KB for topK (10000 docs * 16 bytes)
39+
* - Decision rationale: The overhead of CB checks (atomic operations) would exceed the memory saved
40+
* for such small, bounded allocations that are immediately released
2741
*/
2842
public class StreamingConfidenceCollectorContext extends TopDocsCollectorContext {
2943

server/src/main/java/org/opensearch/search/query/StreamingScoredUnsortedCollectorContext.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,21 @@
2121
import java.util.concurrent.atomic.AtomicLong;
2222

2323
/**
24-
* Collector context for scored unsorted streaming mode.
24+
* Streaming collector context for SCORED_UNSORTED mode.
25+
* Collects documents with scores but without sorting for fast emission with relevance.
26+
*
27+
* Implements memory-bounded collection using a "firstK" pattern where only the first K
28+
* documents are retained for the final result. Documents are collected in batches
29+
* controlled by search.streaming.batch_size setting (default: 10, max: 100).
30+
*
31+
* Memory footprint: O(K + batchSize) where K is the requested number of hits.
32+
*
33+
* Circuit Breaker Policy:
34+
* - Batch buffers: No CB checks as they're strictly bounded (10-100 docs) and cleared after emission
35+
* - FirstK list: Protected by parent QueryPhaseResultConsumer's circuit breaker during final reduction
36+
* - Max memory per collector: ~1.6KB for batch (100 docs * 16 bytes) + ~160KB for firstK (10000 docs * 16 bytes)
37+
* - Decision rationale: The overhead of CB checks (atomic operations) would exceed the memory saved
38+
* for such small, bounded allocations that are immediately released
2539
*/
2640
public class StreamingScoredUnsortedCollectorContext extends TopDocsCollectorContext {
2741

server/src/main/java/org/opensearch/search/query/StreamingSortedCollectorContext.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,22 @@
1818
import java.util.List;
1919

2020
/**
21-
* Collector context for sorted streaming mode.
21+
* Streaming collector context for SCORED_SORTED mode.
22+
* Collects and maintains documents in sorted order (by score or custom sort).
23+
*
24+
* Uses Lucene's TopScoreDocCollectorManager for efficient sorted collection with
25+
* incremental merging. Documents are collected in larger batches (10x default multiplier)
26+
* to amortize sorting costs, controlled by search.streaming.scored_sorted.batch_multiplier.
27+
*
28+
* Memory footprint: O(K) where K is the requested number of hits.
29+
* The TopScoreDocCollector maintains a min-heap of size K.
30+
*
31+
* Circuit Breaker Policy:
32+
* - Heap structure: Protected by TopScoreDocCollector's internal memory management
33+
* - Parent reduction: Protected by QueryPhaseResultConsumer's circuit breaker
34+
* - Max memory per collector: ~160KB for topK heap (10000 docs * 16 bytes)
35+
* - Decision rationale: Sorting requires maintaining all K docs in memory, but Lucene's
36+
* collectors are already optimized for memory efficiency
2237
*/
2338
public class StreamingSortedCollectorContext extends TopDocsCollectorContext {
2439

server/src/main/java/org/opensearch/search/query/StreamingUnsortedCollectorContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,19 @@
2525
/**
2626
* Streaming collector context for NO_SCORING mode.
2727
* Collects documents without scoring for fastest emission.
28+
*
29+
* Implements memory-bounded collection using a "firstK" pattern where only the first K
30+
* documents are retained for the final result. Documents are collected in batches
31+
* controlled by search.streaming.batch_size setting (default: 10, max: 100).
32+
*
33+
* Memory footprint: O(K + batchSize) where K is the requested number of hits.
34+
*
35+
* Circuit Breaker Policy:
36+
* - Batch buffers: No CB checks as they're strictly bounded (10-100 docs) and cleared after emission
37+
* - FirstK list: Protected by parent QueryPhaseResultConsumer's circuit breaker during final reduction
38+
* - Max memory per collector: ~1.6KB for batch (100 docs * 16 bytes) + ~160KB for firstK (10000 docs * 16 bytes)
39+
* - Decision rationale: The overhead of CB checks (atomic operations) would exceed the memory saved
40+
* for such small, bounded allocations that are immediately released
2841
*/
2942
public class StreamingUnsortedCollectorContext extends TopDocsCollectorContext {
3043

0 commit comments

Comments
 (0)