Skip to content

Commit 6fc677b

Browse files
Atri SharmaAtri Sharma
authored andcommitted
Cleanup
Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
1 parent 0f5c5c2 commit 6fc677b

14 files changed

+15
-88
lines changed

server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.logging.log4j.LogManager;
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.lucene.search.TopDocs;
38-
import org.apache.lucene.search.TotalHits;
3938
import org.opensearch.common.lease.Releasable;
4039
import org.opensearch.common.lease.Releasables;
4140
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;

server/src/main/java/org/opensearch/action/search/SearchResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
355355
if (getNumReducePhases() != 1) {
356356
builder.field(NUM_REDUCE_PHASES.getPreferredName(), getNumReducePhases());
357357
}
358-
358+
359359
RestActions.buildBroadcastShardsHeader(
360360
builder,
361361
params,

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616

1717
/**
1818
* Query phase result consumer for streaming search.
19-
* Supports progressive batch reduction with a fixed batch policy suitable for unsorted streaming.
2019
*
2120
* @opensearch.internal
2221
*/
2322
public class StreamQueryPhaseResultConsumer extends QueryPhaseResultConsumer {
2423

25-
26-
2724
/**
2825
* Creates a streaming query phase result consumer.
2926
*/
@@ -49,18 +46,9 @@ public StreamQueryPhaseResultConsumer(
4946
);
5047
}
5148

52-
/**
53-
* Controls partial reduction frequency.
54-
* With NO_SCORING streaming, we reduce immediately for fastest TTFB.
55-
*
56-
* @param requestBatchedReduceSize request batch size
57-
* @param minBatchReduceSize minimum batch size
58-
*/
5949
@Override
6050
int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
6151
// Reduce immediately for fastest TTFB
6252
return Math.min(requestBatchedReduceSize, 1);
6353
}
6454
}
65-
66-

server/src/main/java/org/opensearch/action/search/StreamSearchQueryThenFetchAsyncAction.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,6 @@ public class StreamSearchQueryThenFetchAsyncAction extends SearchQueryThenFetchA
7373
this.logger = logger;
7474
}
7575

76-
/**
77-
* Override the extension point to create streaming listeners instead of regular
78-
* listeners
79-
*/
8076
@Override
8177
SearchActionListener<SearchPhaseResult> createShardActionListener(
8278
final SearchShardTarget shard,
@@ -145,14 +141,9 @@ protected void onStreamResult(SearchPhaseResult result, SearchShardIterator shar
145141
results.consumeResult(result, next);
146142
}
147143

148-
/**
149-
* Override onShardResult to handle streaming search results safely.
150-
* This prevents the "topDocs already consumed" error when processing
151-
* multiple streaming results from the same shard.
152-
*/
153144
@Override
154145
protected void onShardResult(SearchPhaseResult result, SearchShardIterator shardIt) {
155-
// Safety log: track final shard response receipt in coordinator
146+
// Trace final shard responses to diagnose coordinator sequencing.
156147
if (logger.isTraceEnabled()) {
157148
logger.trace(
158149
"COORDINATOR: received final shard result from shard={}, target={}, totalOps={}, expectedOps={}",
@@ -162,14 +153,9 @@ protected void onShardResult(SearchPhaseResult result, SearchShardIterator shard
162153
expectedTotalOps
163154
);
164155
}
165-
// Always delegate to the parent to ensure shard accounting and phase
166-
// transitions.
167156
super.onShardResult(result, shardIt);
168157
}
169158

170-
/**
171-
* Override successful shard execution to handle stream result synchronization
172-
*/
173159
@Override
174160
void successfulShardExecution(SearchShardIterator shardsIt) {
175161
final int remainingOpsOnIterator;
@@ -194,14 +180,8 @@ void successfulShardExecution(SearchShardIterator shardsIt) {
194180
}
195181
}
196182

197-
/**
198-
* Handle successful stream execution callback
199-
* Since partials are no longer fed into the reducer, this callback is not
200-
* needed for coordination.
201-
*/
202183
private void successfulStreamExecution() {
203-
// No-op: partials are bypassed from reducer, completion is handled by
204-
// successfulShardExecution only
184+
// No-op.
205185
}
206186

207187
}

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.opensearch.action.search.SearchAction;
4040
import org.opensearch.action.search.SearchContextId;
4141
import org.opensearch.action.search.SearchRequest;
42-
import org.opensearch.action.search.SearchResponse;
4342
import org.opensearch.action.search.StreamSearchAction;
4443
import org.opensearch.action.support.IndicesOptions;
4544
import org.opensearch.common.Booleans;

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
3333
import org.opensearch.search.aggregations.support.ValuesSource;
3434
import org.opensearch.search.internal.SearchContext;
35-
import org.opensearch.search.streaming.StreamingCostMetrics;
3635
import org.opensearch.search.streaming.StreamingCostEstimable;
36+
import org.opensearch.search.streaming.StreamingCostMetrics;
3737

3838
import java.io.IOException;
3939
import java.util.ArrayList;

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -514,9 +514,7 @@ public StreamingCostMetrics estimateStreamingCost(SearchContext context) {
514514
return StreamingCostMetrics.nonStreamable();
515515
}
516516

517-
// Calculate metrics (simplified for now to match StreamNumericTermsAggregator
518-
// pattern essentially)
519-
// But we need total bucket count from ordinals
517+
// Estimate streaming cost using segment ordinals.
520518
ValuesSource.Bytes.WithOrdinals ordinalValuesSource = (ValuesSource.Bytes.WithOrdinals) valuesSource;
521519
List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
522520
long maxCardinality = 0;

server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
import org.opensearch.search.aggregations.support.ValuesSource;
1717
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
1818
import org.opensearch.search.internal.SearchContext;
19-
import org.opensearch.search.streaming.StreamingCostMetrics;
2019
import org.opensearch.search.streaming.StreamingCostEstimable;
20+
import org.opensearch.search.streaming.StreamingCostMetrics;
2121

2222
import java.io.IOException;
2323
import java.util.List;

server/src/main/java/org/opensearch/search/query/StreamingSearchMode.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,28 @@
1111
import org.opensearch.common.annotation.ExperimentalApi;
1212

1313
/**
14-
* Defines the different streaming search strategies based on the design-by-case approach.
15-
* Each mode optimizes for different use cases and performance characteristics.
14+
* Streaming search mode selector.
15+
*
16+
* <p>Only {@link #NO_SCORING} is active in the current streaming implementation.
17+
* Other values are retained for request parsing/backward compatibility.
1618
*
1719
* @opensearch.internal
1820
*/
1921
@ExperimentalApi
2022
public enum StreamingSearchMode {
2123

2224
/**
23-
* Case 1: No scoring, no sorting - fastest TTFB
24-
* - Shard collector: StreamingUnsortedCollector
25-
* - Ring buffer with batch emission
26-
* - Round-robin merge at coordinator
27-
* - Best for: simple filtering, counting, exists queries
25+
* No scoring and no sorting.
2826
*/
2927
NO_SCORING("no_scoring"),
3028

3129
/**
32-
* Case 2: Full scoring + explicit sort - production ready
33-
* - Shard collector: StreamingSortedCollector
34-
* - WAND/Block-Max WAND with windowed top-K heap
35-
* - K-way streaming merge at coordinator
36-
* - Best for: scored searches with sorting
30+
* Retained for backward compatibility.
3731
*/
3832
SCORED_SORTED("scored_sorted"),
3933

4034
/**
41-
* Case 3: Full scoring, no sorting - moderate performance
42-
* - Shard collector: StreamingScoredUnsortedCollector
43-
* - Ring buffer with scoring
44-
* - No merge needed at coordinator
45-
* - Best for: scored searches without sorting
35+
* Retained for backward compatibility.
4636
*/
4737
SCORED_UNSORTED("scored_unsorted");
4838

@@ -68,7 +58,6 @@ public static StreamingSearchMode fromString(String mode) {
6858
return NO_SCORING; // Default
6959
}
7060

71-
// Backward compatibility: coerce older scored modes into NO_SCORING
7261
if ("SCORED_UNSORTED".equalsIgnoreCase(mode) || "SCORED_SORTED".equalsIgnoreCase(mode) || "NO_SCORING".equalsIgnoreCase(mode)) {
7362
return NO_SCORING;
7463
}
@@ -78,7 +67,7 @@ public static StreamingSearchMode fromString(String mode) {
7867
return m;
7968
}
8069
}
81-
70+
8271
throw new IllegalArgumentException("Unknown StreamingSearchMode: " + mode);
8372
}
8473

@@ -98,8 +87,6 @@ public boolean requiresSorting() {
9887
return false; // No sorting is supported in the current mode
9988
}
10089

101-
// Confidence-based mode removed in this branch
102-
10390
@Override
10491
public String toString() {
10592
return value;

server/src/main/java/org/opensearch/search/query/StreamingUnsortedCollectorContext.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,6 @@
2929
/**
3030
* Streaming collector context for NO_SCORING mode.
3131
* Collects documents without scoring for fastest emission.
32-
*
33-
* Implements memory-bounded collection using a "firstK" pattern where only the first K
34-
* documents are retained for the final result. Documents are collected in batches
35-
* controlled by search.streaming.batch_size setting (default: 10, max: 100).
36-
*
37-
* Memory footprint: O(K + batchSize) where K is the requested number of hits.
38-
*
39-
* Circuit Breaker Policy:
40-
* - Batch buffers: No CB checks as they're strictly bounded (10-100 docs) and cleared after emission
41-
* - FirstK list: Protected by parent QueryPhaseResultConsumer's circuit breaker during final reduction
42-
* - Max memory per collector: ~8KB for batch (100 docs * 16 bytes) + ~80KB for firstK (10000 docs * 16 bytes)
43-
* - Decision rationale: The overhead of CB checks (atomic operations) would exceed the memory saved
44-
* for such small, bounded allocations that are immediately released
4532
*/
4633
public class StreamingUnsortedCollectorContext extends TopDocsCollectorContext {
4734

0 commit comments

Comments
 (0)